@@ -20,8 +20,6 @@ import (
2020 "github.com/github/gh-ost/go/binlog"
2121 "github.com/github/gh-ost/go/mysql"
2222 "github.com/github/gh-ost/go/sql"
23-
24- "github.com/openark/golib/sqlutils"
2523)
2624
2725var (
3028 RetrySleepFn = time .Sleep
3129 checkpointTimeout = 2 * time .Second
3230
33- // moveTablesCutOverDrainTimeout caps how long T3 of the move-tables
34- // cooperative cutover (#8209) waits for the applier to catch up to the
35- // drain GTID captured at T2. 60s is a starting default chosen over
36- // CutOverLockTimeoutSeconds (which is capped at 1-10s by
37- // SetCutOverLockTimeoutSeconds and would be too short for a real drain
38- // under load). Up for review with Daniel/Eric in the PR.
39- moveTablesCutOverDrainTimeout = 60 * time .Second
4031 // moveTablesCutOverDrainPollInterval is the per-iteration sleep in T3's
4132 // drain poll. 100ms per move_table_mode.md §1.5.
4233 moveTablesCutOverDrainPollInterval = 100 * time .Millisecond
@@ -851,6 +842,15 @@ func (mgtr *Migrator) MoveTables() (err error) {
851842 if err := mgtr .initiateApplier (); err != nil {
852843 return err
853844 }
845+ if err := mgtr .checkAbort (); err != nil {
846+ return err
847+ }
848+ if err := mgtr .createFlagFiles (); err != nil {
849+ return err
850+ }
851+ if err := mgtr .checkAbort (); err != nil {
852+ return err
853+ }
854854 if err := mgtr .initiateStreaming (); err != nil {
855855 return err
856856 }
@@ -985,29 +985,38 @@ func (mgtr *Migrator) moveTablesCutOver() (err error) {
985985 return fmt .Errorf ("on-before-cut-over hook failed: %w" , err )
986986 }
987987
988- // ----- T1: RENAME source table -----
989- // Issued on mgtr.inspector.db (the privileged source connection per
990- // move_table_mode.md §1.5). No retry: RENAME is not idempotent — a partial
991- // success leaves the table already renamed and a retry would fail. The
992- // operator re-runs the whole hook chain on failure.
988+ // ----- T1 + T2: RENAME then capture @@gtid_executed on the same connection -----
989+ // Pin both operations to a single *sql.Conn so MySQL's within-session
990+ // ordering guarantee makes it impossible for T2 to observe a state that
991+ // pre-dates T1's commit. Using mgtr.inspector.db directly would let the
992+ // pool schedule T1 and T2 on different underlying TCP connections (or, with
993+ // a proxy, different servers), breaking the happens-before relationship.
994+ //
995+ // No retry on the RENAME: it is not idempotent — a partial success leaves
996+ // the table already renamed and a retry would fail. The operator re-runs
997+ // the whole hook chain on failure.
998+ pinnedConn , err := mgtr .inspector .db .Conn (context .Background ())
999+ if err != nil {
1000+ return fmt .Errorf ("failed to pin connection for T1/T2: %w" , err )
1001+ }
1002+ defer pinnedConn .Close ()
1003+
9931004 sourceDB := mgtr .migrationContext .DatabaseName
9941005 sourceTable := mgtr .migrationContext .OriginalTableName
9951006 delTable := mgtr .migrationContext .GetOldTableName ()
9961007 renameQuery := fmt .Sprintf ("RENAME TABLE %s.%s TO %s.%s" ,
9971008 sql .EscapeName (sourceDB ), sql .EscapeName (sourceTable ),
9981009 sql .EscapeName (sourceDB ), sql .EscapeName (delTable ))
9991010 mgtr .migrationContext .Log .Infof ("T1: renaming source table: %s" , renameQuery )
1000- if _ , err := sqlutils . ExecNoPrepare ( mgtr . inspector . db , renameQuery ); err != nil {
1011+ if _ , err := pinnedConn . ExecContext ( context . Background () , renameQuery ); err != nil {
10011012 return fmt .Errorf ("RENAME failed: %w" , err )
10021013 }
10031014
1004- // ----- T2: capture @@gtid_executed on the SAME *sql.DB handle as T1 -----
1005- // The design doc specifies @@gtid_executed. We query @@GLOBAL.gtid_executed explicitly rather than the unqualified
1006- // @@gtid_executed form to make the global server-wide scope unambiguous
1007- // in the SQL itself.
1015+ // ----- T2: capture @@gtid_executed on the SAME connection as T1 -----
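1016+ // NOTE(review): the query below reads the unqualified @@gtid_executed, not @@GLOBAL.gtid_executed as before; on servers that still expose a session-scoped gtid_executed the unqualified form may not return the server-wide set — confirm the intended scope.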
10081017 // Design: https://github.com/github/gh-ost-tablemove-poc/blob/9dc6df75c4c88ff473906a497836c7518f5614ec/design/coop_cutover.md#32-correctness-verification-for-p4
10091018 var drainGTIDStr string
1010- if err := mgtr . inspector . db . QueryRow ( "select @@global. gtid_executed" ).Scan (& drainGTIDStr ); err != nil {
1019+ if err := pinnedConn . QueryRowContext ( context . Background (), "select @@gtid_executed" ).Scan (& drainGTIDStr ); err != nil {
10111020 return fmt .Errorf ("drain GTID capture failed: %w" , err )
10121021 }
10131022 drainGTID , err := mysql .NewGTIDBinlogCoordinates (drainGTIDStr )
@@ -1022,15 +1031,10 @@ func (mgtr *Migrator) moveTablesCutOver() (err error) {
10221031 // drain target (i.e. the applier contains every GTID in drainGTID). Reads
10231032 // of CurrentCoordinates hold the mutex per applier.go:75. Per-iteration
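10241033 // logging is Debug only to avoid spamming Info on a hot loop. NOTE(review): the drain timeout below is derived from CutOverLockTimeoutSeconds, which an earlier revision of this comment described as capped at 1-10s — confirm that is long enough for a real drain under load.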
1025- //
1026- // Timeout is a named constant (moveTablesCutOverDrainTimeout = 60s). NOTE:
1027- // the internalization doc suggested reusing CutOverLockTimeoutSeconds, but
1028- // that's capped at 1-10s (context.go:SetCutOverLockTimeoutSeconds) — too
1029- // short for a real drain under load. 60s default is up for review with
1030- // Daniel/Eric; flagged in the PR description.
1034+ drainTimeout := time .Duration (mgtr .migrationContext .CutOverLockTimeoutSeconds ) * time .Second
10311035 mgtr .migrationContext .Log .Infof ("T3: draining applier to drain GTID (timeout %s, poll %s)" ,
1032- moveTablesCutOverDrainTimeout , moveTablesCutOverDrainPollInterval )
1033- drainCtx , cancel := context .WithTimeout (context .Background (), moveTablesCutOverDrainTimeout )
1036+ drainTimeout , moveTablesCutOverDrainPollInterval )
1037+ drainCtx , cancel := context .WithTimeout (context .Background (), drainTimeout )
10341038 defer cancel ()
10351039 ticker := time .NewTicker (moveTablesCutOverDrainPollInterval )
10361040 defer ticker .Stop ()
@@ -1041,14 +1045,23 @@ func (mgtr *Migrator) moveTablesCutOver() (err error) {
10411045 mgtr .applier .CurrentCoordinatesMutex .Lock ()
10421046 applierCoords := mgtr .applier .CurrentCoordinates
10431047 mgtr .applier .CurrentCoordinatesMutex .Unlock ()
1044- if applierCoords != nil && ! applierCoords .IsEmpty () && ! applierCoords .SmallerThan (drainGTID ) {
1048+ applyBacklog := len (mgtr .applyEventsQueue )
1049+ streamerBacklog := 0
1050+ if mgtr .eventsStreamer != nil {
1051+ streamerBacklog = len (mgtr .eventsStreamer .eventsChannel )
1052+ }
1053+ if applierCoords != nil && ! applierCoords .IsEmpty () && ! applierCoords .SmallerThan (drainGTID ) && applyBacklog == 0 && streamerBacklog == 0 {
10451054 mgtr .migrationContext .Log .Infof ("T3: drain complete; applier caught up to drain GTID" )
10461055 break
10471056 }
1048- mgtr .migrationContext .Log .Debugf ("T3: applier still behind drain GTID, polling" )
1057+ if applierCoords != nil && ! applierCoords .IsEmpty () && ! applierCoords .SmallerThan (drainGTID ) {
1058+ mgtr .migrationContext .Log .Debugf ("T3: drain GTID reached but backlog remains (apply=%d, streamer=%d)" , applyBacklog , streamerBacklog )
1059+ } else {
1060+ mgtr .migrationContext .Log .Debugf ("T3: applier still behind drain GTID, polling" )
1061+ }
10491062 select {
10501063 case <- drainCtx .Done ():
1051- return fmt .Errorf ("drain poll timed out after %s: applier did not catch up to drain GTID" , moveTablesCutOverDrainTimeout )
1064+ return fmt .Errorf ("drain poll timed out after %s: applier did not catch up to drain GTID" , drainTimeout )
10521065 case <- ticker .C :
10531066 // next iteration
10541067 }
0 commit comments