diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index 2012c8c4c..b91929abb 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -78,6 +78,18 @@ See also: [`resuming-migrations`](resume.md) `--checkpoint-seconds` specifies the seconds between checkpoints. Default is 300. +### chunk-concurrent-size + +`--chunk-concurrent-size=1`, the number of goroutines to execute chunk-copy operations concurrently in each copy time slot. Default `1` (sequential). Minimum `1`. + +When set to a value greater than 1, multiple chunks are calculated and copied in parallel within each write-function invocation. This can significantly speed up row-copy on large tables when MySQL can handle concurrent writes to the ghost table. + +Each concurrent chunk calculates its own non-overlapping key range under a serialization lock, so there is no risk of duplicate or overlapping copies. A single dedicated producer goroutine streams these pre-calculated ranges to a pool of copy workers that run continuously (rather than in fixed barrier-synchronized batches), so the serialized boundary calculation overlaps with the parallel `INSERT`s and a slow chunk does not stall the others. Each chunk also applies its session variables and `INSERT` in a single autocommit round-trip, avoiding the per-chunk `BEGIN`/`SET SESSION`/`COMMIT` overhead. The applier connection pool is sized to `chunk-concurrent-size + 1 (producer) + headroom` automatically. + +For the speedup to materialize, MySQL should allow concurrent inserts to scale: on MySQL 8.0+ the default `innodb_autoinc_lock_mode = 2` (interleaved) is required for tables with an `AUTO_INCREMENT` column — under mode 0/1 an `INSERT ... SELECT` holds a table-level AUTO-INC lock that serializes concurrent chunks. + +Note: concurrency multiplies write pressure per time slot. Throttling (`--max-load`, `--nice-ratio`) applies per batch, not per chunk. Start with small values (2-8) and monitor replication lag. + ### conf `--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format: diff --git a/go/base/context.go b/go/base/context.go index 26d13fe07..fc58b7c13 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -27,6 +27,16 @@ import ( "github.com/go-ini/ini" ) +// IterationRangeValues holds the range boundaries for a single chunk-copy iteration. +// Used by concurrent row-copy to pass isolated range values to each worker goroutine. +type IterationRangeValues struct { + Min *sql.ColumnValues + Max *sql.ColumnValues + Size int64 + IncludeMinValues bool + HasFurtherRange bool +} + // RowsEstimateMethod is the type of row number estimation type RowsEstimateMethod string @@ -131,6 +141,7 @@ type MigrationContext struct { HeartbeatIntervalMilliseconds int64 defaultNumRetries int64 ChunkSize int64 + ChunkConcurrentSize int64 niceRatio float64 MaxLagMillisecondsThrottleThreshold int64 throttleControlReplicaKeys *mysql.InstanceKeyMap @@ -240,27 +251,28 @@ type MigrationContext struct { Metrics *metrics.Client - OriginalTableColumnsOnApplier *sql.ColumnList - OriginalTableColumns *sql.ColumnList - OriginalTableVirtualColumns *sql.ColumnList - OriginalTableUniqueKeys [](*sql.UniqueKey) - OriginalTableAutoIncrement uint64 - GhostTableColumns *sql.ColumnList - GhostTableVirtualColumns *sql.ColumnList - GhostTableUniqueKeys [](*sql.UniqueKey) - UniqueKey *sql.UniqueKey - SharedColumns *sql.ColumnList - ColumnRenameMap map[string]string - DroppedColumnsMap map[string]bool - MappedSharedColumns *sql.ColumnList - MigrationLastInsertSQLWarnings []string - MigrationRangeMinValues *sql.ColumnValues - MigrationRangeMaxValues *sql.ColumnValues - Iteration int64 - MigrationIterationRangeMinValues *sql.ColumnValues - MigrationIterationRangeMaxValues *sql.ColumnValues - InitialStreamerCoords mysql.BinlogCoordinates - ForceTmpTableName string + OriginalTableColumnsOnApplier *sql.ColumnList + OriginalTableColumns *sql.ColumnList + OriginalTableVirtualColumns *sql.ColumnList + OriginalTableUniqueKeys [](*sql.UniqueKey) + OriginalTableAutoIncrement uint64 + GhostTableColumns *sql.ColumnList + GhostTableVirtualColumns *sql.ColumnList + GhostTableUniqueKeys [](*sql.UniqueKey) + UniqueKey *sql.UniqueKey + SharedColumns *sql.ColumnList + ColumnRenameMap map[string]string + DroppedColumnsMap map[string]bool + MappedSharedColumns *sql.ColumnList + MigrationLastInsertSQLWarnings []string + MigrationRangeMinValues *sql.ColumnValues + MigrationRangeMaxValues *sql.ColumnValues + Iteration int64 + MigrationIterationRangeMinValues *sql.ColumnValues + MigrationIterationRangeMaxValues *sql.ColumnValues + CalculateNextIterationRangeEndValuesLock *sync.Mutex + InitialStreamerCoords mysql.BinlogCoordinates + ForceTmpTableName string IncludeTriggers bool RemoveTriggerSuffix bool @@ -310,29 +322,31 @@ type ContextConfig struct { func NewMigrationContext() *MigrationContext { ctx, cancelFunc := context.WithCancel(context.Background()) return &MigrationContext{ - Uuid: uuid.NewString(), - defaultNumRetries: 60, - ChunkSize: 1000, - InspectorConnectionConfig: mysql.NewConnectionConfig(), - ApplierConnectionConfig: mysql.NewConnectionConfig(), - MaxLagMillisecondsThrottleThreshold: 1500, - CutOverLockTimeoutSeconds: 3, - DMLBatchSize: 10, - etaNanoseonds: ETAUnknown, - maxLoad: NewLoadMap(), - criticalLoad: NewLoadMap(), - throttleMutex: &sync.Mutex{}, - throttleHTTPMutex: &sync.Mutex{}, - throttleControlReplicaKeys: mysql.NewInstanceKeyMap(), - configMutex: &sync.Mutex{}, - pointOfInterestTimeMutex: &sync.Mutex{}, - lastHeartbeatOnChangelogMutex: &sync.Mutex{}, - ColumnRenameMap: make(map[string]string), - PanicAbort: make(chan error), - ctx: ctx, - cancelFunc: cancelFunc, - abortMutex: &sync.Mutex{}, - Log: NewDefaultLogger(), + Uuid: uuid.NewString(), + defaultNumRetries: 60, + ChunkSize: 1000, + ChunkConcurrentSize: 1, + InspectorConnectionConfig: mysql.NewConnectionConfig(), + ApplierConnectionConfig: mysql.NewConnectionConfig(), + MaxLagMillisecondsThrottleThreshold: 1500, + CutOverLockTimeoutSeconds: 3, + DMLBatchSize: 10, + etaNanoseonds: ETAUnknown, + maxLoad: NewLoadMap(), + criticalLoad: NewLoadMap(), + throttleMutex: &sync.Mutex{}, + throttleHTTPMutex: &sync.Mutex{}, + throttleControlReplicaKeys: mysql.NewInstanceKeyMap(), + configMutex: &sync.Mutex{}, + pointOfInterestTimeMutex: &sync.Mutex{}, + lastHeartbeatOnChangelogMutex: &sync.Mutex{}, + CalculateNextIterationRangeEndValuesLock: &sync.Mutex{}, + ColumnRenameMap: make(map[string]string), + PanicAbort: make(chan error), + ctx: ctx, + cancelFunc: cancelFunc, + abortMutex: &sync.Mutex{}, + Log: NewDefaultLogger(), } } @@ -693,6 +707,13 @@ func (mctx *MigrationContext) SetChunkSize(chunkSize int64) { atomic.StoreInt64(&mctx.ChunkSize, chunkSize) } +func (mctx *MigrationContext) SetChunkConcurrentSize(chunkConcurrentSize int64) { + if chunkConcurrentSize < 1 { + chunkConcurrentSize = 1 + } + atomic.StoreInt64(&mctx.ChunkConcurrentSize, chunkConcurrentSize) +} + func (mctx *MigrationContext) SetDMLBatchSize(batchSize int64) { if batchSize < 1 { batchSize = 1 diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index d77046231..3a029f825 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -123,6 +123,7 @@ func main() { flag.BoolVar(&migrationContext.CutOverExponentialBackoff, "cut-over-exponential-backoff", false, "Wait exponentially longer intervals between failed cut-over attempts. Wait intervals obey a maximum configurable with 'exponential-backoff-max-interval').") exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.") chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)") + chunkConcurrentSize := flag.Int64("chunk-concurrent-size", 1, "number of goroutines to execute chunks concurrently in each copy time slot (range 1-100)") dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-1000)") defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking") flag.BoolVar(&migrationContext.PanicOnWarnings, "panic-on-warnings", false, "Panic when SQL warnings are encountered when copying a batch indicating data loss") @@ -375,6 +376,7 @@ func main() { migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis) migrationContext.SetNiceRatio(*niceRatio) migrationContext.SetChunkSize(*chunkSize) + migrationContext.SetChunkConcurrentSize(*chunkConcurrentSize) migrationContext.SetDMLBatchSize(*dmlBatchSize) migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis) migrationContext.SetThrottleQuery(*throttleQuery) diff --git a/go/logic/applier.go b/go/logic/applier.go index f3474b3ef..d080414b3 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -123,6 +123,16 @@ func (apl *Applier) InitDBConnections() (err error) { if apl.db, _, err = mysql.GetDB(apl.migrationContext.Uuid, uriWithMulti); err != nil { return err } + concurrentSize := atomic.LoadInt64(&apl.migrationContext.ChunkConcurrentSize) + if concurrentSize > 1 { + // Size the pool for concurrentSize parallel chunk-INSERTs plus the dedicated + // range-producer connection, with MaxDBPoolConnections of additional headroom + // for other applier queries. Without this, small concurrency values (2, 3) + // would contend with the producer for a connection and serialize. + poolSize := int(concurrentSize) + 1 + mysql.MaxDBPoolConnections + apl.db.SetMaxOpenConns(poolSize) + apl.db.SetMaxIdleConns(poolSize) + } singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri) if apl.singletonDB, _, err = mysql.GetDB(apl.migrationContext.Uuid, singletonApplierUri); err != nil { return err @@ -1021,10 +1031,40 @@ func (apl *Applier) ReadMigrationRangeValues() error { } // CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values, -// which will be used for copying the next chunk of rows. Ir returns "false" if there is -// no further chunk to work through, i.e. we're past the last chunk and are done with -// iterating the range (and thus done with copying row chunks) -func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) { +// which will be used for copying the next chunk of rows. It returns an IterationRangeValues +// struct with HasFurtherRange=false if there is no further chunk to work through. +// Thread-safe: uses a mutex to serialize access for concurrent row-copy. +// When advanceCursor is true, the function determines min from MigrationIterationRangeMaxValues +// (for concurrent mode where each goroutine advances the cursor). +// When advanceCursor is false, min is read from MigrationIterationRangeMinValues (pre-set by +// SetNextIterationRangeMinValues for single-threaded retry compatibility). +func (apl *Applier) CalculateNextIterationRangeEndValues(advanceCursor bool) (values *base.IterationRangeValues, err error) { + apl.migrationContext.CalculateNextIterationRangeEndValuesLock.Lock() + defer apl.migrationContext.CalculateNextIterationRangeEndValuesLock.Unlock() + + result := &base.IterationRangeValues{ + Size: atomic.LoadInt64(&apl.migrationContext.ChunkSize), + } + + if advanceCursor { + // Concurrent mode: advance min from current max cursor + result.Min = apl.migrationContext.MigrationIterationRangeMaxValues + if result.Min == nil { + result.Min = apl.migrationContext.MigrationRangeMinValues + result.IncludeMinValues = true + } + } else { + // Single-threaded mode: min was pre-set by SetNextIterationRangeMinValues + result.Min = apl.migrationContext.MigrationIterationRangeMinValues + if result.Min == nil { + result.Min = apl.migrationContext.MigrationRangeMinValues + } + // First iteration: include the minimum values. Use Iteration counter (not cursor state) + // because cursor is mutated on first calc success, but Iteration only advances after + // successful insert — so on retry of the first chunk, this still returns true. + result.IncludeMinValues = (apl.migrationContext.GetIteration() == 0) + } + for i := 0; i < 2; i++ { buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset if i == 1 { @@ -1034,46 +1074,56 @@ func (apl *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool apl.migrationContext.DatabaseName, apl.migrationContext.OriginalTableName, &apl.migrationContext.UniqueKey.Columns, - apl.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), + result.Min.AbstractValues(), apl.migrationContext.MigrationRangeMaxValues.AbstractValues(), - atomic.LoadInt64(&apl.migrationContext.ChunkSize), - apl.migrationContext.GetIteration() == 0, + result.Size, + result.IncludeMinValues, fmt.Sprintf("iteration:%d", apl.migrationContext.GetIteration()), ) if err != nil { - return hasFurtherRange, err + return result, err } rows, err := apl.db.Query(query, explodedArgs...) if err != nil { - return hasFurtherRange, err + return result, err } defer rows.Close() iterationRangeMaxValues := sql.NewColumnValues(apl.migrationContext.UniqueKey.Len()) for rows.Next() { if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil { - return hasFurtherRange, err + return result, err } - hasFurtherRange = true + result.HasFurtherRange = true } if err = rows.Err(); err != nil { - return hasFurtherRange, err + return result, err } - if hasFurtherRange { - apl.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues - return hasFurtherRange, nil + if result.HasFurtherRange { + result.Max = iterationRangeMaxValues + // Advance global cursor + apl.migrationContext.MigrationIterationRangeMinValues = result.Min + apl.migrationContext.MigrationIterationRangeMaxValues = result.Max + return result, nil } } apl.migrationContext.Log.Debugf("Iteration complete: no further range to iterate") - return hasFurtherRange, nil + return result, nil } // ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where // data actually gets copied from original table. -func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) { +// +// The session variables (time_zone, sql_mode) and the chunk INSERT are sent as a single +// multi-statement, autocommit round-trip on one pinned connection. The applier pool sets +// multiStatements + interpolateParams + autocommit, so this avoids the extra +// BEGIN / SET SESSION / COMMIT round-trips an explicit transaction would add to every +// chunk — the dominant per-chunk overhead at small chunk sizes. `RowsAffected()` reports +// the last statement's count (the INSERT), so the returned row count stays correct. +func (apl *Applier) ApplyIterationInsertQuery(ctx context.Context, iterationRangeValues *base.IterationRangeValues) (chunkSize int64, rowsAffected int64, duration time.Duration, warnings []string, err error) { startTime := time.Now() - chunkSize = atomic.LoadInt64(&apl.migrationContext.ChunkSize) + chunkSize = iterationRangeValues.Size query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery( apl.migrationContext.DatabaseName, @@ -1083,85 +1133,93 @@ func (apl *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected i apl.migrationContext.MappedSharedColumns.Names(), apl.migrationContext.UniqueKey.Name, &apl.migrationContext.UniqueKey.Columns, - apl.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), - apl.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), - apl.migrationContext.GetIteration() == 0, + iterationRangeValues.Min.AbstractValues(), + iterationRangeValues.Max.AbstractValues(), + iterationRangeValues.IncludeMinValues, apl.migrationContext.IsTransactionalTable(), // TODO: Don't hardcode this strings.HasPrefix(apl.migrationContext.ApplierMySQLVersion, "8."), ) if err != nil { - return chunkSize, rowsAffected, duration, err + return chunkSize, rowsAffected, duration, nil, err } - sqlResult, err := func() (gosql.Result, error) { - tx, err := apl.db.Begin() + sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s', %s`, + apl.migrationContext.ApplierTimeZone, apl.generateSqlModeQuery()) + combinedQuery := sessionQuery + "; " + query + + sqlResult, sqlWarnings, err := func() (gosql.Result, []string, error) { + // Pin a single connection so the optional SHOW WARNINGS observes this INSERT's + // warnings (and not another pooled query's). + conn, err := apl.db.Conn(ctx) if err != nil { - return nil, err + return nil, nil, err } - defer tx.Rollback() - - sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, apl.migrationContext.ApplierTimeZone) - sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, apl.generateSqlModeQuery()) + defer conn.Close() - if _, err := tx.Exec(sessionQuery); err != nil { - return nil, err - } - result, err := tx.Exec(query, explodedArgs...) + result, err := conn.ExecContext(ctx, combinedQuery, explodedArgs...) if err != nil { - return nil, err + return nil, nil, err } + var collectedWarnings []string if apl.migrationContext.PanicOnWarnings { - rows, err := tx.Query("SHOW WARNINGS") + collectedWarnings, err = apl.collectChunkInsertWarnings(ctx, conn) if err != nil { - return nil, err - } - defer rows.Close() - if err = rows.Err(); err != nil { - return nil, err + return nil, nil, err } - - // Compile regex once before loop to avoid performance penalty and handle errors properly - migrationKeyRegex, err := apl.compileMigrationKeyWarningRegex() - if err != nil { - return nil, err - } - - var sqlWarnings []string - for rows.Next() { - var level, message string - var code int - if err := rows.Scan(&level, &code, &message); err != nil { - apl.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row") - continue - } - if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) { - continue - } - sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) - } - apl.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings } - - if err := tx.Commit(); err != nil { - return nil, err - } - return result, nil + return result, collectedWarnings, nil }() if err != nil { - return chunkSize, rowsAffected, duration, err + return chunkSize, rowsAffected, duration, nil, err } rowsAffected, _ = sqlResult.RowsAffected() duration = time.Since(startTime) + warnings = sqlWarnings apl.migrationContext.Log.Debugf( "Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d", - apl.migrationContext.MigrationIterationRangeMinValues, - apl.migrationContext.MigrationIterationRangeMaxValues, + iterationRangeValues.Min, + iterationRangeValues.Max, apl.migrationContext.GetIteration(), chunkSize) - return chunkSize, rowsAffected, duration, nil + return chunkSize, rowsAffected, duration, warnings, nil +} + +// collectChunkInsertWarnings runs SHOW WARNINGS on the given (pinned) connection right +// after a chunk INSERT and returns the warnings, skipping the benign duplicate-key +// warnings that INSERT IGNORE produces on the migration unique key. +func (apl *Applier) collectChunkInsertWarnings(ctx context.Context, conn *gosql.Conn) ([]string, error) { + rows, err := conn.QueryContext(ctx, "SHOW WARNINGS") + if err != nil { + return nil, err + } + defer rows.Close() + + // Compile regex once before loop to avoid performance penalty and handle errors properly + migrationKeyRegex, err := apl.compileMigrationKeyWarningRegex() + if err != nil { + return nil, err + } + + var collectedWarnings []string + for rows.Next() { + var level, message string + var code int + if err := rows.Scan(&level, &code, &message); err != nil { + apl.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row") + continue + } + if strings.Contains(message, "Duplicate entry") && migrationKeyRegex.MatchString(message) { + continue + } + collectedWarnings = append(collectedWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code)) + } + if err := rows.Err(); err != nil { + return nil, err + } + return collectedWarnings, nil } // LockOriginalTable places a write lock on the original table diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 85a5a01d3..7df6523f4 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -683,17 +683,17 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc suite.Require().NoError(err) migrationContext.SetNextIterationRangeMinValues() - hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() + iterationRange, err := applier.CalculateNextIterationRangeEndValues(false) suite.Require().NoError(err) - suite.Require().True(hasFurtherRange) + suite.Require().True(iterationRange.HasFurtherRange) - _, rowsAffected, _, err := applier.ApplyIterationInsertQuery() + _, rowsAffected, _, sqlWarnings, err := applier.ApplyIterationInsertQuery(ctx, iterationRange) suite.Require().NoError(err) suite.Require().Equal(int64(0), rowsAffected) // Ensure Duplicate entry '42' for key '_testing_gho.item_id' is ignored correctly - suite.Require().Empty(applier.migrationContext.MigrationLastInsertSQLWarnings) + suite.Require().Empty(sqlWarnings) // Check that the row was inserted rows, err := suite.db.Query("SELECT * FROM " + getTestGhostTableName()) @@ -763,17 +763,17 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFai suite.Require().NoError(err) migrationContext.SetNextIterationRangeMinValues() - hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues() + iterationRange, err := applier.CalculateNextIterationRangeEndValues(false) suite.Require().NoError(err) - suite.Require().True(hasFurtherRange) + suite.Require().True(iterationRange.HasFurtherRange) - _, rowsAffected, _, err := applier.ApplyIterationInsertQuery() + _, rowsAffected, _, sqlWarnings, err := applier.ApplyIterationInsertQuery(ctx, iterationRange) suite.Equal(int64(1), rowsAffected) suite.Require().NoError(err) // Verify the warning was recorded and will cause the migrator to panic - suite.Require().NotEmpty(applier.migrationContext.MigrationLastInsertSQLWarnings) - suite.Require().Contains(applier.migrationContext.MigrationLastInsertSQLWarnings[0], "Warning: Data truncated for column 'name' at row 1") + suite.Require().NotEmpty(sqlWarnings) + suite.Require().Contains(sqlWarnings[0], "Warning: Data truncated for column 'name' at row 1") } func (suite *ApplierTestSuite) TestWriteCheckpoint() { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index bec13e594..dfd59ec15 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -20,6 +20,8 @@ import ( "github.com/github/gh-ost/go/binlog" "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" + + "golang.org/x/sync/errgroup" ) var ( @@ -29,6 +31,12 @@ var ( checkpointTimeout = 2 * time.Second ) +// rowCopyQuantumDuration bounds how long the concurrent row-copy worker pool keeps +// dispatching chunks before yielding control back to executeWriteFuncs (which then +// applies any pending binlog events and re-checks throttling). It trades a small +// increase in worst-case event-apply latency for far fewer per-batch barriers. +const rowCopyQuantumDuration = 200 * time.Millisecond + type ChangelogState string const ( @@ -1565,7 +1573,8 @@ func (mgtr *Migrator) initiateApplier() error { } // iterateChunks iterates the existing table rows, and generates a copy task of -// a chunk of rows onto the ghost table. +// a chunk of rows onto the ghost table. Supports concurrent chunk copying via +// --chunk-concurrent-size. func (mgtr *Migrator) iterateChunks() error { terminateRowIteration := func(err error) error { _ = base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.rowCopyComplete, err) @@ -1580,84 +1589,234 @@ func (mgtr *Migrator) iterateChunks() error { return terminateRowIteration(nil) } + concurrentSize := atomic.LoadInt64(&mgtr.migrationContext.ChunkConcurrentSize) + if concurrentSize < 1 { + concurrentSize = 1 + } + + if concurrentSize == 1 { + return mgtr.iterateChunksSingle(terminateRowIteration) + } + return mgtr.iterateChunksConcurrent(concurrentSize, terminateRowIteration) +} + +// iterateChunksSingle is the single-threaded row-copy loop. It matches master +// behavior exactly: the next-iteration range is (re)calculated inside the retry +// loop so that hook-based chunk-size reduction takes effect on retry. +func (mgtr *Migrator) iterateChunksSingle(terminateRowIteration func(error) error) error { + ctx := mgtr.migrationContext.GetContext() var hasNoFurtherRangeFlag int64 - // Iterate per chunk: for { if err := mgtr.checkAbort(); err != nil { return terminateRowIteration(err) } if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { - // Done - // There's another such check down the line return nil } copyRowsFunc := func() error { + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { + return nil + } + // Min is fixed before retry loop; range calc + insert are retried together. mgtr.migrationContext.SetNextIterationRangeMinValues() - // Copy task: + applyCopyRowsFunc := func() error { if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { - // Done. - // There's another such check down the line return nil } - // When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever - hasFurtherRange, err := mgtr.applier.CalculateNextIterationRangeEndValues() + iterationRangeValues, err := mgtr.applier.CalculateNextIterationRangeEndValues(false) if err != nil { - return err // wrapping call will retry + return err } - if !hasFurtherRange { + if !iterationRangeValues.HasFurtherRange { atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) - return terminateRowIteration(nil) + return nil } if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { - // No need for more writes. - // This is the de-facto place where we avoid writing in the event of completed cut-over. - // There could _still_ be a race condition, but that's as close as we can get. - // What about the race condition? Well, there's actually no data integrity issue. - // when rowCopyCompleteFlag==1 that means **guaranteed** all necessary rows have been copied. - // But some are still then collected at the binary log, and these are the ones we're trying to - // not apply here. If the race condition wins over us, then we just attempt to apply onto the - // _ghost_ table, which no longer exists. So, bothering error messages and all, but no damage. return nil } - _, rowsAffected, _, err := mgtr.applier.ApplyIterationInsertQuery() + + _, rowsAffected, _, sqlWarnings, err := mgtr.applier.ApplyIterationInsertQuery(ctx, iterationRangeValues) if err != nil { - return err // wrapping call will retry + return err } - - if mgtr.migrationContext.PanicOnWarnings { - if len(mgtr.migrationContext.MigrationLastInsertSQLWarnings) > 0 { - for _, warning := range mgtr.migrationContext.MigrationLastInsertSQLWarnings { - mgtr.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning) - } - joinedWarnings := strings.Join(mgtr.migrationContext.MigrationLastInsertSQLWarnings, "; ") - return terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings)) - } + if err := mgtr.checkInsertWarnings(sqlWarnings); err != nil { + return err } atomic.AddInt64(&mgtr.migrationContext.TotalRowsCopied, rowsAffected) atomic.AddInt64(&mgtr.migrationContext.Iteration, 1) + + mgtr.recordLastIterationRange(iterationRangeValues) return nil } - if err := mgtr.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil { + if err := mgtr.retryBatchCopyWithHooks(applyCopyRowsFunc); err != nil { //nolint:contextcheck return terminateRowIteration(err) } + if atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { + return terminateRowIteration(nil) + } + return nil + } + if err := base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.copyRowsQueue, copyRowsFunc); err != nil { + if abortErr := mgtr.checkAbort(); abortErr != nil { + return terminateRowIteration(abortErr) + } + return terminateRowIteration(err) + } + } +} + +// iterateChunksConcurrent copies row chunks using up to `concurrentSize` parallel +// INSERTs per batch. +// +// The dominant cost that previously capped concurrency was not the INSERTs but +// the per-chunk boundary calculation (CalculateNextIterationRangeEndValues): it +// runs a serialized, indexed scan of the source under a global mutex, and in the +// original implementation a batch's boundary calculations could not overlap with +// any INSERT — only the INSERTs ran in parallel, so the serial boundary scans +// (proportional to the whole table) showed up as pure overhead between batches. +// +// To remove that stall we run a single, dedicated producer goroutine that streams +// pre-calculated ranges into a buffered channel. The producer is the sole caller +// of CalculateNextIterationRangeEndValues, so cursor advancement stays correct and +// serialized, but it now runs *concurrently with* the INSERTs of earlier batches: +// by the time a batch is dequeued its ranges are already computed, so the worker +// goroutines never wait on boundary calculation. +func (mgtr *Migrator) iterateChunksConcurrent(concurrentSize int64, terminateRowIteration func(error) error) error { + ctx := mgtr.migrationContext.GetContext() + + // Buffer enough ranges so the producer can run a batch (or two) ahead of the + // consumers without blocking. + rangesChannel := make(chan *base.IterationRangeValues, 2*concurrentSize) + producerErrChannel := make(chan error, 1) - // record last successfully copied range - mgtr.applier.LastIterationRangeMutex.Lock() - if mgtr.migrationContext.MigrationIterationRangeMinValues != nil && mgtr.migrationContext.MigrationIterationRangeMaxValues != nil { - mgtr.applier.LastIterationRangeMinValues = mgtr.migrationContext.MigrationIterationRangeMinValues.Clone() - mgtr.applier.LastIterationRangeMaxValues = mgtr.migrationContext.MigrationIterationRangeMaxValues.Clone() + // Range producer: serialized boundary calculation, decoupled from the INSERTs. + go func() { + defer close(rangesChannel) + for { + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { + return + } + if err := mgtr.checkAbort(); err != nil { + return + } + iterationRangeValues, err := mgtr.applier.CalculateNextIterationRangeEndValues(true) + if err != nil { + producerErrChannel <- err + return + } + if !iterationRangeValues.HasFurtherRange { + return + } + if err := base.SendWithContext(ctx, rangesChannel, iterationRangeValues); err != nil { + return + } + } + }() + + var hasNoFurtherRangeFlag int64 + for { + if err := mgtr.checkAbort(); err != nil { + return terminateRowIteration(err) + } + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 || atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { + return nil + } + copyRowsFunc := func() error { + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { + return nil + } + + g, gctx := errgroup.WithContext(ctx) + g.SetLimit(int(concurrentSize)) + + // Continuously dispatch pre-calculated ranges to up to concurrentSize + // workers for one time quantum. g.Go blocks while concurrentSize inserts + // are in flight, so workers stay saturated and a slow chunk no longer + // stalls the others behind a per-batch barrier; the only barrier is + // g.Wait() at the quantum boundary. Bounding the quantum by time keeps the + // single executeWriteFuncs goroutine returning to apply binlog events. + quantum := time.NewTimer(rowCopyQuantumDuration) + defer quantum.Stop() + + var lastDispatched *base.IterationRangeValues + dispatching := true + for dispatching { + var iterationRangeValues *base.IterationRangeValues + select { + case rv, ok := <-rangesChannel: + if !ok { + // Producer finished: table exhausted, or it hit an error. + select { + case err := <-producerErrChannel: + if err != nil { + _ = g.Wait() + return terminateRowIteration(err) + } + default: + } + atomic.StoreInt64(&hasNoFurtherRangeFlag, 1) + dispatching = false + continue + } + iterationRangeValues = rv + case <-quantum.C: + dispatching = false + continue + case <-gctx.Done(): + // A worker failed (or the migration is aborting); stop dispatching + // and let g.Wait surface the underlying error. + dispatching = false + continue + } + + lastDispatched = iterationRangeValues + rv := iterationRangeValues + g.Go(func() error { + if gctx.Err() != nil { + return gctx.Err() + } + applyCopyRowsFunc := func() error { + if atomic.LoadInt64(&mgtr.rowCopyCompleteFlag) == 1 { + return nil + } + _, rowsAffected, _, sqlWarnings, err := mgtr.applier.ApplyIterationInsertQuery(ctx, rv) + if err != nil { + return err + } + if err := mgtr.checkInsertWarnings(sqlWarnings); err != nil { + return err + } + atomic.AddInt64(&mgtr.migrationContext.TotalRowsCopied, rowsAffected) + atomic.AddInt64(&mgtr.migrationContext.Iteration, 1) + return nil + } + return mgtr.retryBatchCopyWithHooks(applyCopyRowsFunc) //nolint:contextcheck + }) } - mgtr.applier.LastIterationRangeMutex.Unlock() + if err := g.Wait(); err != nil { + return terminateRowIteration(err) + } + + // Every range dispatched this quantum has completed. Ranges are pulled in + // contiguous, increasing order, so the last one dispatched is the highest + // fully-copied boundary — record it so a checkpoint resumes from copied + // data rather than from the producer's prefetched cursor. + if lastDispatched != nil { + mgtr.recordLastIterationRange(lastDispatched) + } + + if atomic.LoadInt64(&hasNoFurtherRangeFlag) == 1 { + return terminateRowIteration(nil) + } return nil } // Enqueue copy operation; to be executed by executeWriteFuncs() // Use helper to prevent deadlock if executeWriteFuncs exits - if err := base.SendWithContext(mgtr.migrationContext.GetContext(), mgtr.copyRowsQueue, copyRowsFunc); err != nil { - // Context cancelled, check for abort and exit + if err := base.SendWithContext(ctx, mgtr.copyRowsQueue, copyRowsFunc); err != nil { if abortErr := mgtr.checkAbort(); abortErr != nil { return terminateRowIteration(abortErr) } @@ -1666,6 +1825,30 @@ func (mgtr *Migrator) iterateChunks() error { } } +// checkInsertWarnings turns SQL warnings collected from a chunk INSERT into a fatal +// error when --panic-on-warnings is set. +func (mgtr *Migrator) checkInsertWarnings(sqlWarnings []string) error { + if !mgtr.migrationContext.PanicOnWarnings || len(sqlWarnings) == 0 { + return nil + } + for _, warning := range sqlWarnings { + mgtr.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning) + } + return fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", strings.Join(sqlWarnings, "; ")) +} + +// recordLastIterationRange stores the most recently *completed* chunk range, used +// by Checkpoint to resume from a fully-copied boundary. +func (mgtr *Migrator) recordLastIterationRange(iterationRangeValues *base.IterationRangeValues) { + if iterationRangeValues == nil || iterationRangeValues.Min == nil || iterationRangeValues.Max == nil { + return + } + mgtr.applier.LastIterationRangeMutex.Lock() + mgtr.applier.LastIterationRangeMinValues = iterationRangeValues.Min.Clone() + mgtr.applier.LastIterationRangeMaxValues = iterationRangeValues.Max.Clone() + mgtr.applier.LastIterationRangeMutex.Unlock() +} + func (mgtr *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { handleNonDMLEventStruct := func(eventStruct *applyEventStruct) error { if eventStruct.writeFunc != nil {