Skip to content

Commit

Permalink
fix: use the correct GTID flavor when replicating from MariaDB (#314)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanyang01 authored Dec 24, 2024
1 parent cec8654 commit c23ff9a
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 21 deletions.
62 changes: 41 additions & 21 deletions binlogreplication/binlog_replica_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"io"
"regexp"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -54,9 +53,6 @@ const (
ERFatalReplicaError = 13117
)

// Match any strings starting with "ON" (case insensitive)
var gtidModeIsOnRegex = regexp.MustCompile(`(?i)^ON$`)

type tableIdentifier struct {
dbName, tableName string
}
Expand Down Expand Up @@ -127,27 +123,46 @@ func (a *binlogReplicaApplier) IsRunning() bool {
}

// This function will connect to the MySQL server and check the GTID_MODE.
func connAndCheckGtidModeEnabled(ctx *sql.Context, params mysql.ConnParams) (bool, error) {
func detectVersionAndGTIDMode(ctx *sql.Context, params mysql.ConnParams) (mariaDB, gtidMode bool, err error) {
conn, err := mysql.Connect(ctx, &params)
if err != nil {
return false, err
return false, false, err
}
defer conn.Close()

var qr *sqltypes.Result
qr, err = conn.ExecuteFetch("SELECT @@GLOBAL.GTID_MODE", 1, true)
qr, err = conn.ExecuteFetch("SELECT VERSION()", 1, true)
if err != nil {
// Maybe it's a MariaDB server, try to get the GTID_STRICT_MODE instead
return false, false, fmt.Errorf("failed to check MySQL version: %w", err)
}
if len(qr.Rows) == 0 {
return false, false, errors.New("no rows returned when checking MySQL version")
}
version := string(qr.Rows[0][0].Raw())

mariaDB = strings.Contains(version, "MariaDB")
if mariaDB {
qr, err = conn.ExecuteFetch("SELECT @@GLOBAL.GTID_STRICT_MODE", 1, true)
if err != nil {
return false, fmt.Errorf("error checking GTID_MODE: %v", err)
return mariaDB, false, fmt.Errorf("failed to check GTID_STRICT_MODE: %w", err)
}
} else {
qr, err = conn.ExecuteFetch("SELECT @@GLOBAL.GTID_MODE", 1, true)
if err != nil {
return mariaDB, false, fmt.Errorf("failed to check GTID_MODE: %w", err)
}
}
if len(qr.Rows) == 0 {
return false, fmt.Errorf("no rows returned when checking GTID_MODE")
return mariaDB, false, errors.New("no rows returned when checking GTID_MODE")
}
gtidMode := string(qr.Rows[0][0].Raw())
return gtidModeIsOnRegex.MatchString(gtidMode), nil

gtidMode, err = qr.Rows[0][0].ToBool()
if err != nil {
gtidMode = strings.EqualFold(string(qr.Rows[0][0].Raw()), "ON") ||
string(qr.Rows[0][0].Raw()) == "1"
}

return mariaDB, gtidMode, nil
}

// connectAndStartReplicationEventStream connects to the configured MySQL replication source, including pausing
Expand All @@ -162,10 +177,13 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co
connectRetryDelay = status.ConnectRetry
})

var conn *mysql.Conn
var err error
gtidModeEnabled := false
flavorName := ""
var (
conn *mysql.Conn
err error
gtidMode = false
mariaDB = false
flavorName = ""
)
for connectionAttempts := uint64(0); ; connectionAttempts++ {
replicaSourceInfo, err := loadReplicationConfiguration(ctx, a.engine.Analyzer.Catalog.MySQLDb)

Expand Down Expand Up @@ -193,13 +211,15 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co
ConnectTimeoutMs: 4_000,
}

gtidModeEnabled, err = connAndCheckGtidModeEnabled(ctx, connParams)
mariaDB, gtidMode, err = detectVersionAndGTIDMode(ctx, connParams)
if err != nil && connectionAttempts >= maxConnectionAttempts {
return nil, err
}

if !gtidModeEnabled {
if !gtidMode {
flavorName = replication.FilePosFlavorID
} else if mariaDB {
flavorName = replication.MariadbFlavorID
} else {
flavorName = replication.Mysql56FlavorID
}
Expand Down Expand Up @@ -232,7 +252,7 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co

// Request binlog events to start
// TODO: This should also have retry logic
err = a.startReplicationEventStream(ctx, conn, gtidModeEnabled, flavorName)
err = a.startReplicationEventStream(ctx, conn, gtidMode, flavorName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -316,14 +336,14 @@ func (a *binlogReplicaApplier) loadLogFilePosition(ctx *sql.Context, positionSto

// startReplicationEventStream sends a request over |conn|, the connection to the MySQL source server, to begin
// sending binlog events.
func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, conn *mysql.Conn, gtidModeEnabled bool, flavorName string) error {
func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, conn *mysql.Conn, gtidMode bool, flavorName string) error {
serverId, err := loadReplicaServerId()
if err != nil {
return err
}

var position replication.Position
if gtidModeEnabled {
if gtidMode {
position, err = a.loadGtidPosition(ctx, positionStore, flavorName)
if err != nil {
return err
Expand Down
21 changes: 21 additions & 0 deletions binlogreplication/binlog_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,14 @@ func TestResetReplica(t *testing.T) {
require.Equal(t, "No", status["Replica_IO_Running"])
require.Equal(t, "No", status["Replica_SQL_Running"])

// Now try querying the status using the older, deprecated 'show slave status' statement
// and spot check that the data is the same, but the column names have changed
status = querySlaveStatus(t)
require.Equal(t, "", status["Master_Host"])
require.Equal(t, "", status["Master_User"])
require.Equal(t, "No", status["Slave_IO_Running"])
require.Equal(t, "No", status["Slave_SQL_Running"])

rows, err = replicaDatabase.Queryx("select * from mysql.slave_master_info;")
require.NoError(t, err)
require.False(t, rows.Next())
Expand Down Expand Up @@ -953,6 +961,19 @@ func queryReplicaStatus(t *testing.T) map[string]any {
return status
}

// querySlaveStatus returns the results of `SHOW SLAVE STATUS` as a map, for the replica
// database. If any errors are encountered, this function will fail the current test.
// The queryReplicaStatus() function should generally be favored over this function for
// getting the status of a replica. This function exists only to help test that the
// deprecated 'show slave status' statement works.
func querySlaveStatus(t *testing.T) map[string]any {
rows, err := replicaDatabase.Queryx("SHOW SLAVE STATUS;")
require.NoError(t, err)
status := convertMapScanResultToStrings(readNextRow(t, rows))
require.NoError(t, rows.Close())
return status
}

// mustListDatabases returns a string slice of the databases (i.e. schemas) available on the specified |db|. If
// any errors are encountered, this function will fail the current test.
func mustListDatabases(t *testing.T, db *sqlx.DB) []string {
Expand Down

0 comments on commit c23ff9a

Please sign in to comment.