Skip to content

Commit de32943

Browse files
dnovitskishaohkCopilot
committed
Add --chunk-concurrent-size for parallel row-copy
Port of PR #1398 by @shaohk: allows multiple row-copy chunks to execute in parallel within each iteration using errgroup. Key changes: - Add IterationRangeValues struct for thread-safe range passing - Serialize range calculation with CalculateNextIterationRangeEndValuesLock - Rewrite iterateChunks to spawn N goroutines per queue item via errgroup - Return SQL warnings from ApplyIterationInsertQuery (eliminates race on shared MigrationLastInsertSQLWarnings field) - Increase DB connection pool when concurrency > default pool size - Add --chunk-concurrent-size CLI flag (default 1, no behavior change) Co-authored-by: shaohk <shaohk@users.noreply.github.com> Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 154d214 commit de32943

6 files changed

Lines changed: 272 additions & 137 deletions

File tree

doc/command-line-flags.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,16 @@ See also: [`resuming-migrations`](resume.md)
7878

7979
`--checkpoint-seconds` specifies the seconds between checkpoints. Default is 300.
8080

81+
### chunk-concurrent-size
82+
83+
`--chunk-concurrent-size=1`, the number of goroutines to execute chunk-copy operations concurrently in each copy time slot. Default `1` (sequential). Minimum `1`.
84+
85+
When set to a value greater than 1, multiple chunks are calculated and copied in parallel within each write-function invocation. This can significantly speed up row-copy on large tables when MySQL can handle concurrent writes to the ghost table.
86+
87+
Each concurrent chunk calculates its own non-overlapping key range under a serialization lock, so there is no risk of duplicate or overlapping copies.
88+
89+
Note: concurrency multiplies write pressure per time slot. Throttling (`--max-load`, `--nice-ratio`) applies per batch, not per chunk. Start with small values (2-8) and monitor replication lag.
90+
8191
### conf
8292

8393
`--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format:

go/base/context.go

Lines changed: 65 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,16 @@ import (
2626
"github.com/go-ini/ini"
2727
)
2828

29+
// IterationRangeValues holds the range boundaries for a single chunk-copy iteration.
30+
// Used by concurrent row-copy to pass isolated range values to each worker goroutine.
31+
type IterationRangeValues struct {
32+
Min *sql.ColumnValues
33+
Max *sql.ColumnValues
34+
Size int64
35+
IncludeMinValues bool
36+
HasFurtherRange bool
37+
}
38+
2939
// RowsEstimateMethod is the type of row number estimation
3040
type RowsEstimateMethod string
3141

@@ -130,6 +140,7 @@ type MigrationContext struct {
130140
HeartbeatIntervalMilliseconds int64
131141
defaultNumRetries int64
132142
ChunkSize int64
143+
ChunkConcurrentSize int64
133144
niceRatio float64
134145
MaxLagMillisecondsThrottleThreshold int64
135146
throttleControlReplicaKeys *mysql.InstanceKeyMap
@@ -237,27 +248,28 @@ type MigrationContext struct {
237248
AbortError error
238249
abortMutex *sync.Mutex
239250

240-
OriginalTableColumnsOnApplier *sql.ColumnList
241-
OriginalTableColumns *sql.ColumnList
242-
OriginalTableVirtualColumns *sql.ColumnList
243-
OriginalTableUniqueKeys [](*sql.UniqueKey)
244-
OriginalTableAutoIncrement uint64
245-
GhostTableColumns *sql.ColumnList
246-
GhostTableVirtualColumns *sql.ColumnList
247-
GhostTableUniqueKeys [](*sql.UniqueKey)
248-
UniqueKey *sql.UniqueKey
249-
SharedColumns *sql.ColumnList
250-
ColumnRenameMap map[string]string
251-
DroppedColumnsMap map[string]bool
252-
MappedSharedColumns *sql.ColumnList
253-
MigrationLastInsertSQLWarnings []string
254-
MigrationRangeMinValues *sql.ColumnValues
255-
MigrationRangeMaxValues *sql.ColumnValues
256-
Iteration int64
257-
MigrationIterationRangeMinValues *sql.ColumnValues
258-
MigrationIterationRangeMaxValues *sql.ColumnValues
259-
InitialStreamerCoords mysql.BinlogCoordinates
260-
ForceTmpTableName string
251+
OriginalTableColumnsOnApplier *sql.ColumnList
252+
OriginalTableColumns *sql.ColumnList
253+
OriginalTableVirtualColumns *sql.ColumnList
254+
OriginalTableUniqueKeys [](*sql.UniqueKey)
255+
OriginalTableAutoIncrement uint64
256+
GhostTableColumns *sql.ColumnList
257+
GhostTableVirtualColumns *sql.ColumnList
258+
GhostTableUniqueKeys [](*sql.UniqueKey)
259+
UniqueKey *sql.UniqueKey
260+
SharedColumns *sql.ColumnList
261+
ColumnRenameMap map[string]string
262+
DroppedColumnsMap map[string]bool
263+
MappedSharedColumns *sql.ColumnList
264+
MigrationLastInsertSQLWarnings []string
265+
MigrationRangeMinValues *sql.ColumnValues
266+
MigrationRangeMaxValues *sql.ColumnValues
267+
Iteration int64
268+
MigrationIterationRangeMinValues *sql.ColumnValues
269+
MigrationIterationRangeMaxValues *sql.ColumnValues
270+
CalculateNextIterationRangeEndValuesLock *sync.Mutex
271+
InitialStreamerCoords mysql.BinlogCoordinates
272+
ForceTmpTableName string
261273

262274
IncludeTriggers bool
263275
RemoveTriggerSuffix bool
@@ -307,29 +319,31 @@ type ContextConfig struct {
307319
func NewMigrationContext() *MigrationContext {
308320
ctx, cancelFunc := context.WithCancel(context.Background())
309321
return &MigrationContext{
310-
Uuid: uuid.NewString(),
311-
defaultNumRetries: 60,
312-
ChunkSize: 1000,
313-
InspectorConnectionConfig: mysql.NewConnectionConfig(),
314-
ApplierConnectionConfig: mysql.NewConnectionConfig(),
315-
MaxLagMillisecondsThrottleThreshold: 1500,
316-
CutOverLockTimeoutSeconds: 3,
317-
DMLBatchSize: 10,
318-
etaNanoseonds: ETAUnknown,
319-
maxLoad: NewLoadMap(),
320-
criticalLoad: NewLoadMap(),
321-
throttleMutex: &sync.Mutex{},
322-
throttleHTTPMutex: &sync.Mutex{},
323-
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
324-
configMutex: &sync.Mutex{},
325-
pointOfInterestTimeMutex: &sync.Mutex{},
326-
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
327-
ColumnRenameMap: make(map[string]string),
328-
PanicAbort: make(chan error),
329-
ctx: ctx,
330-
cancelFunc: cancelFunc,
331-
abortMutex: &sync.Mutex{},
332-
Log: NewDefaultLogger(),
322+
Uuid: uuid.NewString(),
323+
defaultNumRetries: 60,
324+
ChunkSize: 1000,
325+
ChunkConcurrentSize: 1,
326+
InspectorConnectionConfig: mysql.NewConnectionConfig(),
327+
ApplierConnectionConfig: mysql.NewConnectionConfig(),
328+
MaxLagMillisecondsThrottleThreshold: 1500,
329+
CutOverLockTimeoutSeconds: 3,
330+
DMLBatchSize: 10,
331+
etaNanoseonds: ETAUnknown,
332+
maxLoad: NewLoadMap(),
333+
criticalLoad: NewLoadMap(),
334+
throttleMutex: &sync.Mutex{},
335+
throttleHTTPMutex: &sync.Mutex{},
336+
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
337+
configMutex: &sync.Mutex{},
338+
pointOfInterestTimeMutex: &sync.Mutex{},
339+
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
340+
CalculateNextIterationRangeEndValuesLock: &sync.Mutex{},
341+
ColumnRenameMap: make(map[string]string),
342+
PanicAbort: make(chan error),
343+
ctx: ctx,
344+
cancelFunc: cancelFunc,
345+
abortMutex: &sync.Mutex{},
346+
Log: NewDefaultLogger(),
333347
}
334348
}
335349

@@ -690,6 +704,13 @@ func (mctx *MigrationContext) SetChunkSize(chunkSize int64) {
690704
atomic.StoreInt64(&mctx.ChunkSize, chunkSize)
691705
}
692706

707+
func (mctx *MigrationContext) SetChunkConcurrentSize(chunkConcurrentSize int64) {
708+
if chunkConcurrentSize < 1 {
709+
chunkConcurrentSize = 1
710+
}
711+
atomic.StoreInt64(&mctx.ChunkConcurrentSize, chunkConcurrentSize)
712+
}
713+
693714
func (mctx *MigrationContext) SetDMLBatchSize(batchSize int64) {
694715
if batchSize < 1 {
695716
batchSize = 1

go/cmd/gh-ost/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ func main() {
107107
flag.BoolVar(&migrationContext.CutOverExponentialBackoff, "cut-over-exponential-backoff", false, "Wait exponentially longer intervals between failed cut-over attempts. Wait intervals obey a maximum configurable with 'exponential-backoff-max-interval').")
108108
exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.")
109109
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
110+
chunkConcurrentSize := flag.Int64("chunk-concurrent-size", 1, "number of goroutines to execute chunks concurrently in each copy time slot (range 1-100)")
110111
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-1000)")
111112
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
112113
flag.BoolVar(&migrationContext.PanicOnWarnings, "panic-on-warnings", false, "Panic when SQL warnings are encountered when copying a batch indicating data loss")
@@ -355,6 +356,7 @@ func main() {
355356
migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis)
356357
migrationContext.SetNiceRatio(*niceRatio)
357358
migrationContext.SetChunkSize(*chunkSize)
359+
migrationContext.SetChunkConcurrentSize(*chunkConcurrentSize)
358360
migrationContext.SetDMLBatchSize(*dmlBatchSize)
359361
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
360362
migrationContext.SetThrottleQuery(*throttleQuery)

0 commit comments

Comments
 (0)