Skip to content

Commit

Permalink
to #316 fix: add HasSentRowDesc to make sure we only send the Row Des…
Browse files Browse the repository at this point in the history
…c once per statement.
  • Loading branch information
TianyuZhang1214 committed Dec 25, 2024
1 parent f48b083 commit 7fd7bc3
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 5 deletions.
2 changes: 1 addition & 1 deletion compatibility/pg/csharp/PGTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class Tests

public void Connect(string ip, int port, string user, string password)
{
string connectionString = $"Host={ip};Port={port};Username={user};Password={password};Database=postgres;";
string connectionString = $"Host={ip};Port={port};Username={user};Password={password};Database=postgres;Timeout=300;CommandTimeout=600;";
try
{
conn = new NpgsqlConnection(connectionString);
Expand Down
1 change: 1 addition & 0 deletions pgserver/connection_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type ConvertedStatement struct {
AST tree.Statement
Tag string
PgParsable bool
HasSentRowDesc bool
SubscriptionConfig *SubscriptionConfig
BackupConfig *BackupConfig
}
Expand Down
11 changes: 7 additions & 4 deletions pgserver/connection_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ func (h *ConnectionHandler) handleExecute(message *pgproto3.Execute) error {
// |rowsAffected| gets altered by the callback below
rowsAffected := int32(0)

callback := h.spoolRowsCallback(query.Tag, &rowsAffected, true)
callback := h.spoolRowsCallback(query, &rowsAffected, true)
err = h.duckHandler.ComExecuteBound(context.Background(), h.mysqlConn, portalData, callback)
if err != nil {
return err
Expand Down Expand Up @@ -1008,7 +1008,7 @@ func (h *ConnectionHandler) run(statement ConvertedStatement) error {
})
}

callback := h.spoolRowsCallback(statement.Tag, &rowsAffected, false)
callback := h.spoolRowsCallback(statement, &rowsAffected, false)
if err := h.duckHandler.ComQuery(
context.Background(),
h.mysqlConn,
Expand All @@ -1024,20 +1024,23 @@ func (h *ConnectionHandler) run(statement ConvertedStatement) error {

// spoolRowsCallback returns a callback function that will send RowDescription message,
// then a DataRow message for each row in the result set.
func (h *ConnectionHandler) spoolRowsCallback(tag string, rows *int32, isExecute bool) func(res *Result) error {
func (h *ConnectionHandler) spoolRowsCallback(statement ConvertedStatement, rows *int32, isExecute bool) func(res *Result) error {
// IsIUD returns whether the query is either an INSERT, UPDATE, or DELETE query.
tag := statement.Tag
isIUD := tag == "INSERT" || tag == "UPDATE" || tag == "DELETE"
return func(res *Result) error {
logrus.Tracef("spooling %d rows for tag %s (execute = %v)", res.RowsAffected, tag, isExecute)
if returnsRow(tag) {
// EXECUTE does not send RowDescription; instead it should be sent from DESCRIBE prior to it
if !isExecute {
// We only send RowDescription once per statement execution.
if !isExecute && !statement.HasSentRowDesc {
logrus.Tracef("sending RowDescription %+v for tag %s", res.Fields, tag)
if err := h.send(&pgproto3.RowDescription{
Fields: res.Fields,
}); err != nil {
return err
}
statement.HasSentRowDesc = true
}

logrus.Tracef("sending Rows %+v for tag %s", res.Rows, tag)
Expand Down

0 comments on commit 7fd7bc3

Please sign in to comment.