Skip to content

Commit

Permalink
Support excluding lagging tablets and use this in vstream manager
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Feb 20, 2025
1 parent 2118bc3 commit ca229ff
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 7 deletions.
17 changes: 10 additions & 7 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ func SetTabletPickerRetryDelay(delay time.Duration) {
}

type TabletPickerOptions struct {
CellPreference string
TabletOrder string
IncludeNonServingTablets bool
CellPreference string
TabletOrder string
IncludeNonServingTablets bool
ExcludeTabletsWithMaxReplicationLag time.Duration
}

func parseTabletPickerCellPreferenceString(str string) (TabletPickerCellPreference, error) {
Expand Down Expand Up @@ -356,8 +357,8 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
if len(candidates) == 0 {
// If no viable candidates were found, sleep and try again.
tp.incNoTabletFoundStat()
log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds.",
tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0)
log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, maxReplicationLag: %v, sleeping for %.3f seconds.",
tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, tp.options.ExcludeTabletsWithMaxReplicationLag, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0)
timer := time.NewTimer(GetTabletPickerRetryDelay())
select {
case <-ctx.Done():
Expand Down Expand Up @@ -471,8 +472,10 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error {
if shr != nil &&
(shr.Serving || tp.options.IncludeNonServingTablets) &&
shr.RealtimeStats != nil &&
shr.RealtimeStats.HealthError == "" {
(shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" &&
(tabletInfo.Tablet.Type == topodatapb.TabletType_PRIMARY /* lag is not relevant */ ||
(tp.options.ExcludeTabletsWithMaxReplicationLag == 0 /* not set */ ||
shr.RealtimeStats.ReplicationLagSeconds <= uint32(tp.options.ExcludeTabletsWithMaxReplicationLag.Seconds())))) {
return io.EOF // End the stream
}
return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving")
Expand Down
59 changes: 59 additions & 0 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,60 @@ func TestPickNonServingTablets(t *testing.T) {
assert.True(t, picked3)
}

// TestPickNonLaggingTablets validates that lagging tablets are excluded when the
// ExcludeTabletsWithMaxReplicationLag option is set.
func TestPickNonLaggingTablets(t *testing.T) {
ctx := utils.LeakCheckContext(t)
cells := []string{"cell1"}
defaultCell := cells[0]
tabletTypes := "replica"
options := TabletPickerOptions{
ExcludeTabletsWithMaxReplicationLag: lowReplicationLag.Default(),
}
replLag := options.ExcludeTabletsWithMaxReplicationLag + (5 * time.Second)
te := newPickerTestEnv(t, ctx, cells)

// Tablet should not be selected as we only want replicas.
primaryTablet := addTablet(ctx, te, 100, topodatapb.TabletType_PRIMARY, defaultCell, true, true)
defer deleteTablet(t, te, primaryTablet)

// Tablet should not be selected as it is lagging.
laggingReplicaTablet := addTabletWithLag(ctx, te, 200, topodatapb.TabletType_REPLICA, defaultCell, true, true, uint32(replLag.Seconds()))
defer deleteTablet(t, te, laggingReplicaTablet)

// Tablet should be selected because it's a non-lagging replica.
nonLaggingReplicaTablet := addTablet(ctx, te, 300, topodatapb.TabletType_REPLICA, defaultCell, true, true)
defer deleteTablet(t, te, nonLaggingReplicaTablet)

_, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error {
si.PrimaryAlias = primaryTablet.Alias
return nil
})
require.NoError(t, err)

tp, err := NewTabletPicker(ctx, te.topoServ, cells, defaultCell, te.keyspace, te.shard, tabletTypes, options)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()
var pickedPrimary, pickedLaggingReplica, pickedNonLaggingReplica int
for i := 0; i < numTestIterations; i++ {
tablet, err := tp.PickForStreaming(ctx)
require.NoError(t, err)
if proto.Equal(tablet, primaryTablet) {
pickedPrimary++
}
if proto.Equal(tablet, laggingReplicaTablet) {
pickedLaggingReplica++
}
if proto.Equal(tablet, nonLaggingReplicaTablet) {
pickedNonLaggingReplica++
}
}
assert.Zero(t, pickedPrimary)
assert.Zero(t, pickedLaggingReplica)
assert.Equal(t, numTestIterations, pickedNonLaggingReplica)
}

type pickerTestEnv struct {
t *testing.T
keyspace string
Expand Down Expand Up @@ -720,6 +774,10 @@ func newPickerTestEnv(t *testing.T, ctx context.Context, cells []string, extraCe
}

func addTablet(ctx context.Context, te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell string, serving, healthy bool) *topodatapb.Tablet {
return addTabletWithLag(ctx, te, id, tabletType, cell, serving, healthy, 0)
}

func addTabletWithLag(ctx context.Context, te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell string, serving, healthy bool, replLagSecs uint32) *topodatapb.Tablet {
tablet := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: cell,
Expand Down Expand Up @@ -748,6 +806,7 @@ func addTablet(ctx context.Context, te *pickerTestEnv, id int, tabletType topoda
if healthy {
shr.RealtimeStats.HealthError = ""
}
shr.RealtimeStats.ReplicationLagSeconds = replLagSecs

_ = createFixedHealthConn(tablet, shr)

Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta
tabletPickerOptions: discovery.TabletPickerOptions{
CellPreference: flags.GetCellPreference(),
TabletOrder: flags.GetTabletOrder(),
// This is NOT configurable via the API because we check the
// discovery.GetLowReplicationLag().Seconds() value in the tablet
// health stream.
ExcludeTabletsWithMaxReplicationLag: discovery.GetLowReplicationLag(),
},
flags: flags,
}
Expand Down

0 comments on commit ca229ff

Please sign in to comment.