From 326fdc5a3285511a601c410461e2ba95a45b3e7c Mon Sep 17 00:00:00 2001 From: Jan Grodowski Date: Wed, 20 May 2026 10:42:34 +0200 Subject: [PATCH 1/2] Fix resume data loss: route heartbeat coords through applyEventsQueue onChangelogHeartbeatEvent was mutating applier.CurrentCoordinates directly from the streamer goroutine, before any DML that preceded the heartbeat was applied to the ghost table. The checkpoint loop reads CurrentCoordinates as "applied through this GTID" and could persist a checkpoint whose LastTrxCoords was ahead of what was actually applied. If gh-ost crashed before applyEventsQueue drained, --resume read that checkpoint and called StartSyncGTID with the persisted set; MySQL treated the un-applied GTIDs as already-seen and never re-streamed them. The ghost table silently lost those DMLs and cut-over produced a stale table. Fix: enqueue a tableWriteFunc onto applyEventsQueue that performs the coords bump. The apply goroutine executes it in order, after the DMLs the streamer enqueued before the heartbeat, restoring the invariant that CurrentCoordinates never runs ahead of what has actually been applied to the ghost table. Adds TestMigratorHeartbeatDoesNotAdvancePastUnappliedDML, which fails at the previous HEAD and passes after the fix; also asserts queue ordering to guard against future changes that wrap the heartbeat enqueue in a goroutine. 
Co-authored-by: Bastian Bartmann --- go/logic/migrator.go | 13 +++++-- go/logic/migrator_test.go | 75 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+), 3 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 90fa8c509..67f35c288 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -336,13 +336,20 @@ func (mgtr *Migrator) onChangelogHeartbeatEvent(dmlEntry *binlog.BinlogEntry) (e heartbeatTime, err := time.Parse(time.RFC3339Nano, changelogHeartbeatString) if err != nil { return mgtr.migrationContext.Log.Errore(err) - } else { - mgtr.migrationContext.SetLastHeartbeatOnChangelogTime(heartbeatTime) + } + mgtr.migrationContext.SetLastHeartbeatOnChangelogTime(heartbeatTime) + + // Route the coords bump through applyEventsQueue so it is ordered after + // any DMLs the streamer enqueued before this heartbeat. + coords := dmlEntry.Coordinates + var writeFunc tableWriteFunc = func() error { mgtr.applier.CurrentCoordinatesMutex.Lock() - mgtr.applier.CurrentCoordinates = dmlEntry.Coordinates + mgtr.applier.CurrentCoordinates = coords mgtr.applier.CurrentCoordinatesMutex.Unlock() return nil } + mgtr.applyEventsQueue <- newApplyEventStructByFunc(&writeFunc) + return nil } // abort stores the error, cancels the context, and logs the abort. diff --git a/go/logic/migrator_test.go b/go/logic/migrator_test.go index 8fc48e326..95278fc3d 100644 --- a/go/logic/migrator_test.go +++ b/go/logic/migrator_test.go @@ -194,6 +194,81 @@ func TestMigratorOnChangelogEvent(t *testing.T) { }) } +// Regression: heartbeats must not advance applier.CurrentCoordinates past +// DMLs still sitting in applyEventsQueue. If they do, checkpointLoop will +// persist a GTID set that includes un-applied transactions, and resume via +// StartSyncGTID will skip them (the server treats them as already-seen). 
+func TestMigratorHeartbeatDoesNotAdvancePastUnappliedDML(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrationContext.UseGTIDs = true + migrator := NewMigrator(migrationContext, "test") + migrator.applier = NewApplier(migrationContext) + + const srcUUID = "00000000-0000-0000-0000-000000000001" + + // A DML on the original table at GTID :100 is observed and enqueued, but + // not yet applied. + dmlCoords, err := mysql.NewGTIDBinlogCoordinates(srcUUID + ":1-100") + require.NoError(t, err) + migrator.applyEventsQueue <- newApplyEventStructByDML(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{ + DatabaseName: "test", + TableName: migrationContext.OriginalTableName, + DML: binlog.UpdateDML, + }, + Coordinates: dmlCoords, + }) + require.Equal(t, 1, len(migrator.applyEventsQueue), + "DML must be sitting un-applied in the queue") + + // A heartbeat row is then written; its GTID set includes the un-applied + // DML plus a few additional transactions. + heartbeatCoords, err := mysql.NewGTIDBinlogCoordinates(srcUUID + ":1-105") + require.NoError(t, err) + heartbeatColumnValues := sql.ToColumnValues([]interface{}{ + 123, + time.Now().Unix(), + "heartbeat", + time.Now().Format(time.RFC3339Nano), + }) + require.NoError(t, migrator.onChangelogHeartbeatEvent(&binlog.BinlogEntry{ + DmlEvent: &binlog.BinlogDMLEvent{ + DatabaseName: "test", + DML: binlog.InsertDML, + NewColumnValues: heartbeatColumnValues, + }, + Coordinates: heartbeatCoords, + })) + + // The DML is still un-applied; the heartbeat's coords-bump sentinel has + // been enqueued behind it. + require.Equal(t, 2, len(migrator.applyEventsQueue), + "queue must hold the un-applied DML and the heartbeat sentinel; "+ + "this test does not drain the queue") + + // Invariant: CurrentCoordinates must NOT have advanced past the queued DML. 
+ currentCoords := migrator.applier.CurrentCoordinates + require.False(t, currentCoords != nil && dmlCoords.SmallerThanOrEquals(currentCoords), + "CurrentCoordinates must not cover the un-applied DML at %s (got %v)", + dmlCoords.DisplayString(), currentCoords) + + // Consequence: the checkpoint gate in Migrator.Checkpoint must NOT fire + // for streamer coords that include the un-applied DML. + require.False(t, currentCoords != nil && heartbeatCoords.SmallerThanOrEquals(currentCoords), + "checkpoint gate must not fire while DML at %s is un-applied", + dmlCoords.DisplayString()) + + // Ordering: the DML must come first, then the heartbeat sentinel. If a + // future change ever wraps the heartbeat enqueue in `go func()`, this + // invariant breaks and the bug returns. + firstQueued := <-migrator.applyEventsQueue + secondQueued := <-migrator.applyEventsQueue + require.NotNil(t, firstQueued.dmlEvent, "first queued event must be the DML") + require.Nil(t, firstQueued.writeFunc, "first queued event must not be a sentinel") + require.Nil(t, secondQueued.dmlEvent, "second queued event must not be a DML") + require.NotNil(t, secondQueued.writeFunc, "second queued event must be the heartbeat sentinel") +} + func TestMigratorValidateStatement(t *testing.T) { t.Run("add-column", func(t *testing.T) { migrationContext := base.NewMigrationContext() From ce6455017ec13e6c313f10027c5dced7afa34c1b Mon Sep 17 00:00:00 2001 From: Jan Grodowski Date: Thu, 21 May 2026 10:44:25 +0200 Subject: [PATCH 2/2] Replace direct channel write with SendWithContext Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- go/logic/migrator.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 67f35c288..226cf13a7 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -348,7 +348,13 @@ func (mgtr *Migrator) onChangelogHeartbeatEvent(dmlEntry *binlog.BinlogEntry) (e 
mgtr.applier.CurrentCoordinatesMutex.Unlock() return nil } - mgtr.applyEventsQueue <- newApplyEventStructByFunc(&writeFunc) + if err := base.SendWithContext( + mgtr.migrationContext.GetContext(), + mgtr.applyEventsQueue, + newApplyEventStructByFunc(&writeFunc), + ); err != nil { + return mgtr.migrationContext.Log.Errore(err) + } return nil }