Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use the correct GTID flavor when replicating from MariaDB #314

Merged
merged 1 commit into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 41 additions & 21 deletions binlogreplication/binlog_replica_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"io"
"regexp"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -54,9 +53,6 @@ const (
ERFatalReplicaError = 13117
)

// Match strings that are exactly "ON" (case insensitive); the pattern is anchored at both ends
var gtidModeIsOnRegex = regexp.MustCompile(`(?i)^ON$`)

// tableIdentifier pairs a database name with a table name.
type tableIdentifier struct {
	// dbName is the database (schema) part of the identifier.
	dbName string
	// tableName is the table part of the identifier.
	tableName string
}
Expand Down Expand Up @@ -127,27 +123,46 @@ func (a *binlogReplicaApplier) IsRunning() bool {
}

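// This function connects to the source server, detects from VERSION() whether it is MariaDB, and checks
// whether GTID mode is enabled (GTID_STRICT_MODE on MariaDB, GTID_MODE otherwise).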
func connAndCheckGtidModeEnabled(ctx *sql.Context, params mysql.ConnParams) (bool, error) {
func detectVersionAndGTIDMode(ctx *sql.Context, params mysql.ConnParams) (mariaDB, gtidMode bool, err error) {
conn, err := mysql.Connect(ctx, &params)
if err != nil {
return false, err
return false, false, err
}
defer conn.Close()

var qr *sqltypes.Result
qr, err = conn.ExecuteFetch("SELECT @@GLOBAL.GTID_MODE", 1, true)
qr, err = conn.ExecuteFetch("SELECT VERSION()", 1, true)
if err != nil {
// Maybe it's a MariaDB server, try to get the GTID_STRICT_MODE instead
return false, false, fmt.Errorf("failed to check MySQL version: %w", err)
}
if len(qr.Rows) == 0 {
return false, false, errors.New("no rows returned when checking MySQL version")
}
version := string(qr.Rows[0][0].Raw())

mariaDB = strings.Contains(version, "MariaDB")
if mariaDB {
qr, err = conn.ExecuteFetch("SELECT @@GLOBAL.GTID_STRICT_MODE", 1, true)
if err != nil {
return false, fmt.Errorf("error checking GTID_MODE: %v", err)
return mariaDB, false, fmt.Errorf("failed to check GTID_STRICT_MODE: %w", err)
}
} else {
qr, err = conn.ExecuteFetch("SELECT @@GLOBAL.GTID_MODE", 1, true)
if err != nil {
return mariaDB, false, fmt.Errorf("failed to check GTID_MODE: %w", err)
}
}
if len(qr.Rows) == 0 {
return false, fmt.Errorf("no rows returned when checking GTID_MODE")
return mariaDB, false, errors.New("no rows returned when checking GTID_MODE")
}
gtidMode := string(qr.Rows[0][0].Raw())
return gtidModeIsOnRegex.MatchString(gtidMode), nil

gtidMode, err = qr.Rows[0][0].ToBool()
if err != nil {
gtidMode = strings.EqualFold(string(qr.Rows[0][0].Raw()), "ON") ||
string(qr.Rows[0][0].Raw()) == "1"
}

return mariaDB, gtidMode, nil
}

// connectAndStartReplicationEventStream connects to the configured MySQL replication source, including pausing
Expand All @@ -162,10 +177,13 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co
connectRetryDelay = status.ConnectRetry
})

var conn *mysql.Conn
var err error
gtidModeEnabled := false
flavorName := ""
var (
conn *mysql.Conn
err error
gtidMode = false
mariaDB = false
flavorName = ""
)
for connectionAttempts := uint64(0); ; connectionAttempts++ {
replicaSourceInfo, err := loadReplicationConfiguration(ctx, a.engine.Analyzer.Catalog.MySQLDb)

Expand Down Expand Up @@ -193,13 +211,15 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co
ConnectTimeoutMs: 4_000,
}

gtidModeEnabled, err = connAndCheckGtidModeEnabled(ctx, connParams)
mariaDB, gtidMode, err = detectVersionAndGTIDMode(ctx, connParams)
if err != nil && connectionAttempts >= maxConnectionAttempts {
return nil, err
}

if !gtidModeEnabled {
if !gtidMode {
flavorName = replication.FilePosFlavorID
} else if mariaDB {
flavorName = replication.MariadbFlavorID
} else {
flavorName = replication.Mysql56FlavorID
}
Expand Down Expand Up @@ -232,7 +252,7 @@ func (a *binlogReplicaApplier) connectAndStartReplicationEventStream(ctx *sql.Co

// Request binlog events to start
// TODO: This should also have retry logic
err = a.startReplicationEventStream(ctx, conn, gtidModeEnabled, flavorName)
err = a.startReplicationEventStream(ctx, conn, gtidMode, flavorName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -316,14 +336,14 @@ func (a *binlogReplicaApplier) loadLogFilePosition(ctx *sql.Context, positionSto

// startReplicationEventStream sends a request over |conn|, the connection to the MySQL source server, to begin
// sending binlog events.
func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, conn *mysql.Conn, gtidModeEnabled bool, flavorName string) error {
func (a *binlogReplicaApplier) startReplicationEventStream(ctx *sql.Context, conn *mysql.Conn, gtidMode bool, flavorName string) error {
serverId, err := loadReplicaServerId()
if err != nil {
return err
}

var position replication.Position
if gtidModeEnabled {
if gtidMode {
position, err = a.loadGtidPosition(ctx, positionStore, flavorName)
if err != nil {
return err
Expand Down
21 changes: 21 additions & 0 deletions binlogreplication/binlog_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,14 @@ func TestResetReplica(t *testing.T) {
require.Equal(t, "No", status["Replica_IO_Running"])
require.Equal(t, "No", status["Replica_SQL_Running"])

// Now try querying the status using the older, deprecated 'show slave status' statement
// and spot check that the data is the same, but the column names have changed
status = querySlaveStatus(t)
require.Equal(t, "", status["Master_Host"])
require.Equal(t, "", status["Master_User"])
require.Equal(t, "No", status["Slave_IO_Running"])
require.Equal(t, "No", status["Slave_SQL_Running"])

rows, err = replicaDatabase.Queryx("select * from mysql.slave_master_info;")
require.NoError(t, err)
require.False(t, rows.Next())
Expand Down Expand Up @@ -953,6 +961,19 @@ func queryReplicaStatus(t *testing.T) map[string]any {
return status
}

// querySlaveStatus runs the deprecated `SHOW SLAVE STATUS` statement against the replica
// database and returns the next result row as a map. The current test is failed if the
// query or closing the result set returns an error.
//
// Prefer queryReplicaStatus() for inspecting replica state in general; this helper exists
// only so tests can confirm that the deprecated 'show slave status' statement works.
func querySlaveStatus(t *testing.T) map[string]any {
	result, queryErr := replicaDatabase.Queryx("SHOW SLAVE STATUS;")
	require.NoError(t, queryErr)

	// Read the row first, then normalize its values through the shared conversion helper.
	firstRow := readNextRow(t, result)
	converted := convertMapScanResultToStrings(firstRow)

	closeErr := result.Close()
	require.NoError(t, closeErr)
	return converted
}

// mustListDatabases returns a string slice of the databases (i.e. schemas) available on the specified |db|. If
// any errors are encountered, this function will fail the current test.
func mustListDatabases(t *testing.T, db *sqlx.DB) []string {
Expand Down
Loading