From 906d88e65d04e46f5a51fa7a39a1257fcf4a9575 Mon Sep 17 00:00:00 2001 From: Jakub Pliszka Date: Thu, 28 May 2026 17:00:57 +0100 Subject: [PATCH 1/2] Add GET_LOCK to prevent concurrent migrations of the same table --- go/logic/applier.go | 152 ++++++++++++++++++++++++++++++++++++++ go/logic/applier_test.go | 77 +++++++++++++++++++ go/logic/migrator.go | 3 + go/logic/migrator_test.go | 2 + 4 files changed, 234 insertions(+) diff --git a/go/logic/applier.go b/go/logic/applier.go index b49e131b8..c159bbfb5 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -6,7 +6,9 @@ package logic import ( + "crypto/sha1" gosql "database/sql" + "encoding/hex" "fmt" "reflect" "regexp" @@ -83,6 +85,12 @@ type Applier struct { dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder checkpointInsertQueryBuilder *sql.CheckpointInsertQueryBuilder + + migrationLockConn *gosql.Conn + migrationLockDB *gosql.DB + migrationLockName string + migrationLockStop chan struct{} + migrationLockDone chan struct{} } func NewApplier(migrationContext *base.MigrationContext) *Applier { @@ -145,6 +153,149 @@ func (apl *Applier) InitDBConnections() (err error) { return nil } +// buildMigrationLockName returns a deterministic MySQL user-level lock name +// for the given database and table, hashed if longer than MySQL's 64-char limit. +func buildMigrationLockName(db, table string) string { + name := fmt.Sprintf("gh-ost::%s.%s", db, table) + if len(name) <= 64 { + return name + } + sum := sha1.Sum([]byte(name)) + return "gh-ost::" + hex.EncodeToString(sum[:]) +} + +// AcquireMigrationLock takes a user-level lock on a pinned connection, +// preventing two gh-ost processes from migrating the same table concurrently +// on the same MySQL server. +func (apl *Applier) AcquireMigrationLock(ctx context.Context) error { + lockName := buildMigrationLockName(apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName) + + // Use a dedicated *sql.DB so the pinned connection does not consume a + // slot in apl.db's small pool (mysql.MaxDBPoolConnections). + lockURI := apl.connectionConfig.GetDBUri(apl.migrationContext.DatabaseName) + lockDB, err := gosql.Open("mysql", lockURI) + if err != nil { + return fmt.Errorf("failed to open migration lock DB: %w", err) + } + lockDB.SetMaxOpenConns(1) + lockDB.SetMaxIdleConns(1) + + conn, err := lockDB.Conn(ctx) + if err != nil { + lockDB.Close() + return fmt.Errorf("failed to obtain pinned connection for migration lock: %w", err) + } + + var lockResult gosql.NullInt64 + if err := conn.QueryRowContext(ctx, `select /* gh-ost */ get_lock(?, 0)`, lockName).Scan(&lockResult); err != nil { + conn.Close() + lockDB.Close() + return fmt.Errorf("failed to execute GET_LOCK for migration lock %s: %w", lockName, err) + } + + if !lockResult.Valid { + conn.Close() + lockDB.Close() + return fmt.Errorf("GET_LOCK returned NULL while acquiring migration lock %s", lockName) + } + + if lockResult.Int64 != 1 { + var holderID gosql.NullInt64 + _ = conn.QueryRowContext(ctx, `select /* gh-ost */ is_used_lock(?)`, lockName).Scan(&holderID) + conn.Close() + lockDB.Close() + if holderID.Valid { + return fmt.Errorf("another gh-ost process is already migrating `%s`.`%s`: migration lock %s held by connection id %d", + apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, lockName, holderID.Int64) + } + return fmt.Errorf("another gh-ost process is already migrating `%s`.`%s`: migration lock %s is held", + apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, lockName) + } + + apl.migrationLockConn = conn + apl.migrationLockDB = lockDB + apl.migrationLockName = lockName + apl.migrationLockStop = make(chan struct{}) + apl.migrationLockDone = make(chan struct{}) + go apl.keepMigrationLockAlive(ctx) + apl.migrationContext.Log.Infof("Acquired migration lock %s", lockName) + return nil +} + +// keepMigrationLockAlive pings the pinned migration-lock connection. If the +// ping fails the lock is considered lost and the migration is aborted via +// PanicAbort. +func (apl *Applier) keepMigrationLockAlive(ctx context.Context) { + defer close(apl.migrationLockDone) + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-apl.migrationLockStop: + return + case <-ticker.C: + } + if err := apl.pingMigrationLockConn(ctx); err != nil { + // Shutdown may have started mid-ping; don't abort if so. + select { + case <-apl.migrationLockStop: + return + default: + } + if ctx.Err() != nil { + return + } + _ = base.SendWithContext(apl.migrationContext.GetContext(), apl.migrationContext.PanicAbort, + fmt.Errorf("migration lock %s connection lost: %w", apl.migrationLockName, err)) + return + } + } +} + +// pingMigrationLockConn pings the pinned connection with a bounded timeout +// and propagates migrationLockStop as an early cancel so a teardown can +// interrupt a stuck ping. +func (apl *Applier) pingMigrationLockConn(parent context.Context) error { + pingCtx, cancel := context.WithTimeout(parent, 10*time.Second) + defer cancel() + done := make(chan struct{}) + defer close(done) + go func() { + select { + case <-apl.migrationLockStop: + cancel() + case <-done: + } + }() + return apl.migrationLockConn.PingContext(pingCtx) +} + +// releaseMigrationLock stops the keepalive goroutine, releases the user-level +// lock and closes the dedicated lock DB. Safe to call when no lock is held. +func (apl *Applier) releaseMigrationLock() { + if apl.migrationLockConn == nil { + return + } + // Stop keepalive before touching the pinned connection. + close(apl.migrationLockStop) + <-apl.migrationLockDone + releaseCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if _, err := apl.migrationLockConn.ExecContext(releaseCtx, `select /* gh-ost */ release_lock(?)`, apl.migrationLockName); err != nil { + apl.migrationContext.Log.Warningf("failed to release migration lock %s: %v", apl.migrationLockName, err) + } + if err := apl.migrationLockConn.Close(); err != nil { + apl.migrationContext.Log.Warningf("failed to close migration lock connection: %v", err) + } + if apl.migrationLockDB != nil { + apl.migrationLockDB.Close() + apl.migrationLockDB = nil + } + apl.migrationLockConn = nil +} + func (apl *Applier) prepareQueries() (err error) { if apl.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder( apl.migrationContext.DatabaseName, @@ -1734,6 +1885,7 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e func (apl *Applier) Teardown() { apl.migrationContext.Log.Debugf("Tearing down...") + apl.releaseMigrationLock() apl.db.Close() apl.singletonDB.Close() atomic.StoreInt64(&apl.finishedMigrating, 1) diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 6d7ba42f4..85a5a01d3 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -507,6 +507,83 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTablesWithGhostTableExi suite.Require().Equal(gosql.ErrNoRows, err) } +func (suite *ApplierTestSuite) TestAcquireMigrationLockSucceedsWhenFree() { + ctx := context.Background() + + _, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContext := newTestMigrationContext() + migrationContext.ApplierConnectionConfig = connectionConfig + migrationContext.SetConnectionConfig("innodb") + + applier := NewApplier(migrationContext) + defer applier.Teardown() + + suite.Require().NoError(applier.InitDBConnections()) + suite.Require().NoError(applier.AcquireMigrationLock(ctx)) + suite.Require().NotNil(applier.migrationLockConn) + suite.Require().Equal(buildMigrationLockName(testMysqlDatabase, testMysqlTableName), applier.migrationLockName) +} + +func (suite *ApplierTestSuite) TestAcquireMigrationLockFailsWhenHeld() { + ctx := context.Background() + + _, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName())) + suite.Require().NoError(err) + + connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContextA := newTestMigrationContext() + migrationContextA.ApplierConnectionConfig = connectionConfig + migrationContextA.SetConnectionConfig("innodb") + + applierA := NewApplier(migrationContextA) + defer applierA.Teardown() + suite.Require().NoError(applierA.InitDBConnections()) + suite.Require().NoError(applierA.AcquireMigrationLock(ctx)) + + connectionConfigB, err := getTestConnectionConfig(ctx, suite.mysqlContainer) + suite.Require().NoError(err) + + migrationContextB := newTestMigrationContext() + migrationContextB.ApplierConnectionConfig = connectionConfigB + migrationContextB.SetConnectionConfig("innodb") + + applierB := NewApplier(migrationContextB) + defer applierB.Teardown() + suite.Require().NoError(applierB.InitDBConnections()) + + err = applierB.AcquireMigrationLock(ctx) + suite.Require().Error(err) + suite.Require().Contains(err.Error(), "already migrating") + suite.Require().Nil(applierB.migrationLockConn) +} + +func TestBuildMigrationLockName(t *testing.T) { + t.Run("short name is returned verbatim", func(t *testing.T) { + name := buildMigrationLockName("mydb", "mytable") + require.Equal(t, "gh-ost::mydb.mytable", name) + require.LessOrEqual(t, len(name), 64) + }) + + t.Run("long name is hashed and within MySQL limit", func(t *testing.T) { + longDB := strings.Repeat("d", 40) + longTable := strings.Repeat("t", 40) + name := buildMigrationLockName(longDB, longTable) + require.LessOrEqual(t, len(name), 64) + require.True(t, strings.HasPrefix(name, "gh-ost::")) + // deterministic + require.Equal(t, name, buildMigrationLockName(longDB, longTable)) + // distinct inputs produce distinct hashes + require.NotEqual(t, name, buildMigrationLockName(longDB, longTable+"x")) + }) +} + func (suite *ApplierTestSuite) TestCreateGhostTable() { ctx := context.Background() diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 226cf13a7..bec13e594 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1511,6 +1511,9 @@ func (mgtr *Migrator) initiateApplier() error { if err := mgtr.applier.InitDBConnections(); err != nil { return err } + if err := mgtr.applier.AcquireMigrationLock(mgtr.migrationContext.GetContext()); err != nil { + return err + } if mgtr.migrationContext.Revert { if err := mgtr.applier.CreateChangelogTable(); err != nil { mgtr.migrationContext.Log.Errorf("unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 95278fc3d..fc8feada1 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -675,6 +675,7 @@ func (suite *MigratorTestSuite) TestCopierIntPK() { migrator := NewMigrator(migrationContext, "0.0.0") suite.Require().NoError(migrator.initiateApplier()) + defer migrator.applier.Teardown() suite.Require().NoError(migrator.applier.prepareQueries()) suite.Require().NoError(migrator.applier.ReadMigrationRangeValues()) @@ -746,6 +747,7 @@ func (suite *MigratorTestSuite) TestCopierCompositePK() { migrator := NewMigrator(migrationContext, "0.0.0") suite.Require().NoError(migrator.initiateApplier()) + defer migrator.applier.Teardown() suite.Require().NoError(migrator.applier.prepareQueries()) suite.Require().NoError(migrator.applier.ReadMigrationRangeValues()) From 910a5ca4771c3cec9a4c3c6d70d9fd43266a3191 Mon Sep 17 00:00:00 2001 From: Jakub Pliszka Date: Thu, 28 May 2026 17:13:44 +0100 Subject: [PATCH 2/2] Fix lint --- go/logic/applier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index c159bbfb5..f3474b3ef 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -247,7 +247,7 @@ func (apl *Applier) keepMigrationLockAlive(ctx context.Context) { if ctx.Err() != nil { return } - _ = base.SendWithContext(apl.migrationContext.GetContext(), apl.migrationContext.PanicAbort, + _ = base.SendWithContext(ctx, apl.migrationContext.PanicAbort, fmt.Errorf("migration lock %s connection lost: %w", apl.migrationLockName, err)) return }