Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions pkg/copier/buffered.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import (
"errors"
"fmt"
"log/slog"
"math"
"sync"
"sync/atomic"
"time"

"github.com/block/spirit/pkg/applier"
"github.com/block/spirit/pkg/dbconn"
"github.com/block/spirit/pkg/metrics"
"github.com/block/spirit/pkg/status"
"github.com/block/spirit/pkg/table"
"github.com/block/spirit/pkg/throttler"
"github.com/block/spirit/pkg/utils"
Expand Down Expand Up @@ -348,27 +348,30 @@ func (c *buffered) GetETA() string {
c.Lock()
defer c.Unlock()
copiedRows, totalRows, pct := c.getCopyStats()
rowsPerSecond := c.rowsPerSecond.Load()
if pct > 99.99 {
estimate, st := etaEstimate(copiedRows, totalRows, pct, c.rowsPerSecond.Load(), c.startTime)
switch st {
case status.ETADue:
return "DUE"
}
if rowsPerSecond == 0 || time.Since(c.startTime) < copyETAInitialWaitTime {
case status.ETAMeasuring:
return "TBD"
case status.ETAReady, status.ETANone:
// A ready estimate is formatted below; ETANone cannot occur during copy.
}
// divide the remaining rows by how many rows we copied in the last interval per second
// "remainingRows" might be the actual rows or the logical rows since
// c.getCopyStats() and rowsPerSecond change estimation method when the PK is auto-inc.
remainingRows := totalRows - copiedRows
remainingSeconds := math.Floor(float64(remainingRows) / float64(rowsPerSecond))

estimate := time.Duration(remainingSeconds * float64(time.Second))
comparison := c.copierEtaHistory.addCurrentEstimateAndCompare(estimate)
if comparison != "" {
return fmt.Sprintf("%s (%s)", estimate.String(), comparison)
}
return estimate.String()
}

// GetETAState returns the copy ETA in structured form: the availability state
// plus the remaining-time estimate produced by etaEstimate. The copier's lock
// is held for the whole read so the state and duration come from the same
// snapshot of the copy statistics. It only reads; it does not add the estimate
// to the ETA history.
func (c *buffered) GetETAState() status.ETA {
	c.Lock()
	defer c.Unlock()

	done, total, percent := c.getCopyStats()
	rate := c.rowsPerSecond.Load()
	remaining, state := etaEstimate(done, total, percent, rate, c.startTime)

	var eta status.ETA
	eta.State = state
	eta.Duration = remaining
	return eta
}

func (c *buffered) estimateRowsPerSecondLoop(ctx context.Context) {
// We take >10 second averages because with parallel copy it bounces around a lot.
// Get progress from chunker since we no longer track rows locally
Expand Down
31 changes: 31 additions & 0 deletions pkg/copier/copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ import (
"database/sql"
"errors"
"log/slog"
"math"
"time"

"github.com/block/spirit/pkg/applier"
"github.com/block/spirit/pkg/dbconn"
"github.com/block/spirit/pkg/metrics"
"github.com/block/spirit/pkg/status"
"github.com/block/spirit/pkg/table"
"github.com/block/spirit/pkg/throttler"
)
Expand All @@ -22,6 +24,29 @@ const (
copyETAInitialWaitTime = 1 * time.Minute // how long to wait before first estimating copy speed (to allow for fast start)
)

// etaEstimate returns the estimated remaining copy time and the state of that
// estimate. The duration is meaningful only when the state is status.ETAReady.
// No estimate is available before a copy rate has been measured (the first
// copyETAInitialWaitTime, or while no rows have been timed; status.ETAMeasuring)
// or once the copy is essentially complete (pct > 99.99; status.ETADue) — the
// callers present each case (GetETA renders "TBD"/"DUE", GetETAState returns the
// state and 0 seconds).
//
// copiedRows and totalRows are estimates and pct is supplied independently, so
// the function does not assume copiedRows <= totalRows: when the copied count
// has already reached or passed the total, the remaining work is treated as
// zero rather than letting the unsigned subtraction wrap around. An estimate
// too large to represent is capped at the maximum time.Duration.
func etaEstimate(copiedRows, totalRows uint64, pct float64, rowsPerSecond uint64, startTime time.Time) (time.Duration, status.ETAState) {
	if pct > 99.99 {
		return 0, status.ETADue
	}
	if rowsPerSecond == 0 || time.Since(startTime) < copyETAInitialWaitTime {
		return 0, status.ETAMeasuring
	}
	// Divide the remaining rows by how many rows we copied in the last interval
	// per second. "remainingRows" might be the actual rows or the logical rows
	// since getCopyStats() and rowsPerSecond change estimation method when the PK
	// is auto-inc.
	var remainingRows uint64
	if totalRows > copiedRows {
		// Guarded so an over-count (copiedRows > totalRows) cannot underflow
		// into a value near 2^64.
		remainingRows = totalRows - copiedRows
	}
	remainingSeconds := math.Floor(float64(remainingRows) / float64(rowsPerSecond))

	// Converting an out-of-range float64 to an integer type gives an
	// implementation-dependent result, so cap before converting.
	const maxSeconds = float64(math.MaxInt64 / int64(time.Second))
	if remainingSeconds > maxSeconds {
		return time.Duration(math.MaxInt64), status.ETAReady
	}
	return time.Duration(remainingSeconds * float64(time.Second)), status.ETAReady
}

// Copier is the interface which copiers use. Currently we only have
// one implementation, which we call unbuffered because it uses
// INSERT .. SELECT without any intermediate buffering in spirit.
Expand All @@ -30,6 +55,12 @@ const (
type Copier interface {
Run(ctx context.Context) error
GetETA() string
// GetETAState returns the structured copy ETA: its availability (so callers
// can distinguish "still measuring" from a real estimate or a near-complete
// copy) and, when available, the estimated remaining time. It is the
// structured counterpart of GetETA, computed in a single read so the state
// and duration are always consistent.
GetETAState() status.ETA
GetChunker() table.Chunker
SetThrottler(throttler throttler.Throttler)
GetThrottler() throttler.Throttler
Expand Down
62 changes: 62 additions & 0 deletions pkg/copier/copier_eta_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package copier

import (
"testing"
"time"

"github.com/block/spirit/pkg/status"
"github.com/stretchr/testify/assert"
)

// TestETAEstimate checks each outcome of etaEstimate: ETADue once pct passes
// the near-complete threshold, ETAMeasuring while there is no rate or the
// initial wait window has not elapsed, and ETAReady with a floored
// remaining-time duration otherwise. Only the ETAReady cases expect a non-zero
// duration.
func TestETAEstimate(t *testing.T) {
	// A start time far enough in the past that the initial wait window is over.
	pastWarmup := time.Now().Add(-2 * copyETAInitialWaitTime)

	type etaCase struct {
		name     string
		copied   uint64
		total    uint64
		pct      float64
		rate     uint64
		started  time.Time
		state    status.ETAState
		duration time.Duration
	}

	cases := []etaCase{
		{
			name:    "due once the copy is essentially complete",
			copied:  999,
			total:   1000,
			pct:     99.999,
			rate:    10,
			started: pastWarmup,
			state:   status.ETADue,
		},
		{
			name:    "measuring before a copy rate is known",
			copied:  100,
			total:   1000,
			pct:     10,
			rate:    0,
			started: pastWarmup,
			state:   status.ETAMeasuring,
		},
		{
			name:    "measuring during the initial wait window",
			copied:  100,
			total:   1000,
			pct:     10,
			rate:    50,
			started: time.Now(),
			state:   status.ETAMeasuring,
		},
		{
			name:     "ready: estimate from remaining rows and rate",
			copied:   500,
			total:    1000,
			pct:      50,
			rate:     10,
			started:  pastWarmup,
			state:    status.ETAReady,
			duration: 50 * time.Second,
		},
		{
			name:     "ready: floors fractional seconds",
			copied:   0,
			total:    1000,
			pct:      0,
			rate:     3,
			started:  pastWarmup,
			state:    status.ETAReady,
			duration: 333 * time.Second,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			gotDuration, gotState := etaEstimate(tc.copied, tc.total, tc.pct, tc.rate, tc.started)
			assert.Equal(t, tc.state, gotState)
			assert.Equal(t, tc.duration, gotDuration)
		})
	}
}
27 changes: 15 additions & 12 deletions pkg/copier/unbuffered.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
"errors"
"fmt"
"log/slog"
"math"
"sync"
"sync/atomic"
"time"

"github.com/block/spirit/pkg/dbconn"
"github.com/block/spirit/pkg/metrics"
"github.com/block/spirit/pkg/status"
"github.com/block/spirit/pkg/table"
"github.com/block/spirit/pkg/throttler"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -175,27 +175,30 @@ func (c *Unbuffered) GetETA() string {
c.Lock()
defer c.Unlock()
copiedRows, totalRows, pct := c.getCopyStats()
rowsPerSecond := c.rowsPerSecond.Load()
if pct > 99.99 {
estimate, st := etaEstimate(copiedRows, totalRows, pct, c.rowsPerSecond.Load(), c.startTime)
switch st {
case status.ETADue:
return "DUE"
}
if rowsPerSecond == 0 || time.Since(c.startTime) < copyETAInitialWaitTime {
case status.ETAMeasuring:
return "TBD"
case status.ETAReady, status.ETANone:
// A ready estimate is formatted below; ETANone cannot occur during copy.
}
// divide the remaining rows by how many rows we copied in the last interval per second
// "remainingRows" might be the actual rows or the logical rows since
// c.getCopyStats() and rowsPerSecond change estimation method when the PK is auto-inc.
remainingRows := totalRows - copiedRows
remainingSeconds := math.Floor(float64(remainingRows) / float64(rowsPerSecond))

estimate := time.Duration(remainingSeconds * float64(time.Second))
comparison := c.copierEtaHistory.addCurrentEstimateAndCompare(estimate)
if comparison != "" {
return fmt.Sprintf("%s (%s)", estimate.String(), comparison)
}
return estimate.String()
}

// GetETAState returns the copy ETA in structured form: the availability state
// plus the remaining-time estimate produced by etaEstimate. The copier's lock
// is held for the whole read so the state and duration come from the same
// snapshot of the copy statistics. It only reads; it does not add the estimate
// to the ETA history.
func (c *Unbuffered) GetETAState() status.ETA {
	c.Lock()
	defer c.Unlock()

	done, total, percent := c.getCopyStats()
	rate := c.rowsPerSecond.Load()
	remaining, state := etaEstimate(done, total, percent, rate, c.startTime)

	var eta status.ETA
	eta.State = state
	eta.Duration = remaining
	return eta
}

func (c *Unbuffered) estimateRowsPerSecondLoop(ctx context.Context) {
// We take >10 second averages because with parallel copy it bounces around a lot.
// Get progress from chunker since we no longer track rows locally
Expand Down
4 changes: 2 additions & 2 deletions pkg/migration/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestE2EBinlogSubscribingCompositeKey(t *testing.T) {
require.NotNil(t, chunk)
require.Equal(t, "((`id1` < 1001)\n OR (`id1` = 1001 AND `id2` < 1))", chunk.String())
require.NoError(t, ccopier.CopyChunk(t.Context(), chunk))
require.Equal(t, status.Progress{CurrentState: status.CopyRows, Summary: "1000/1200 83.33% copyRows ETA TBD", Tables: []status.TableProgress{{TableName: "e2et1", RowsCopied: 1000, RowsTotal: 1200, IsComplete: false}}}, m.Progress())
require.Equal(t, status.Progress{CurrentState: status.CopyRows, Summary: "1000/1200 83.33% copyRows ETA TBD", ETA: status.ETA{State: status.ETAMeasuring}, Tables: []status.TableProgress{{TableName: "e2et1", RowsCopied: 1000, RowsTotal: 1200, IsComplete: false}}}, m.Progress())

// Now insert some data.
testutils.RunSQL(t, `insert into e2et1 (id1, id2) values (1002, 2)`)
Expand All @@ -178,7 +178,7 @@ func TestE2EBinlogSubscribingCompositeKey(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "((`id1` > 1001)\n OR (`id1` = 1001 AND `id2` >= 1))", chunk.String())
require.NoError(t, ccopier.CopyChunk(t.Context(), chunk))
require.Equal(t, status.Progress{CurrentState: status.CopyRows, Summary: "1201/1200 100.08% copyRows ETA DUE", Tables: []status.TableProgress{{TableName: "e2et1", RowsCopied: 1201, RowsTotal: 1200, IsComplete: true}}}, m.Progress())
require.Equal(t, status.Progress{CurrentState: status.CopyRows, Summary: "1201/1200 100.08% copyRows ETA DUE", ETA: status.ETA{State: status.ETADue}, Tables: []status.TableProgress{{TableName: "e2et1", RowsCopied: 1201, RowsTotal: 1200, IsComplete: true}}}, m.Progress())

// Now insert some data.
// This should be picked up by the binlog subscription
Expand Down
3 changes: 3 additions & 0 deletions pkg/migration/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1038,13 +1038,15 @@ func (r *Runner) createCheckpointTable(ctx context.Context) error {

func (r *Runner) Progress() status.Progress {
var summary string
var eta status.ETA
switch r.status.Get() { //nolint: exhaustive
case status.CopyRows:
summary = fmt.Sprintf("%v %s ETA %v",
r.copier.GetProgress(),
r.status.Get().String(),
r.copier.GetETA(),
)
eta = r.copier.GetETAState()
case status.WaitingOnSentinelTable:
summary = "Waiting on Sentinel Table"
case status.ApplyChangeset, status.PostChecksum:
Expand Down Expand Up @@ -1088,6 +1090,7 @@ func (r *Runner) Progress() status.Progress {
return status.Progress{
CurrentState: r.status.Get(),
Summary: summary,
ETA: eta,
Tables: tables,
}
}
Expand Down
34 changes: 34 additions & 0 deletions pkg/status/progress.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,46 @@
package status

import "time"

// The Progress type (declared further below) is returned as a struct because
// we may add more to it later. It is designed for wrappers (like a GUI) to be
// able to summarize the current status without parsing log output.

// ETAState classifies whether a row-copy ETA estimate is currently available.
// It lets callers tell "still measuring" apart from a real estimate without
// parsing the Summary string, and mirrors the cases GetETA renders as text.
type ETAState string

// The possible ETAState values. Duration is only populated for ETAReady; it is
// 0 in every other state.
const (
	// ETANone is the zero value: the migration is not in the row-copy phase,
	// so there is no copy ETA.
	ETANone = ETAState("")
	// ETAMeasuring: a copy is running but no copy rate has been measured yet,
	// so no estimate exists (Summary shows "ETA TBD").
	ETAMeasuring = ETAState("measuring")
	// ETAReady: Duration holds a current remaining-time estimate.
	ETAReady = ETAState("ready")
	// ETADue: the copy is essentially complete (Summary shows "ETA DUE").
	ETADue = ETAState("due")
)

// ETA is the structured form of the ETA embedded in Summary. State reports
// whether Duration is available yet — e.g. ETAMeasuring during the initial
// window before a copy rate is known — so callers can show "calculating" rather
// than a misleading 0. Duration is the estimated remaining row-copy time, valid
// only when State is ETAReady and 0 otherwise.
type ETA struct {
// State says whether an estimate exists; check it before using Duration.
State ETAState
// Duration is the estimated remaining row-copy time. It is only meaningful
// when State is ETAReady.
Duration time.Duration
}

type Progress struct {
CurrentState State // current state, i.e. CopyRows
Summary string // text based representation, i.e. "12.5% copyRows ETA 1h 30m"

// ETA is the structured remaining row-copy estimate and its availability.
ETA ETA

// Tables contains per-table progress for multi-table migrations.
// For single-table migrations, this will have one entry.
Tables []TableProgress
Expand Down
Loading