From 6ffe32b76e7228d99e12eeba60a5e719f2d3e5e3 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Wed, 15 Jan 2025 14:05:03 -0800 Subject: [PATCH] fix(bigtable): Mutate groups even after first error (#11434) * fix(bigtable): Mutate groups even after first error * simplify range use * fix unit tests * refactor code * resolve vet failures * fix tests --- bigtable/bigtable.go | 42 ++++++++++-- bigtable/bigtable_test.go | 138 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 176 insertions(+), 4 deletions(-) diff --git a/bigtable/bigtable.go b/bigtable/bigtable.go index 3b1799c2d137..569c7f329870 100644 --- a/bigtable/bigtable.go +++ b/bigtable/bigtable.go @@ -1013,7 +1013,8 @@ func mutationsAreRetryable(muts []*btpb.Mutation) bool { return true } -const maxMutations = 100000 +// Overridden in tests +var maxMutations = 100000 // Apply mutates a row atomically. A mutation must contain at least one // operation and at most 100000 operations. @@ -1224,9 +1225,14 @@ func (m *Mutation) mergeToCell(family, column string, ts Timestamp, value *btpb. type entryErr struct { Entry *btpb.MutateRowsRequest_Entry Err error + + // TopLevelErr is the error received either from + // 1. client.MutateRows + // 2. stream.Recv + TopLevelErr error } -// ApplyBulk applies multiple Mutations, up to a maximum of 100,000. +// ApplyBulk applies multiple Mutations. // Each mutation is individually applied atomically, // but the set of mutations may be applied in any order. // @@ -1254,17 +1260,31 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio origEntries[i] = &entryErr{Entry: &btpb.MutateRowsRequest_Entry{RowKey: []byte(key), Mutations: mut.ops}} } - for _, group := range groupEntries(origEntries, maxMutations) { + var firstGroupErr error + numFailed := 0 + groups := groupEntries(origEntries, maxMutations) + for _, group := range groups { err := t.applyGroup(ctx, group, opts...)
if err != nil { - return nil, err + if firstGroupErr == nil { + firstGroupErr = err + } + numFailed++ } } + if numFailed == len(groups) { + return nil, firstGroupErr + } + // All the errors are accumulated into an array and returned, interspersed with nils for successful // entries. The absence of any errors means we should return nil. var foundErr bool for _, entry := range origEntries { + if entry.Err == nil && entry.TopLevelErr != nil { + // Populate per mutation error if top level error is not nil + entry.Err = entry.TopLevelErr + } if entry.Err != nil { foundErr = true } @@ -1289,6 +1309,7 @@ func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...Apply // We want to retry the entire request with the current group return err } + // Get the entries that need to be retried group = t.getApplyBulkRetries(group) if len(group) > 0 && len(idempotentRetryCodes) > 0 { // We have at least one mutation that needs to be retried. @@ -1324,6 +1345,11 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD } } + var topLevelErr error + defer func() { + populateTopLevelError(entryErrs, topLevelErr) + }() + entries := make([]*btpb.MutateRowsRequest_Entry, len(entryErrs)) for i, entryErr := range entryErrs { entries[i] = entryErr.Entry @@ -1340,6 +1366,7 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD stream, err := t.c.client.MutateRows(ctx, req) if err != nil { + _, topLevelErr = convertToGrpcStatusErr(err) return err } @@ -1354,6 +1381,7 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD } if err != nil { *trailerMD = stream.Trailer() + _, topLevelErr = convertToGrpcStatusErr(err) return err } @@ -1370,6 +1398,12 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD return nil } +func populateTopLevelError(entries []*entryErr, topLevelErr error) { + for _, entry := range entries { + entry.TopLevelErr = topLevelErr + } +} + // 
groupEntries groups entries into groups of a specified size without breaking up // individual entries. func groupEntries(entries []*entryErr, maxSize int) [][]*entryErr { diff --git a/bigtable/bigtable_test.go b/bigtable/bigtable_test.go index 6664670ea6b0..472def5c4b38 100644 --- a/bigtable/bigtable_test.go +++ b/bigtable/bigtable_test.go @@ -28,6 +28,8 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/api/option" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var disableMetricsConfig = ClientConfig{MetricsProvider: NoopMetricsProvider{}} @@ -875,3 +877,139 @@ func TestMutateRowsWithAggregates_MergeToCell(t *testing.T) { t.Error() } } + +type rowKeyCheckingInterceptor struct { + grpc.ClientStream + failRow string + failErr error // error to use while sending failed response for fail row + requestCounter *int +} + +func (i *rowKeyCheckingInterceptor) SendMsg(m interface{}) error { + *i.requestCounter = *i.requestCounter + 1 + if req, ok := m.(*btpb.MutateRowsRequest); ok { + for _, entry := range req.Entries { + if string(entry.RowKey) == i.failRow { + return i.failErr + } + } + } + return i.ClientStream.SendMsg(m) +} + +func (i *rowKeyCheckingInterceptor) RecvMsg(m interface{}) error { + return i.ClientStream.RecvMsg(m) +} + +// Mutations are broken down into groups of 'maxMutations' and then MutateRowsRequest is sent to Cloud Bigtable Service +// This test validates that even if one of the groups receives an error, requests are sent for further groups +func TestApplyBulk_MutationsSucceedAfterGroupError(t *testing.T) { + testEnv, gotErr := NewEmulatedEnv(IntegrationTestConfig{}) + if gotErr != nil { + t.Fatalf("NewEmulatedEnv failed: %v", gotErr) + } + + // Add interceptor to fail rows + failedRow := "row2" + failErr := status.Error(codes.InvalidArgument, "Invalid row key") + reqCount := 0 + conn, gotErr := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)), + grpc.WithStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + clientStream, err := streamer(ctx, desc, cc, method, opts...) + return &rowKeyCheckingInterceptor{ + ClientStream: clientStream, + failRow: failedRow, + requestCounter: &reqCount, + failErr: failErr, + }, err + }), + ) + if gotErr != nil { + t.Fatalf("grpc.Dial failed: %v", gotErr) + } + + // Create client and table + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + adminClient, gotErr := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn)) + if gotErr != nil { + t.Fatalf("NewClient failed: %v", gotErr) + } + defer adminClient.Close() + tableConf := &TableConf{ + TableID: testEnv.config.Table, + ColumnFamilies: map[string]Family{ + "f": { + ValueType: AggregateType{ + Input: Int64Type{}, + Aggregator: SumAggregator{}, + }, + }, + }, + } + if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil { + t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err) + } + client, gotErr := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn)) + if gotErr != nil { + t.Fatalf("NewClientWithConfig failed: %v", gotErr) + } + defer client.Close() + table := client.Open(testEnv.config.Table) + + // Override maxMutations to break mutations into smaller groups + origMaxMutations := maxMutations + t.Cleanup(func() { + maxMutations = origMaxMutations + }) + maxMutations = 2 + + // Create mutations + m1 := NewMutation() + m1.AddIntToCell("f", "q", 0, 1000) + m2 := NewMutation() + m2.AddIntToCell("f", "q", 0, 2000) + + // Perform ApplyBulk operation and compare errors + rowKeys := []string{"row1", "row1", failedRow, failedRow, 
"row3", "row3"} + var wantErr error + wantErrs := []error{nil, nil, failErr, failErr, nil, nil} + gotErrs, gotErr := table.ApplyBulk(ctx, rowKeys, []*Mutation{m1, m2, m1, m2, m1, m2}) + + // Assert overall error + if !equalErrs(gotErr, wantErr) { + t.Fatalf("ApplyBulk err got: %v, want: %v", gotErr, wantErr) + } + + // Assert individual mutation errors + if len(gotErrs) != len(wantErrs) { + t.Fatalf("ApplyBulk errs got: %v, want: %v", gotErrs, wantErrs) + } + for i := range gotErrs { + if !equalErrs(gotErrs[i], wantErrs[i]) { + t.Errorf("#%d ApplyBulk err got: %v, want: %v", i, gotErrs[i], wantErrs[i]) + } + } + + // Assert number of requests sent + wantReqCount := len(rowKeys) / maxMutations + if reqCount != wantReqCount { + t.Errorf("Number of requests got: %v, want: %v", reqCount, wantReqCount) + } + + // Assert individual mutation apply success/failure by reading rows + gotErr = table.ReadRows(ctx, RowList{"row1", failedRow, "row3"}, func(row Row) bool { + rowMutated := bytes.Equal(row["f"][0].Value, binary.BigEndian.AppendUint64([]byte{}, 3000)) + if rowMutated && row.Key() == failedRow { + t.Error("Expected mutation to fail for row " + row.Key()) + } + if !rowMutated && row.Key() != failedRow { + t.Error("Expected mutation to succeed for row " + row.Key()) + } + return true + }) + if gotErr != nil { + t.Fatalf("ReadRows failed: %v", gotErr) + } +}