From 7fd7bc38a8ca1d9d34ee02356537a2e244d17e3c Mon Sep 17 00:00:00 2001 From: TianyuZhang1214 Date: Wed, 25 Dec 2024 14:45:24 +0800 Subject: [PATCH] to #316 fix: add HasSentRowDesc to make sure we only send the Row Desc once per statement. --- compatibility/pg/csharp/PGTest.cs | 2 +- pgserver/connection_data.go | 1 + pgserver/connection_handler.go | 11 +++++++---- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/compatibility/pg/csharp/PGTest.cs b/compatibility/pg/csharp/PGTest.cs index cb18adc5..5ed6b3e6 100644 --- a/compatibility/pg/csharp/PGTest.cs +++ b/compatibility/pg/csharp/PGTest.cs @@ -14,7 +14,7 @@ public class Tests public void Connect(string ip, int port, string user, string password) { - string connectionString = $"Host={ip};Port={port};Username={user};Password={password};Database=postgres;"; + string connectionString = $"Host={ip};Port={port};Username={user};Password={password};Database=postgres;Timeout=300;CommandTimeout=600;"; try { conn = new NpgsqlConnection(connectionString); diff --git a/pgserver/connection_data.go b/pgserver/connection_data.go index d9864557..61006bd2 100644 --- a/pgserver/connection_data.go +++ b/pgserver/connection_data.go @@ -57,6 +57,7 @@ type ConvertedStatement struct { AST tree.Statement Tag string PgParsable bool + HasSentRowDesc bool SubscriptionConfig *SubscriptionConfig BackupConfig *BackupConfig } diff --git a/pgserver/connection_handler.go b/pgserver/connection_handler.go index b82f3504..e6a088a1 100644 --- a/pgserver/connection_handler.go +++ b/pgserver/connection_handler.go @@ -752,7 +752,7 @@ func (h *ConnectionHandler) handleExecute(message *pgproto3.Execute) error { // |rowsAffected| gets altered by the callback below rowsAffected := int32(0) - callback := h.spoolRowsCallback(query.Tag, &rowsAffected, true) + callback := h.spoolRowsCallback(query, &rowsAffected, true) err = h.duckHandler.ComExecuteBound(context.Background(), h.mysqlConn, portalData, callback) if err != nil { return err @@ -1008,7 +1008,7 @@ func (h *ConnectionHandler) run(statement ConvertedStatement) error { }) } - callback := h.spoolRowsCallback(statement.Tag, &rowsAffected, false) + callback := h.spoolRowsCallback(statement, &rowsAffected, false) if err := h.duckHandler.ComQuery( context.Background(), h.mysqlConn, @@ -1024,20 +1024,23 @@ func (h *ConnectionHandler) run(statement ConvertedStatement) error { // spoolRowsCallback returns a callback function that will send RowDescription message, // then a DataRow message for each row in the result set. -func (h *ConnectionHandler) spoolRowsCallback(tag string, rows *int32, isExecute bool) func(res *Result) error { +func (h *ConnectionHandler) spoolRowsCallback(statement ConvertedStatement, rows *int32, isExecute bool) func(res *Result) error { // IsIUD returns whether the query is either an INSERT, UPDATE, or DELETE query. + tag := statement.Tag isIUD := tag == "INSERT" || tag == "UPDATE" || tag == "DELETE" return func(res *Result) error { logrus.Tracef("spooling %d rows for tag %s (execute = %v)", res.RowsAffected, tag, isExecute) if returnsRow(tag) { // EXECUTE does not send RowDescription; instead it should be sent from DESCRIBE prior to it - if !isExecute { + // We only send RowDescription once per statement execution. + if !isExecute && !statement.HasSentRowDesc { logrus.Tracef("sending RowDescription %+v for tag %s", res.Fields, tag) if err := h.send(&pgproto3.RowDescription{ Fields: res.Fields, }); err != nil { return err } + statement.HasSentRowDesc = true } logrus.Tracef("sending Rows %+v for tag %s", res.Rows, tag)