Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(bigtable): Track number of rows read to set rowsLimit in subsequent requests #10213

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
29 changes: 24 additions & 5 deletions bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,21 @@ func (t *Table) ReadRows(ctx context.Context, arg RowSet, f func(Row) bool, opts
func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *builtinMetricsTracer, opts ...ReadOption) (err error) {
var prevRowKey string
attrMap := make(map[string]interface{})

numRowsRead := int64(0)
rowLimitSet := false
intialRowLimit := int64(0)
for _, opt := range opts {
if l, ok := opt.(limitRows); ok {
rowLimitSet = true
intialRowLimit = l.limit
bhshkh marked this conversation as resolved.
Show resolved Hide resolved
}
}
err = gaxInvokeWithRecorder(ctx, mt, "ReadRows", func(ctx context.Context, headerMD, trailerMD *metadata.MD, _ gax.CallSettings) error {
if rowLimitSet && numRowsRead == intialRowLimit {
bhshkh marked this conversation as resolved.
Show resolved Hide resolved
return nil
bhshkh marked this conversation as resolved.
Show resolved Hide resolved
}

req := &btpb.ReadRowsRequest{
AppProfileId: t.c.appProfile,
}
Expand All @@ -410,7 +424,7 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
}
req.Rows = arg.proto()
}
settings := makeReadSettings(req)
settings := makeReadSettings(req, numRowsRead)
for _, opt := range opts {
mutianf marked this conversation as resolved.
Show resolved Hide resolved
opt.set(&settings)
}
Expand Down Expand Up @@ -473,7 +487,9 @@ func (t *Table) readRows(ctx context.Context, arg RowSet, f func(Row) bool, mt *
continue
}
prevRowKey = row.Key()
if !f(row) {
continueReading := f(row)
numRowsRead++
if !continueReading {
// Cancel and drain stream.
cancel()
for {
Expand Down Expand Up @@ -939,10 +955,11 @@ type FullReadStatsFunc func(*FullReadStats)
type readSettings struct {
req *btpb.ReadRowsRequest
fullReadStatsFunc FullReadStatsFunc
numRowsRead int64
}

func makeReadSettings(req *btpb.ReadRowsRequest) readSettings {
return readSettings{req, nil}
func makeReadSettings(req *btpb.ReadRowsRequest, numRowsRead int64) readSettings {
bhshkh marked this conversation as resolved.
Show resolved Hide resolved
return readSettings{req, nil, numRowsRead}
}

// A ReadOption is an optional argument to ReadRows.
Expand All @@ -965,7 +982,9 @@ func LimitRows(limit int64) ReadOption { return limitRows{limit} }

type limitRows struct{ limit int64 }

func (lr limitRows) set(settings *readSettings) { settings.req.RowsLimit = lr.limit }
func (lr limitRows) set(settings *readSettings) {
settings.req.RowsLimit = lr.limit - settings.numRowsRead
bhshkh marked this conversation as resolved.
Show resolved Hide resolved
}

// WithFullReadStats returns a ReadOption that will request FullReadStats
// and invoke the given callback on the resulting FullReadStats.
Expand Down
6 changes: 6 additions & 0 deletions bigtable/internal/testproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,13 +572,19 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques

var c int32
var rowsPb []*btpb.Row

lim := req.GetCancelAfterRows()

reversed := req.GetRequest().GetReversed()
opts := []bigtable.ReadOption{}
if reversed {
opts = append(opts, bigtable.ReverseScan())
}

rowsLimit := req.GetRequest().GetRowsLimit()
if rowsLimit > 0 {
opts = append(opts, bigtable.LimitRows(rowsLimit))
}
err = t.ReadRows(ctx, rs, func(r bigtable.Row) bool {

c++
Expand Down
56 changes: 56 additions & 0 deletions bigtable/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,62 @@ func TestRetryReadRows(t *testing.T) {
}
}

func TestRetryReadRowsLimit(t *testing.T) {
ctx := context.Background()

// Intercept requests and delegate to an interceptor defined by the test case
errCount := 0
var f func(grpc.ServerStream) error
errInjector := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if strings.HasSuffix(info.FullMethod, "ReadRows") {
return f(ss)
}
return handler(ctx, ss)
}

tbl, cleanup, err := setupDefaultFakeServer(grpc.StreamInterceptor(errInjector))
defer cleanup()
if err != nil {
t.Fatalf("fake server setup: %v", err)
}

initialRowLimit := int64(3)

errCount = 0
// Test overall request failure and retries
f = func(ss grpc.ServerStream) error {
var err error
req := new(btpb.ReadRowsRequest)
must(ss.RecvMsg(req))
switch errCount {
case 0:
if want, got := initialRowLimit, req.RowsLimit; want != got {
t.Errorf("RowsLimit: got %v, want %v", got, want)
}
must(writeReadRowsResponse(ss, "a", "b"))
err = status.Errorf(codes.Unavailable, "")
case 1:
if want, got := initialRowLimit-2, req.RowsLimit; want != got {
t.Errorf("RowsLimit: got %v, want %v", got, want)
}
must(writeReadRowsResponse(ss, "c"))
err = nil
}
errCount++
return err
}

var got []string
must(tbl.ReadRows(ctx, NewRange("a", "z"), func(r Row) bool {
got = append(got, r.Key())
return true
}, LimitRows(initialRowLimit)))
want := []string{"a", "b", "c"}
if !testutil.Equal(got, want) {
t.Errorf("retry range integration: got %v, want %v", got, want)
}
}

func TestRetryReverseReadRows(t *testing.T) {
ctx := context.Background()

Expand Down
Loading