Skip to content
3 changes: 3 additions & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,9 @@ func main() {
if migrationContext.PostponeCutOverFlagFile == "" {
log.Fatal("--postpone-cut-over-flag-file must be specified when using --move-tables")
}
if !migrationContext.Checkpoint {
log.Fatal("--checkpoint must be specified when using --move-tables")
}
migrationContext.MoveTables.TableNames = strings.Split(*moveTables, ",")
for i := range migrationContext.MoveTables.TableNames {
migrationContext.MoveTables.TableNames[i] = strings.TrimSpace(migrationContext.MoveTables.TableNames[i])
Expand Down
118 changes: 109 additions & 9 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,35 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
}
}

func (apl *Applier) checkpointDB() *gosql.DB {
if apl.migrationContext.IsMoveTablesMode() && apl.moveTablesTargetDB != nil {
return apl.moveTablesTargetDB
}
return apl.db
}

func (apl *Applier) checkpointDatabaseName() string {
if apl.migrationContext.IsMoveTablesMode() {
return apl.migrationContext.GetTargetDatabaseName()
}
return apl.migrationContext.DatabaseName
}

func (apl *Applier) checkpointDrainGTIDString(chk *Checkpoint) string {
if chk == nil || chk.MoveTablesCutOverDrainGTID == nil || chk.MoveTablesCutOverDrainGTID.IsEmpty() {
return ""
}
return chk.MoveTablesCutOverDrainGTID.String()
}

func (apl *Applier) checkpointRangeColumnNames() (minColumnNames []string, maxColumnNames []string) {
for _, col := range apl.migrationContext.UniqueKey.Columns.Columns() {
minColumnNames = append(minColumnNames, sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4)+"_min")
maxColumnNames = append(maxColumnNames, sql.TruncateColumnName(col.Name, sql.MaxColumnNameLength-4)+"_max")
}
return minColumnNames, maxColumnNames
}

// compileMigrationKeyWarningRegex compiles a regex pattern that matches duplicate key warnings
// for the migration's unique key. Duplicate warnings are formatted differently across MySQL versions,
// hence the optional table name prefix. Metacharacters in table/index names are escaped to avoid
Expand Down Expand Up @@ -358,9 +387,10 @@ func (apl *Applier) prepareQueries() (err error) {
}
if apl.migrationContext.Checkpoint {
if apl.checkpointInsertQueryBuilder, err = sql.NewCheckpointQueryBuilder(
apl.migrationContext.DatabaseName,
apl.checkpointDatabaseName(),
apl.migrationContext.GetCheckpointTableName(),
&apl.migrationContext.UniqueKey.Columns,
apl.migrationContext.IsMoveTablesMode(),
); err != nil {
return err
}
Expand Down Expand Up @@ -752,6 +782,12 @@ func (apl *Applier) CreateCheckpointTable() error {
"`gh_ost_dml_applied` bigint",
"`gh_ost_is_cutover` tinyint(1) DEFAULT '0'",
}
if apl.migrationContext.IsMoveTablesMode() {
colDefs = append(colDefs,
"`gh_ost_move_tables_cutover_started` tinyint(1) DEFAULT '0'",
"`gh_ost_move_tables_drain_gtid` text charset ascii",
)
}
for _, col := range apl.migrationContext.UniqueKey.Columns.Columns() {
if col.MySQLType == "" {
return fmt.Errorf("column %s has no type information. applyColumnTypes must be called", sql.EscapeName(col.Name))
Expand All @@ -768,12 +804,12 @@ func (apl *Applier) CreateCheckpointTable() error {
}

query := fmt.Sprintf("create /* gh-ost */ table %s.%s (\n %s\n)",
sql.EscapeName(apl.migrationContext.DatabaseName),
sql.EscapeName(apl.checkpointDatabaseName()),
sql.EscapeName(apl.migrationContext.GetCheckpointTableName()),
strings.Join(colDefs, ",\n "),
)
apl.migrationContext.Log.Infof("Created checkpoint table")
if _, err := sqlutils.ExecNoPrepare(apl.db, query); err != nil {
if _, err := sqlutils.ExecNoPrepare(apl.checkpointDB(), query); err != nil {
return err
}
return nil
Expand All @@ -782,14 +818,14 @@ func (apl *Applier) CreateCheckpointTable() error {
// dropTable drops a given table on the applied host
func (apl *Applier) dropTable(tableName string) error {
query := fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`,
sql.EscapeName(apl.migrationContext.DatabaseName),
sql.EscapeName(apl.checkpointDatabaseName()),
sql.EscapeName(tableName),
)
apl.migrationContext.Log.Infof("Dropping table %s.%s",
sql.EscapeName(apl.migrationContext.DatabaseName),
sql.EscapeName(apl.checkpointDatabaseName()),
sql.EscapeName(tableName),
)
if _, err := sqlutils.ExecNoPrepare(apl.db, query); err != nil {
if _, err := sqlutils.ExecNoPrepare(apl.checkpointDB(), query); err != nil {
return err
}
apl.migrationContext.Log.Infof("Table dropped")
Expand Down Expand Up @@ -954,24 +990,51 @@ func (apl *Applier) WriteCheckpoint(chk *Checkpoint) (int64, error) {
return insertId, err
}
args := sqlutils.Args(chk.LastTrxCoords.String(), chk.Iteration, chk.RowsCopied, chk.DMLApplied, chk.IsCutover)
if apl.migrationContext.IsMoveTablesMode() {
args = append(args, chk.MoveTablesCutOverStarted, apl.checkpointDrainGTIDString(chk))
}
args = append(args, uniqueKeyArgs...)
res, err := apl.db.Exec(query, args...)
res, err := apl.checkpointDB().Exec(query, args...)
if err != nil {
return insertId, err
}
return res.LastInsertId()
}

func (apl *Applier) ReadLastCheckpoint() (*Checkpoint, error) {
row := apl.db.QueryRow(fmt.Sprintf(`select /* gh-ost */ * from %s.%s order by gh_ost_chk_id desc limit 1`, sql.EscapeName(apl.migrationContext.DatabaseName), sql.EscapeName(apl.migrationContext.GetCheckpointTableName())))
minColumnNames, maxColumnNames := apl.checkpointRangeColumnNames()
selectColumns := []string{
"gh_ost_chk_id",
"gh_ost_chk_timestamp",
"gh_ost_chk_coords",
"gh_ost_chk_iteration",
"gh_ost_rows_copied",
"gh_ost_dml_applied",
"gh_ost_is_cutover",
}
if apl.migrationContext.IsMoveTablesMode() {
selectColumns = append(selectColumns, "gh_ost_move_tables_cutover_started", "gh_ost_move_tables_drain_gtid")
}
selectColumns = append(selectColumns, minColumnNames...)
selectColumns = append(selectColumns, maxColumnNames...)

row := apl.checkpointDB().QueryRow(fmt.Sprintf(
`select /* gh-ost */ %s from %s.%s order by gh_ost_chk_id desc limit 1`,
strings.Join(selectColumns, ", "),
sql.EscapeName(apl.checkpointDatabaseName()),
sql.EscapeName(apl.migrationContext.GetCheckpointTableName()),
))
chk := &Checkpoint{
IterationRangeMin: sql.NewColumnValues(apl.migrationContext.UniqueKey.Columns.Len()),
IterationRangeMax: sql.NewColumnValues(apl.migrationContext.UniqueKey.Columns.Len()),
}

var coordStr string
var coordStr, drainGTIDStr string
var timestamp int64
ptrs := []interface{}{&chk.Id, &timestamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied, &chk.IsCutover}
if apl.migrationContext.IsMoveTablesMode() {
ptrs = append(ptrs, &chk.MoveTablesCutOverStarted, &drainGTIDStr)
}
ptrs = append(ptrs, chk.IterationRangeMin.ValuesPointers...)
ptrs = append(ptrs, chk.IterationRangeMax.ValuesPointers...)
err := row.Scan(ptrs...)
Expand All @@ -995,6 +1058,43 @@ func (apl *Applier) ReadLastCheckpoint() (*Checkpoint, error) {
}
chk.LastTrxCoords = fileCoords
}
if apl.migrationContext.IsMoveTablesMode() && drainGTIDStr != "" {
drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr)
if err != nil {
return nil, err
}
chk.MoveTablesCutOverDrainGTID = drainGTID
}
return chk, nil
}

func (apl *Applier) ReadMoveTablesCutOverCheckpoint() (*Checkpoint, error) {
row := apl.checkpointDB().QueryRow(fmt.Sprintf(`select /* gh-ost */ gh_ost_chk_id, gh_ost_chk_timestamp, gh_ost_chk_coords, gh_ost_chk_iteration, gh_ost_rows_copied, gh_ost_dml_applied, gh_ost_is_cutover, gh_ost_move_tables_cutover_started, gh_ost_move_tables_drain_gtid from %s.%s order by gh_ost_chk_id desc limit 1`, sql.EscapeName(apl.checkpointDatabaseName()), sql.EscapeName(apl.migrationContext.GetCheckpointTableName())))
chk := &Checkpoint{}
var coordStr, drainGTIDStr string
var timestamp int64
err := row.Scan(&chk.Id, &timestamp, &coordStr, &chk.Iteration, &chk.RowsCopied, &chk.DMLApplied, &chk.IsCutover, &chk.MoveTablesCutOverStarted, &drainGTIDStr)
if err != nil {
if errors.Is(err, gosql.ErrNoRows) {
return nil, ErrNoCheckpointFound
}
return nil, err
}
chk.Timestamp = time.Unix(timestamp, 0)
if coordStr != "" {
coords, err := mysql.NewGTIDBinlogCoordinates(coordStr)
if err != nil {
return nil, err
}
chk.LastTrxCoords = coords
}
if drainGTIDStr != "" {
drainGTID, err := mysql.NewGTIDBinlogCoordinates(drainGTIDStr)
if err != nil {
return nil, err
}
chk.MoveTablesCutOverDrainGTID = drainGTID
}
return chk, nil
}

Expand Down
90 changes: 90 additions & 0 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,96 @@ func (suite *ApplierTestSuite) TestWriteCheckpoint() {
suite.Require().Equal(chk.RowsCopied, gotChk.RowsCopied)
suite.Require().Equal(chk.DMLApplied, gotChk.DMLApplied)
suite.Require().Equal(chk.IsCutover, gotChk.IsCutover)
suite.Require().False(gotChk.MoveTablesCutOverStarted)
suite.Require().Nil(gotChk.MoveTablesCutOverDrainGTID)
}

func (suite *ApplierTestSuite) TestWriteCheckpointMoveTables() {
ctx := context.Background()

var err error

_, err = suite.db.ExecContext(ctx, fmt.Sprintf("CREATE TABLE %s (id int not null, id2 char(4) CHARACTER SET utf8mb4, primary key(id, id2))", getTestTableName()))
suite.Require().NoError(err)

_, err = suite.db.ExecContext(ctx, fmt.Sprintf("INSERT INTO %s (id, id2) VALUES (?,?), (?,?), (?,?)", getTestTableName()), 411, "君子懷德", 411, "小人懷土", 212, "君子不器")
suite.Require().NoError(err)

connectionConfig, err := getTestConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

migrationContext := newTestMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.InspectorConnectionConfig = connectionConfig
migrationContext.SetConnectionConfig("innodb")
migrationContext.UseGTIDs = true

migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "id2"})
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "id2"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "id2"})
migrationContext.Checkpoint = true
migrationContext.MoveTables.TableNames = []string{testMysqlTableName}
migrationContext.MoveTables.TargetDatabase = testMysqlDatabase
migrationContext.MoveTables.ConnectionConfig = connectionConfig
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "PRIMARY",
NameInGhostTable: "PRIMARY",
Columns: *sql.NewColumnList([]string{"id", "id2"}),
}

inspector := NewInspector(migrationContext)
suite.Require().NoError(inspector.InitDBConnections())

err = inspector.applyColumnTypes(testMysqlDatabase, testMysqlTableName, &migrationContext.UniqueKey.Columns)
suite.Require().NoError(err)

applier := NewApplier(migrationContext)

err = applier.InitDBConnections()
suite.Require().NoError(err)

err = applier.CreateCheckpointTable()
suite.Require().NoError(err)

err = applier.prepareQueries()
suite.Require().NoError(err)

err = applier.ReadMigrationRangeValues(inspector.db)
suite.Require().NoError(err)

coords, err := mysql.NewGTIDBinlogCoordinates("00000000-0000-0000-0000-000000000001:1-10")
suite.Require().NoError(err)
drainGTID, err := mysql.NewGTIDBinlogCoordinates("00000000-0000-0000-0000-000000000001:1-20")
suite.Require().NoError(err)

chk := &Checkpoint{
LastTrxCoords: coords,
IterationRangeMin: applier.migrationContext.MigrationRangeMinValues,
IterationRangeMax: applier.migrationContext.MigrationRangeMaxValues,
Iteration: 3,
RowsCopied: 1000,
DMLApplied: 2000,
IsCutover: false,
MoveTablesCutOverStarted: true,
MoveTablesCutOverDrainGTID: drainGTID,
}
id, err := applier.WriteCheckpoint(chk)
suite.Require().NoError(err)
suite.Require().Equal(int64(1), id)

gotChk, err := applier.ReadLastCheckpoint()
suite.Require().NoError(err)

suite.Require().Equal(chk.Iteration, gotChk.Iteration)
suite.Require().Equal(chk.LastTrxCoords.String(), gotChk.LastTrxCoords.String())
suite.Require().Equal(chk.IterationRangeMin.String(), gotChk.IterationRangeMin.String())
suite.Require().Equal(chk.IterationRangeMax.String(), gotChk.IterationRangeMax.String())
suite.Require().Equal(chk.RowsCopied, gotChk.RowsCopied)
suite.Require().Equal(chk.DMLApplied, gotChk.DMLApplied)
suite.Require().Equal(chk.IsCutover, gotChk.IsCutover)
suite.Require().True(gotChk.MoveTablesCutOverStarted)
suite.Require().NotNil(gotChk.MoveTablesCutOverDrainGTID)
suite.Require().Equal(drainGTID.String(), gotChk.MoveTablesCutOverDrainGTID.String())
}

func (suite *ApplierTestSuite) TestPanicOnWarningsWithDuplicateKeyOnNonMigrationIndex() {
Expand Down
12 changes: 7 additions & 5 deletions go/logic/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ type Checkpoint struct {
IterationRangeMin *sql.ColumnValues
// IterationRangeMax is the max shared key value
// for the chunk copier range.
IterationRangeMax *sql.ColumnValues
Iteration int64
RowsCopied int64
DMLApplied int64
IsCutover bool
IterationRangeMax *sql.ColumnValues
Iteration int64
RowsCopied int64
DMLApplied int64
IsCutover bool
MoveTablesCutOverStarted bool
MoveTablesCutOverDrainGTID mysql.BinlogCoordinates
}
Loading
Loading