Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
package logic

import (
"crypto/sha1"
gosql "database/sql"
"encoding/hex"
"fmt"
"reflect"
"regexp"
Expand Down Expand Up @@ -83,6 +85,12 @@ type Applier struct {
dmlInsertQueryBuilder *sql.DMLInsertQueryBuilder
dmlUpdateQueryBuilder *sql.DMLUpdateQueryBuilder
checkpointInsertQueryBuilder *sql.CheckpointInsertQueryBuilder

migrationLockConn *gosql.Conn
migrationLockDB *gosql.DB
migrationLockName string
migrationLockStop chan struct{}
migrationLockDone chan struct{}
}

func NewApplier(migrationContext *base.MigrationContext) *Applier {
Expand Down Expand Up @@ -145,6 +153,149 @@ func (apl *Applier) InitDBConnections() (err error) {
return nil
}

// buildMigrationLockName returns a deterministic MySQL user-level lock name
// for the given database and table, hashed if longer than MySQL's 64-char limit.
func buildMigrationLockName(db, table string) string {
name := fmt.Sprintf("gh-ost::%s.%s", db, table)
if len(name) <= 64 {
return name
}
sum := sha1.Sum([]byte(name))
return "gh-ost::" + hex.EncodeToString(sum[:])
}

// AcquireMigrationLock takes a user-level lock on a pinned connection,
// preventing two gh-ost processes from migrating the same table concurrently
// on the same MySQL server.
func (apl *Applier) AcquireMigrationLock(ctx context.Context) error {
lockName := buildMigrationLockName(apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName)

// Use a dedicated *sql.DB so the pinned connection does not consume a
// slot in apl.db's small pool (mysql.MaxDBPoolConnections).
lockURI := apl.connectionConfig.GetDBUri(apl.migrationContext.DatabaseName)
lockDB, err := gosql.Open("mysql", lockURI)
if err != nil {
return fmt.Errorf("failed to open migration lock DB: %w", err)
}
lockDB.SetMaxOpenConns(1)
lockDB.SetMaxIdleConns(1)

conn, err := lockDB.Conn(ctx)
if err != nil {
lockDB.Close()
return fmt.Errorf("failed to obtain pinned connection for migration lock: %w", err)
}

var lockResult gosql.NullInt64
if err := conn.QueryRowContext(ctx, `select /* gh-ost */ get_lock(?, 0)`, lockName).Scan(&lockResult); err != nil {
conn.Close()
lockDB.Close()
return fmt.Errorf("failed to execute GET_LOCK for migration lock %s: %w", lockName, err)
}

if !lockResult.Valid {
conn.Close()
lockDB.Close()
return fmt.Errorf("GET_LOCK returned NULL while acquiring migration lock %s", lockName)
}

if lockResult.Int64 != 1 {
var holderID gosql.NullInt64
_ = conn.QueryRowContext(ctx, `select /* gh-ost */ is_used_lock(?)`, lockName).Scan(&holderID)
conn.Close()
lockDB.Close()
if holderID.Valid {
return fmt.Errorf("another gh-ost process is already migrating `%s`.`%s`: migration lock %s held by connection id %d",
apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, lockName, holderID.Int64)
}
return fmt.Errorf("another gh-ost process is already migrating `%s`.`%s`: migration lock %s is held",
apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, lockName)
}

apl.migrationLockConn = conn
apl.migrationLockDB = lockDB
apl.migrationLockName = lockName
apl.migrationLockStop = make(chan struct{})
apl.migrationLockDone = make(chan struct{})
go apl.keepMigrationLockAlive(ctx)
apl.migrationContext.Log.Infof("Acquired migration lock %s", lockName)
return nil
}

// keepMigrationLockAlive pings the pinned migration-lock connection. If the
// ping fails the lock is considered lost and the migration is aborted via
// PanicAbort.
func (apl *Applier) keepMigrationLockAlive(ctx context.Context) {
defer close(apl.migrationLockDone)
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-apl.migrationLockStop:
return
case <-ticker.C:
}
if err := apl.pingMigrationLockConn(ctx); err != nil {
// Shutdown may have started mid-ping; don't abort if so.
select {
case <-apl.migrationLockStop:
return
default:
}
if ctx.Err() != nil {
return
}
_ = base.SendWithContext(ctx, apl.migrationContext.PanicAbort,
fmt.Errorf("migration lock %s connection lost: %w", apl.migrationLockName, err))
return
}
}
}

// pingMigrationLockConn pings the pinned connection with a bounded timeout
// and propagates migrationLockStop as an early cancel so a teardown can
// interrupt a stuck ping.
func (apl *Applier) pingMigrationLockConn(parent context.Context) error {
pingCtx, cancel := context.WithTimeout(parent, 10*time.Second)
defer cancel()
done := make(chan struct{})
defer close(done)
go func() {
select {
case <-apl.migrationLockStop:
cancel()
case <-done:
}
}()
return apl.migrationLockConn.PingContext(pingCtx)
}

// releaseMigrationLock stops the keepalive goroutine, releases the user-level
// lock and closes the dedicated lock DB. Safe to call when no lock is held.
func (apl *Applier) releaseMigrationLock() {
if apl.migrationLockConn == nil {
return
}
// Stop keepalive before touching the pinned connection.
close(apl.migrationLockStop)
<-apl.migrationLockDone
releaseCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if _, err := apl.migrationLockConn.ExecContext(releaseCtx, `select /* gh-ost */ release_lock(?)`, apl.migrationLockName); err != nil {
apl.migrationContext.Log.Warningf("failed to release migration lock %s: %v", apl.migrationLockName, err)
}
if err := apl.migrationLockConn.Close(); err != nil {
apl.migrationContext.Log.Warningf("failed to close migration lock connection: %v", err)
}
if apl.migrationLockDB != nil {
apl.migrationLockDB.Close()
apl.migrationLockDB = nil
}
apl.migrationLockConn = nil
}

func (apl *Applier) prepareQueries() (err error) {
if apl.dmlDeleteQueryBuilder, err = sql.NewDMLDeleteQueryBuilder(
apl.migrationContext.DatabaseName,
Expand Down Expand Up @@ -1734,6 +1885,7 @@ func (apl *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) e

func (apl *Applier) Teardown() {
apl.migrationContext.Log.Debugf("Tearing down...")
apl.releaseMigrationLock()
apl.db.Close()
apl.singletonDB.Close()
atomic.StoreInt64(&apl.finishedMigrating, 1)
Expand Down
77 changes: 77 additions & 0 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,83 @@ func (suite *ApplierTestSuite) TestValidateOrDropExistingTablesWithGhostTableExi
suite.Require().Equal(gosql.ErrNoRows, err)
}

func (suite *ApplierTestSuite) TestAcquireMigrationLockSucceedsWhenFree() {
ctx := context.Background()

_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName()))
suite.Require().NoError(err)

connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

migrationContext := newTestMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.SetConnectionConfig("innodb")

applier := NewApplier(migrationContext)
defer applier.Teardown()

suite.Require().NoError(applier.InitDBConnections())
suite.Require().NoError(applier.AcquireMigrationLock(ctx))
suite.Require().NotNil(applier.migrationLockConn)
suite.Require().Equal(buildMigrationLockName(testMysqlDatabase, testMysqlTableName), applier.migrationLockName)
}

func (suite *ApplierTestSuite) TestAcquireMigrationLockFailsWhenHeld() {
ctx := context.Background()

_, err := suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id INT, item_id INT);", getTestTableName()))
suite.Require().NoError(err)

connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

migrationContextA := newTestMigrationContext()
migrationContextA.ApplierConnectionConfig = connectionConfig
migrationContextA.SetConnectionConfig("innodb")

applierA := NewApplier(migrationContextA)
defer applierA.Teardown()
suite.Require().NoError(applierA.InitDBConnections())
suite.Require().NoError(applierA.AcquireMigrationLock(ctx))

connectionConfigB, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

migrationContextB := newTestMigrationContext()
migrationContextB.ApplierConnectionConfig = connectionConfigB
migrationContextB.SetConnectionConfig("innodb")

applierB := NewApplier(migrationContextB)
defer applierB.Teardown()
suite.Require().NoError(applierB.InitDBConnections())

err = applierB.AcquireMigrationLock(ctx)
suite.Require().Error(err)
suite.Require().Contains(err.Error(), "already migrating")
suite.Require().Nil(applierB.migrationLockConn)
}

func TestBuildMigrationLockName(t *testing.T) {
t.Run("short name is returned verbatim", func(t *testing.T) {
name := buildMigrationLockName("mydb", "mytable")
require.Equal(t, "gh-ost::mydb.mytable", name)
require.LessOrEqual(t, len(name), 64)
})

t.Run("long name is hashed and within MySQL limit", func(t *testing.T) {
longDB := strings.Repeat("d", 40)
longTable := strings.Repeat("t", 40)
name := buildMigrationLockName(longDB, longTable)
require.LessOrEqual(t, len(name), 64)
require.True(t, strings.HasPrefix(name, "gh-ost::"))
// deterministic
require.Equal(t, name, buildMigrationLockName(longDB, longTable))
// distinct inputs produce distinct hashes
require.NotEqual(t, name, buildMigrationLockName(longDB, longTable+"x"))
})
}

func (suite *ApplierTestSuite) TestCreateGhostTable() {
ctx := context.Background()

Expand Down
3 changes: 3 additions & 0 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1511,6 +1511,9 @@ func (mgtr *Migrator) initiateApplier() error {
if err := mgtr.applier.InitDBConnections(); err != nil {
return err
}
if err := mgtr.applier.AcquireMigrationLock(mgtr.migrationContext.GetContext()); err != nil {
return err
}
if mgtr.migrationContext.Revert {
if err := mgtr.applier.CreateChangelogTable(); err != nil {
mgtr.migrationContext.Log.Errorf("unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out")
Expand Down
2 changes: 2 additions & 0 deletions go/logic/migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,7 @@ func (suite *MigratorTestSuite) TestCopierIntPK() {

migrator := NewMigrator(migrationContext, "0.0.0")
suite.Require().NoError(migrator.initiateApplier())
defer migrator.applier.Teardown()
suite.Require().NoError(migrator.applier.prepareQueries())
suite.Require().NoError(migrator.applier.ReadMigrationRangeValues())

Expand Down Expand Up @@ -746,6 +747,7 @@ func (suite *MigratorTestSuite) TestCopierCompositePK() {

migrator := NewMigrator(migrationContext, "0.0.0")
suite.Require().NoError(migrator.initiateApplier())
defer migrator.applier.Teardown()
suite.Require().NoError(migrator.applier.prepareQueries())
suite.Require().NoError(migrator.applier.ReadMigrationRangeValues())

Expand Down
Loading