@@ -110,6 +110,27 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
110110 }
111111}
112112
113+ func (apl * Applier ) checkpointDB () * gosql.DB {
114+ if apl .migrationContext .IsMoveTablesMode () && apl .moveTablesTargetDB != nil {
115+ return apl .moveTablesTargetDB
116+ }
117+ return apl .db
118+ }
119+
120+ func (apl * Applier ) checkpointDatabaseName () string {
121+ if apl .migrationContext .IsMoveTablesMode () {
122+ return apl .migrationContext .GetTargetDatabaseName ()
123+ }
124+ return apl .migrationContext .DatabaseName
125+ }
126+
127+ func (apl * Applier ) checkpointDrainGTIDString (chk * Checkpoint ) string {
128+ if chk == nil || chk .DrainGTID == nil || chk .DrainGTID .IsEmpty () {
129+ return ""
130+ }
131+ return chk .DrainGTID .String ()
132+ }
133+
113134// compileMigrationKeyWarningRegex compiles a regex pattern that matches duplicate key warnings
114135// for the migration's unique key. Duplicate warnings are formatted differently across MySQL versions,
115136// hence the optional table name prefix. Metacharacters in table/index names are escaped to avoid
@@ -358,7 +379,7 @@ func (apl *Applier) prepareQueries() (err error) {
358379 }
359380 if apl .migrationContext .Checkpoint {
360381 if apl .checkpointInsertQueryBuilder , err = sql .NewCheckpointQueryBuilder (
361- apl .migrationContext . DatabaseName ,
382+ apl .checkpointDatabaseName () ,
362383 apl .migrationContext .GetCheckpointTableName (),
363384 & apl .migrationContext .UniqueKey .Columns ,
364385 ); err != nil {
@@ -751,6 +772,8 @@ func (apl *Applier) CreateCheckpointTable() error {
751772 "`gh_ost_rows_copied` bigint" ,
752773 "`gh_ost_dml_applied` bigint" ,
753774 "`gh_ost_is_cutover` tinyint(1) DEFAULT '0'" ,
775+ "`gh_ost_cutover_started` tinyint(1) DEFAULT '0'" ,
776+ "`gh_ost_drain_gtid` text charset ascii" ,
754777 }
755778 for _ , col := range apl .migrationContext .UniqueKey .Columns .Columns () {
756779 if col .MySQLType == "" {
@@ -768,12 +791,12 @@ func (apl *Applier) CreateCheckpointTable() error {
768791 }
769792
770793 query := fmt .Sprintf ("create /* gh-ost */ table %s.%s (\n %s\n )" ,
771- sql .EscapeName (apl .migrationContext . DatabaseName ),
794+ sql .EscapeName (apl .checkpointDatabaseName () ),
772795 sql .EscapeName (apl .migrationContext .GetCheckpointTableName ()),
773796 strings .Join (colDefs , ",\n " ),
774797 )
775798 apl .migrationContext .Log .Infof ("Created checkpoint table" )
776- if _ , err := sqlutils .ExecNoPrepare (apl .db , query ); err != nil {
799+ if _ , err := sqlutils .ExecNoPrepare (apl .checkpointDB () , query ); err != nil {
777800 return err
778801 }
779802 return nil
@@ -782,14 +805,14 @@ func (apl *Applier) CreateCheckpointTable() error {
782805// dropTable drops a given table on the applied host
783806func (apl * Applier ) dropTable (tableName string ) error {
784807 query := fmt .Sprintf (`drop /* gh-ost */ table if exists %s.%s` ,
785- sql .EscapeName (apl .migrationContext . DatabaseName ),
808+ sql .EscapeName (apl .checkpointDatabaseName () ),
786809 sql .EscapeName (tableName ),
787810 )
788811 apl .migrationContext .Log .Infof ("Dropping table %s.%s" ,
789- sql .EscapeName (apl .migrationContext . DatabaseName ),
812+ sql .EscapeName (apl .checkpointDatabaseName () ),
790813 sql .EscapeName (tableName ),
791814 )
792- if _ , err := sqlutils .ExecNoPrepare (apl .db , query ); err != nil {
815+ if _ , err := sqlutils .ExecNoPrepare (apl .checkpointDB () , query ); err != nil {
793816 return err
794817 }
795818 apl .migrationContext .Log .Infof ("Table dropped" )
@@ -953,25 +976,26 @@ func (apl *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) {
953976 if err != nil {
954977 return insertId , err
955978 }
956- args := sqlutils .Args (chk .LastTrxCoords .String (), chk .Iteration , chk .RowsCopied , chk .DMLApplied , chk .IsCutover )
979+ args := sqlutils .Args (chk .LastTrxCoords .String (), chk .Iteration , chk .RowsCopied , chk .DMLApplied , chk .IsCutover , chk . MoveTablesCutOverStarted , apl . checkpointDrainGTIDString ( chk ) )
957980 args = append (args , uniqueKeyArgs ... )
958- res , err := apl .db .Exec (query , args ... )
981+ res , err := apl .checkpointDB () .Exec (query , args ... )
959982 if err != nil {
960983 return insertId , err
961984 }
962985 return res .LastInsertId ()
963986}
964987
965988func (apl * Applier ) ReadLastCheckpoint () (* Checkpoint , error ) {
966- row := apl .db .QueryRow (fmt .Sprintf (`select /* gh-ost */ * from %s.%s order by gh_ost_chk_id desc limit 1` , sql .EscapeName (apl .migrationContext . DatabaseName ), sql .EscapeName (apl .migrationContext .GetCheckpointTableName ())))
989+ row := apl .checkpointDB () .QueryRow (fmt .Sprintf (`select /* gh-ost */ * from %s.%s order by gh_ost_chk_id desc limit 1` , sql .EscapeName (apl .checkpointDatabaseName () ), sql .EscapeName (apl .migrationContext .GetCheckpointTableName ())))
967990 chk := & Checkpoint {
968991 IterationRangeMin : sql .NewColumnValues (apl .migrationContext .UniqueKey .Columns .Len ()),
969992 IterationRangeMax : sql .NewColumnValues (apl .migrationContext .UniqueKey .Columns .Len ()),
970993 }
971994
972995 var coordStr string
996+ var drainGTIDStr string
973997 var timestamp int64
974- ptrs := []interface {}{& chk .Id , & timestamp , & coordStr , & chk .Iteration , & chk .RowsCopied , & chk .DMLApplied , & chk .IsCutover }
998+ ptrs := []interface {}{& chk .Id , & timestamp , & coordStr , & chk .Iteration , & chk .RowsCopied , & chk .DMLApplied , & chk .IsCutover , & chk . MoveTablesCutOverStarted , & drainGTIDStr }
975999 ptrs = append (ptrs , chk .IterationRangeMin .ValuesPointers ... )
9761000 ptrs = append (ptrs , chk .IterationRangeMax .ValuesPointers ... )
9771001 err := row .Scan (ptrs ... )
@@ -988,6 +1012,13 @@ func (apl *Applier) ReadLastCheckpoint() (*Checkpoint, error) {
9881012 return nil , err
9891013 }
9901014 chk .LastTrxCoords = gtidCoords
1015+ if drainGTIDStr != "" {
1016+ drainGTID , err := mysql .NewGTIDBinlogCoordinates (drainGTIDStr )
1017+ if err != nil {
1018+ return nil , err
1019+ }
1020+ chk .DrainGTID = drainGTID
1021+ }
9911022 } else {
9921023 fileCoords , err := mysql .ParseFileBinlogCoordinates (coordStr )
9931024 if err != nil {
@@ -998,6 +1029,36 @@ func (apl *Applier) ReadLastCheckpoint() (*Checkpoint, error) {
9981029 return chk , nil
9991030}
10001031
1032+ func (apl * Applier ) ReadMoveTablesCutOverCheckpoint () (* Checkpoint , error ) {
1033+ row := apl .checkpointDB ().QueryRow (fmt .Sprintf (`select /* gh-ost */ gh_ost_chk_id, gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, gh_ost_cutover_started, gh_ost_drain_gtid from %s.%s order by gh_ost_chk_id desc limit 1` , sql .EscapeName (apl .checkpointDatabaseName ()), sql .EscapeName (apl .migrationContext .GetCheckpointTableName ())))
1034+ chk := & Checkpoint {}
1035+ var coordStr , drainGTIDStr string
1036+ var timestamp int64
1037+ err := row .Scan (& chk .Id , & timestamp , & coordStr , & chk .Iteration , & chk .RowsCopied , & chk .DMLApplied , & chk .IsCutover , & chk .MoveTablesCutOverStarted , & drainGTIDStr )
1038+ if err != nil {
1039+ if errors .Is (err , gosql .ErrNoRows ) {
1040+ return nil , ErrNoCheckpointFound
1041+ }
1042+ return nil , err
1043+ }
1044+ chk .Timestamp = time .Unix (timestamp , 0 )
1045+ if coordStr != "" {
1046+ coords , err := mysql .NewGTIDBinlogCoordinates (coordStr )
1047+ if err != nil {
1048+ return nil , err
1049+ }
1050+ chk .LastTrxCoords = coords
1051+ }
1052+ if drainGTIDStr != "" {
1053+ drainGTID , err := mysql .NewGTIDBinlogCoordinates (drainGTIDStr )
1054+ if err != nil {
1055+ return nil , err
1056+ }
1057+ chk .DrainGTID = drainGTID
1058+ }
1059+ return chk , nil
1060+ }
1061+
10011062// InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
10021063// Apl is done asynchronously
10031064func (apl * Applier ) InitiateHeartbeat () {
0 commit comments