diff --git a/pkg/copier/buffered.go b/pkg/copier/buffered.go
index 2b5cdeaf..a0a9d95a 100644
--- a/pkg/copier/buffered.go
+++ b/pkg/copier/buffered.go
@@ -6,7 +6,6 @@ import (
 	"errors"
 	"fmt"
 	"log/slog"
-	"math"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -14,6 +13,7 @@ import (
 	"github.com/block/spirit/pkg/applier"
 	"github.com/block/spirit/pkg/dbconn"
 	"github.com/block/spirit/pkg/metrics"
+	"github.com/block/spirit/pkg/status"
 	"github.com/block/spirit/pkg/table"
 	"github.com/block/spirit/pkg/throttler"
 	"github.com/block/spirit/pkg/utils"
@@ -348,20 +348,15 @@ func (c *buffered) GetETA() string {
 	c.Lock()
 	defer c.Unlock()
 	copiedRows, totalRows, pct := c.getCopyStats()
-	rowsPerSecond := c.rowsPerSecond.Load()
-	if pct > 99.99 {
+	estimate, st := etaEstimate(copiedRows, totalRows, pct, c.rowsPerSecond.Load(), c.startTime)
+	switch st {
+	case status.ETADue:
 		return "DUE"
-	}
-	if rowsPerSecond == 0 || time.Since(c.startTime) < copyETAInitialWaitTime {
+	case status.ETAMeasuring:
 		return "TBD"
+	case status.ETAReady, status.ETANone:
+		// A ready estimate is formatted below; ETANone cannot occur during copy.
 	}
-	// divide the remaining rows by how many rows we copied in the last interval per second
-	// "remainingRows" might be the actual rows or the logical rows since
-	// c.getCopyStats() and rowsPerSecond change estimation method when the PK is auto-inc.
-	remainingRows := totalRows - copiedRows
-	remainingSeconds := math.Floor(float64(remainingRows) / float64(rowsPerSecond))
-
-	estimate := time.Duration(remainingSeconds * float64(time.Second))
 	comparison := c.copierEtaHistory.addCurrentEstimateAndCompare(estimate)
 	if comparison != "" {
 		return fmt.Sprintf("%s (%s)", estimate.String(), comparison)
@@ -369,6 +364,17 @@ func (c *buffered) GetETA() string {
 	return estimate.String()
 }
 
+// GetETAState returns the copy ETA in structured form: the state and duration
+// produced by etaEstimate from one getCopyStats read taken under the copier
+// lock. Unlike GetETA, it does not record the estimate in copierEtaHistory.
+func (c *buffered) GetETAState() status.ETA {
+	c.Lock()
+	defer c.Unlock()
+	copiedRows, totalRows, pct := c.getCopyStats()
+	estimate, st := etaEstimate(copiedRows, totalRows, pct, c.rowsPerSecond.Load(), c.startTime)
+	return status.ETA{State: st, Duration: estimate}
+}
+
 func (c *buffered) estimateRowsPerSecondLoop(ctx context.Context) {
 	// We take >10 second averages because with parallel copy it bounces around a lot.
 	// Get progress from chunker since we no longer track rows locally
diff --git a/pkg/copier/copier.go b/pkg/copier/copier.go
index f4321cab..ba700a42 100644
--- a/pkg/copier/copier.go
+++ b/pkg/copier/copier.go
@@ -8,11 +8,13 @@ import (
 	"database/sql"
 	"errors"
 	"log/slog"
+	"math"
 	"time"
 
 	"github.com/block/spirit/pkg/applier"
 	"github.com/block/spirit/pkg/dbconn"
 	"github.com/block/spirit/pkg/metrics"
+	"github.com/block/spirit/pkg/status"
 	"github.com/block/spirit/pkg/table"
 	"github.com/block/spirit/pkg/throttler"
 )
@@ -22,6 +24,29 @@ const (
 	copyETAInitialWaitTime = 1 * time.Minute // how long to wait before first estimating copy speed (to allow for fast start)
 )
 
+// etaEstimate returns the estimated remaining copy time and the state of that
+// estimate. The duration is meaningful only when the state is status.ETAReady.
+// No estimate is available before a copy rate has been measured (the first
+// copyETAInitialWaitTime, or while rowsPerSecond is 0; status.ETAMeasuring) or
+// once the copy is essentially complete (pct > 99.99, or copiedRows already
+// exceeds the totalRows estimate; status.ETADue). Callers present each case:
+// GetETA renders "TBD"/"DUE", GetETAState returns the state and a 0 duration.
+func etaEstimate(copiedRows, totalRows uint64, pct float64, rowsPerSecond uint64, startTime time.Time) (time.Duration, status.ETAState) {
+	// copiedRows can overshoot the totalRows estimate; report that as due rather
+	// than letting the unsigned subtraction below wrap around.
+	if pct > 99.99 || copiedRows > totalRows {
+		return 0, status.ETADue
+	}
+	if rowsPerSecond == 0 || time.Since(startTime) < copyETAInitialWaitTime {
+		return 0, status.ETAMeasuring
+	}
+	// Remaining rows (actual or logical: getCopyStats() and rowsPerSecond change
+	// estimation method when the PK is auto-inc) divided by the recent copy rate.
+	remainingRows := totalRows - copiedRows
+	remainingSeconds := math.Floor(float64(remainingRows) / float64(rowsPerSecond))
+	return time.Duration(remainingSeconds * float64(time.Second)), status.ETAReady
+}
+
 // Copier is the interface which copiers use. Currently we only have
 // one implementation, which we call unbuffered because it uses
 // INSERT .. SELECT without any intermediate buffering in spirit.
@@ -30,6 +55,12 @@ const (
 type Copier interface {
 	Run(ctx context.Context) error
 	GetETA() string
+	// GetETAState returns the structured copy ETA: its availability (so callers
+	// can distinguish "still measuring" from a real estimate or a near-complete
+	// copy) and, when available, the estimated remaining time. It is the
+	// structured counterpart of GetETA, computed in a single read so the state
+	// and duration are always consistent.
+	GetETAState() status.ETA
 	GetChunker() table.Chunker
 	SetThrottler(throttler throttler.Throttler)
 	GetThrottler() throttler.Throttler
diff --git a/pkg/copier/copier_eta_test.go b/pkg/copier/copier_eta_test.go
new file mode 100644
index 00000000..c50da4e9
--- /dev/null
+++ b/pkg/copier/copier_eta_test.go
@@ -0,0 +1,67 @@
+package copier
+
+import (
+	"testing"
+	"time"
+
+	"github.com/block/spirit/pkg/status"
+	"github.com/stretchr/testify/assert"
+)
+
+// etaEstimate reports a remaining-time estimate only once a copy rate has been
+// measured and while the copy is still meaningfully in flight; the not-ready
+// cases are distinguished as ETAMeasuring (no rate yet) vs ETADue (near done),
+// which the GUI/summary callers turn into "TBD"/"DUE"/0.
+func TestETAEstimate(t *testing.T) {
+	measured := time.Now().Add(-2 * copyETAInitialWaitTime)
+
+	tests := []struct {
+		name         string
+		copiedRows   uint64
+		totalRows    uint64
+		pct          float64
+		rowsPerSec   uint64
+		startTime    time.Time
+		wantState    status.ETAState
+		wantDuration time.Duration
+	}{
+		{
+			name:       "due once the copy is essentially complete",
+			copiedRows: 999, totalRows: 1000, pct: 99.999, rowsPerSec: 10, startTime: measured,
+			wantState: status.ETADue,
+		},
+		{
+			name:       "due when copied rows overshoot the estimated total",
+			copiedRows: 1201, totalRows: 1200, pct: 100.08, rowsPerSec: 10, startTime: measured,
+			wantState: status.ETADue,
+		},
+		{
+			name:       "measuring before a copy rate is known",
+			copiedRows: 100, totalRows: 1000, pct: 10, rowsPerSec: 0, startTime: measured,
+			wantState: status.ETAMeasuring,
+		},
+		{
+			name:       "measuring during the initial wait window",
+			copiedRows: 100, totalRows: 1000, pct: 10, rowsPerSec: 50, startTime: time.Now(),
+			wantState: status.ETAMeasuring,
+		},
+		{
+			name:       "ready: estimate from remaining rows and rate",
+			copiedRows: 500, totalRows: 1000, pct: 50, rowsPerSec: 10, startTime: measured,
+			wantState: status.ETAReady, wantDuration: 50 * time.Second,
+		},
+		{
+			name:       "ready: floors fractional seconds",
+			copiedRows: 0, totalRows: 1000, pct: 0, rowsPerSec: 3, startTime: measured,
+			wantState: status.ETAReady, wantDuration: 333 * time.Second,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			d, st := etaEstimate(tt.copiedRows, tt.totalRows, tt.pct, tt.rowsPerSec, tt.startTime)
+			assert.Equal(t, tt.wantState, st)
+			assert.Equal(t, tt.wantDuration, d)
+		})
+	}
+}
diff --git a/pkg/copier/unbuffered.go b/pkg/copier/unbuffered.go
index bf002a43..2226999d 100644
--- a/pkg/copier/unbuffered.go
+++ b/pkg/copier/unbuffered.go
@@ -6,13 +6,13 @@ import (
 	"errors"
 	"fmt"
 	"log/slog"
-	"math"
 	"sync"
 	"sync/atomic"
 	"time"
 
 	"github.com/block/spirit/pkg/dbconn"
 	"github.com/block/spirit/pkg/metrics"
+	"github.com/block/spirit/pkg/status"
 	"github.com/block/spirit/pkg/table"
 	"github.com/block/spirit/pkg/throttler"
 	"golang.org/x/sync/errgroup"
@@ -175,20 +175,15 @@ func (c *Unbuffered) GetETA() string {
 	c.Lock()
 	defer c.Unlock()
 	copiedRows, totalRows, pct := c.getCopyStats()
-	rowsPerSecond := c.rowsPerSecond.Load()
-	if pct > 99.99 {
+	estimate, st := etaEstimate(copiedRows, totalRows, pct, c.rowsPerSecond.Load(), c.startTime)
+	switch st {
+	case status.ETADue:
 		return "DUE"
-	}
-	if rowsPerSecond == 0 || time.Since(c.startTime) < copyETAInitialWaitTime {
+	case status.ETAMeasuring:
 		return "TBD"
+	case status.ETAReady, status.ETANone:
+		// A ready estimate is formatted below; ETANone cannot occur during copy.
 	}
-	// divide the remaining rows by how many rows we copied in the last interval per second
-	// "remainingRows" might be the actual rows or the logical rows since
-	// c.getCopyStats() and rowsPerSecond change estimation method when the PK is auto-inc.
-	remainingRows := totalRows - copiedRows
-	remainingSeconds := math.Floor(float64(remainingRows) / float64(rowsPerSecond))
-
-	estimate := time.Duration(remainingSeconds * float64(time.Second))
 	comparison := c.copierEtaHistory.addCurrentEstimateAndCompare(estimate)
 	if comparison != "" {
 		return fmt.Sprintf("%s (%s)", estimate.String(), comparison)
@@ -196,6 +191,17 @@ func (c *Unbuffered) GetETA() string {
 	return estimate.String()
 }
 
+// GetETAState returns the copy ETA in structured form: the state and duration
+// produced by etaEstimate from one getCopyStats read taken under the copier
+// lock. Unlike GetETA, it does not record the estimate in copierEtaHistory.
+func (c *Unbuffered) GetETAState() status.ETA {
+	c.Lock()
+	defer c.Unlock()
+	copiedRows, totalRows, pct := c.getCopyStats()
+	estimate, st := etaEstimate(copiedRows, totalRows, pct, c.rowsPerSecond.Load(), c.startTime)
+	return status.ETA{State: st, Duration: estimate}
+}
+
 func (c *Unbuffered) estimateRowsPerSecondLoop(ctx context.Context) {
 	// We take >10 second averages because with parallel copy it bounces around a lot.
 	// Get progress from chunker since we no longer track rows locally
diff --git a/pkg/migration/binlog_test.go b/pkg/migration/binlog_test.go
index bb719c7c..c8b07cf4 100644
--- a/pkg/migration/binlog_test.go
+++ b/pkg/migration/binlog_test.go
@@ -163,7 +163,7 @@ func TestE2EBinlogSubscribingCompositeKey(t *testing.T) {
 	require.NotNil(t, chunk)
 	require.Equal(t, "((`id1` < 1001)\n OR (`id1` = 1001 AND `id2` < 1))", chunk.String())
 	require.NoError(t, ccopier.CopyChunk(t.Context(), chunk))
-	require.Equal(t, status.Progress{CurrentState: status.CopyRows, Summary: "1000/1200 83.33% copyRows ETA TBD", Tables: []status.TableProgress{{TableName: "e2et1", RowsCopied: 1000, RowsTotal: 1200, IsComplete: false}}}, m.Progress())
+	require.Equal(t, status.Progress{CurrentState: status.CopyRows, Summary: "1000/1200 83.33% copyRows ETA TBD", ETA: status.ETA{State: status.ETAMeasuring}, Tables: []status.TableProgress{{TableName: "e2et1", RowsCopied: 1000, RowsTotal: 1200, IsComplete: false}}}, m.Progress())
 
 	// Now insert some data.
testutils.RunSQL(t, `insert into e2et1 (id1, id2) values (1002, 2)`) @@ -178,7 +178,7 @@ func TestE2EBinlogSubscribingCompositeKey(t *testing.T) { require.NoError(t, err) require.Equal(t, "((`id1` > 1001)\n OR (`id1` = 1001 AND `id2` >= 1))", chunk.String()) require.NoError(t, ccopier.CopyChunk(t.Context(), chunk)) - require.Equal(t, status.Progress{CurrentState: status.CopyRows, Summary: "1201/1200 100.08% copyRows ETA DUE", Tables: []status.TableProgress{{TableName: "e2et1", RowsCopied: 1201, RowsTotal: 1200, IsComplete: true}}}, m.Progress()) + require.Equal(t, status.Progress{CurrentState: status.CopyRows, Summary: "1201/1200 100.08% copyRows ETA DUE", ETA: status.ETA{State: status.ETADue}, Tables: []status.TableProgress{{TableName: "e2et1", RowsCopied: 1201, RowsTotal: 1200, IsComplete: true}}}, m.Progress()) // Now insert some data. // This should be picked up by the binlog subscription diff --git a/pkg/migration/runner.go b/pkg/migration/runner.go index a1e9b4b0..0a91180c 100644 --- a/pkg/migration/runner.go +++ b/pkg/migration/runner.go @@ -1038,6 +1038,7 @@ func (r *Runner) createCheckpointTable(ctx context.Context) error { func (r *Runner) Progress() status.Progress { var summary string + var eta status.ETA switch r.status.Get() { //nolint: exhaustive case status.CopyRows: summary = fmt.Sprintf("%v %s ETA %v", @@ -1045,6 +1046,7 @@ func (r *Runner) Progress() status.Progress { r.status.Get().String(), r.copier.GetETA(), ) + eta = r.copier.GetETAState() case status.WaitingOnSentinelTable: summary = "Waiting on Sentinel Table" case status.ApplyChangeset, status.PostChecksum: @@ -1088,6 +1090,7 @@ func (r *Runner) Progress() status.Progress { return status.Progress{ CurrentState: r.status.Get(), Summary: summary, + ETA: eta, Tables: tables, } } diff --git a/pkg/status/progress.go b/pkg/status/progress.go index bc63cae3..2c6d8a68 100644 --- a/pkg/status/progress.go +++ b/pkg/status/progress.go @@ -1,12 +1,46 @@ package status +import "time" + // Progress is 
returned as a struct because we may add more to it later. // It is designed for wrappers (like a GUI) to be able to summarize the // current status without parsing log output. +// ETAState describes the availability of the row-copy ETA estimate, so callers +// can distinguish "still measuring" from a real estimate without parsing the +// Summary string. It mirrors the cases GetETA renders as text. +type ETAState string + +const ( + // ETANone means there is no copy ETA because the migration is not in the + // row-copy phase. Duration is 0. + ETANone ETAState = "" + // ETAMeasuring means a copy is in progress but no copy rate has been measured + // yet, so no estimate is available (Summary shows "ETA TBD"). Duration is 0. + ETAMeasuring ETAState = "measuring" + // ETAReady means Duration holds a current remaining-time estimate. + ETAReady ETAState = "ready" + // ETADue means the copy is essentially complete (Summary shows "ETA DUE"). + // Duration is 0. + ETADue ETAState = "due" +) + +// ETA is the structured form of the ETA embedded in Summary. State reports +// whether Duration is available yet — e.g. ETAMeasuring during the initial +// window before a copy rate is known — so callers can show "calculating" rather +// than a misleading 0. Duration is the estimated remaining row-copy time, valid +// only when State is ETAReady and 0 otherwise. +type ETA struct { + State ETAState + Duration time.Duration +} + type Progress struct { CurrentState State // current state, i.e. CopyRows Summary string // text based representation, i.e. "12.5% copyRows ETA 1h 30m" + // ETA is the structured remaining row-copy estimate and its availability. + ETA ETA + // Tables contains per-table progress for multi-table migrations. // For single-table migrations, this will have one entry. Tables []TableProgress