Commit
fix(bigtable): Mutate groups even after first error (#11434)
* fix(bigtable): Mutate groups even after first error

* simplify range use

* fix unit tests

* refactor code

* resolve vet failures

* fix tests
bhshkh authored Jan 15, 2025
1 parent 0a81f8f commit 6ffe32b
Showing 2 changed files with 176 additions and 4 deletions.
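
Before the diff, a minimal usage sketch of what this fix means for callers of ApplyBulk. The project, instance, table, family, and column names below are placeholders, not values from this commit: when one group of mutations fails, the remaining groups are still applied, and a call-level error is returned only when every group fails, so the per-row error slice stays meaningful.

package main

import (
	"context"
	"log"

	"cloud.google.com/go/bigtable"
)

func main() {
	ctx := context.Background()
	// Placeholder project/instance/table names.
	client, err := bigtable.NewClient(ctx, "my-project", "my-instance")
	if err != nil {
		log.Fatalf("NewClient: %v", err)
	}
	defer client.Close()
	tbl := client.Open("my-table")

	rowKeys := []string{"row1", "row2", "row3"}
	muts := make([]*bigtable.Mutation, len(rowKeys))
	for i := range rowKeys {
		m := bigtable.NewMutation()
		m.Set("cf", "col", bigtable.Now(), []byte("value"))
		muts[i] = m
	}

	// rowErrs has one slot per row key; with this fix, err is non-nil only
	// when the whole request fails (every group of mutations failed).
	rowErrs, err := tbl.ApplyBulk(ctx, rowKeys, muts)
	if err != nil {
		log.Fatalf("ApplyBulk: %v", err)
	}
	for i, rowErr := range rowErrs {
		if rowErr != nil {
			log.Printf("row %q was not applied: %v", rowKeys[i], rowErr)
		}
	}
}
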
42 changes: 38 additions & 4 deletions bigtable/bigtable.go
@@ -1013,7 +1013,8 @@ func mutationsAreRetryable(muts []*btpb.Mutation) bool {
return true
}

const maxMutations = 100000
// Overridden in tests
var maxMutations = 100000

// Apply mutates a row atomically. A mutation must contain at least one
// operation and at most 100000 operations.
@@ -1224,9 +1225,14 @@ func (m *Mutation) mergeToCell(family, column string, ts Timestamp, value *btpb.
type entryErr struct {
Entry *btpb.MutateRowsRequest_Entry
Err error

// TopLevelErr is the error received from either
// 1. client.MutateRows, or
// 2. stream.Recv
TopLevelErr error
}

// ApplyBulk applies multiple Mutations, up to a maximum of 100,000.
// ApplyBulk applies multiple Mutations.
// Each mutation is individually applied atomically,
// but the set of mutations may be applied in any order.
//
@@ -1254,17 +1260,31 @@ func (t *Table) ApplyBulk(ctx context.Context, rowKeys []string, muts []*Mutatio
origEntries[i] = &entryErr{Entry: &btpb.MutateRowsRequest_Entry{RowKey: []byte(key), Mutations: mut.ops}}
}

for _, group := range groupEntries(origEntries, maxMutations) {
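// Keep applying the remaining groups even when one of them fails;
// give up only if every group failed.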
var firstGroupErr error
numFailed := 0
groups := groupEntries(origEntries, maxMutations)
for _, group := range groups {
err := t.applyGroup(ctx, group, opts...)
if err != nil {
return nil, err
if firstGroupErr == nil {
firstGroupErr = err
}
numFailed++
}
}

if numFailed == len(groups) {
return nil, firstGroupErr
}

// All the errors are accumulated into an array and returned, interspersed with nils for successful
// entries. The absence of any errors means we should return nil.
var foundErr bool
for _, entry := range origEntries {
if entry.Err == nil && entry.TopLevelErr != nil {
// Populate the per-mutation error if the top-level error is not nil
entry.Err = entry.TopLevelErr
}
if entry.Err != nil {
foundErr = true
}
@@ -1289,6 +1309,7 @@ func (t *Table) applyGroup(ctx context.Context, group []*entryErr, opts ...Apply
// We want to retry the entire request with the current group
return err
}
// Get the entries that need to be retried
group = t.getApplyBulkRetries(group)
if len(group) > 0 && len(idempotentRetryCodes) > 0 {
// We have at least one mutation that needs to be retried.
@@ -1324,6 +1345,11 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD
}
}

var topLevelErr error
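// Any call-level error (from MutateRows or stream.Recv) is recorded here and
// copied onto every entry by the deferred populateTopLevelError, so rows that
// never received a per-entry status still surface the failure.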
defer func() {
populateTopLevelError(entryErrs, topLevelErr)
}()

entries := make([]*btpb.MutateRowsRequest_Entry, len(entryErrs))
for i, entryErr := range entryErrs {
entries[i] = entryErr.Entry
@@ -1340,6 +1366,7 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD

stream, err := t.c.client.MutateRows(ctx, req)
if err != nil {
_, topLevelErr = convertToGrpcStatusErr(err)
return err
}

@@ -1354,6 +1381,7 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD
}
if err != nil {
*trailerMD = stream.Trailer()
_, topLevelErr = convertToGrpcStatusErr(err)
return err
}

@@ -1370,6 +1398,12 @@ func (t *Table) doApplyBulk(ctx context.Context, entryErrs []*entryErr, headerMD
return nil
}

func populateTopLevelError(entries []*entryErr, topLevelErr error) {
for _, entry := range entries {
entry.TopLevelErr = topLevelErr
}
}

// groupEntries groups entries into groups of a specified size without breaking up
// individual entries.
func groupEntries(entries []*entryErr, maxSize int) [][]*entryErr {
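
For context, groupEntries (signature shown above) packs entries into requests bounded by a total mutation count. The following is only an illustrative sketch of a helper with that shape, under the assumption that a group never splits a single entry; the name groupEntriesSketch and the exact packing rule are mine, not the code in this file.

// Illustrative sketch only; relies on the entryErr type shown above.
func groupEntriesSketch(entries []*entryErr, maxSize int) [][]*entryErr {
	var groups [][]*entryErr
	var current []*entryErr
	size := 0
	for _, e := range entries {
		n := len(e.Entry.Mutations)
		// Start a new group when adding this entry would exceed maxSize,
		// but never split an individual entry across groups.
		if size+n > maxSize && len(current) > 0 {
			groups = append(groups, current)
			current, size = nil, 0
		}
		current = append(current, e)
		size += n
	}
	if len(current) > 0 {
		groups = append(groups, current)
	}
	return groups
}
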
138 changes: 138 additions & 0 deletions bigtable/bigtable_test.go
@@ -28,6 +28,8 @@ import (
"github.com/google/go-cmp/cmp"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

var disableMetricsConfig = ClientConfig{MetricsProvider: NoopMetricsProvider{}}
@@ -875,3 +877,139 @@ func TestMutateRowsWithAggregates_MergeToCell(t *testing.T) {
t.Error()
}
}

type rowKeyCheckingInterceptor struct {
grpc.ClientStream
failRow string
failErr error // error to return for a request that contains failRow
requestCounter *int
}

func (i *rowKeyCheckingInterceptor) SendMsg(m interface{}) error {
*i.requestCounter = *i.requestCounter + 1
if req, ok := m.(*btpb.MutateRowsRequest); ok {
for _, entry := range req.Entries {
if string(entry.RowKey) == i.failRow {
return i.failErr
}
}
}
return i.ClientStream.SendMsg(m)
}

func (i *rowKeyCheckingInterceptor) RecvMsg(m interface{}) error {
return i.ClientStream.RecvMsg(m)
}

// Mutations are broken down into groups of at most 'maxMutations' mutations, and a MutateRowsRequest is sent to the Cloud Bigtable service for each group.
// This test validates that even if one of the groups receives an error, requests are still sent for the remaining groups.
func TestApplyBulk_MutationsSucceedAfterGroupError(t *testing.T) {
testEnv, gotErr := NewEmulatedEnv(IntegrationTestConfig{})
if gotErr != nil {
t.Fatalf("NewEmulatedEnv failed: %v", gotErr)
}

// Add interceptor to fail rows
failedRow := "row2"
failErr := status.Error(codes.InvalidArgument, "Invalid row key")
reqCount := 0
conn, gotErr := grpc.Dial(testEnv.server.Addr, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(100<<20), grpc.MaxCallRecvMsgSize(100<<20)),
grpc.WithStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
clientStream, err := streamer(ctx, desc, cc, method, opts...)
return &rowKeyCheckingInterceptor{
ClientStream: clientStream,
failRow: failedRow,
requestCounter: &reqCount,
failErr: failErr,
}, err
}),
)
if gotErr != nil {
t.Fatalf("grpc.Dial failed: %v", gotErr)
}

// Create client and table
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
adminClient, gotErr := NewAdminClient(ctx, testEnv.config.Project, testEnv.config.Instance, option.WithGRPCConn(conn))
if gotErr != nil {
t.Fatalf("NewClient failed: %v", gotErr)
}
defer adminClient.Close()
tableConf := &TableConf{
TableID: testEnv.config.Table,
ColumnFamilies: map[string]Family{
"f": {
ValueType: AggregateType{
Input: Int64Type{},
Aggregator: SumAggregator{},
},
},
},
}
if err := adminClient.CreateTableFromConf(ctx, tableConf); err != nil {
t.Fatalf("CreateTable(%v) failed: %v", testEnv.config.Table, err)
}
client, gotErr := NewClientWithConfig(ctx, testEnv.config.Project, testEnv.config.Instance, disableMetricsConfig, option.WithGRPCConn(conn))
if gotErr != nil {
t.Fatalf("NewClientWithConfig failed: %v", gotErr)
}
defer client.Close()
table := client.Open(testEnv.config.Table)

// Override maxMutations to break mutations into smaller groups
origMaxMutations := maxMutations
t.Cleanup(func() {
maxMutations = origMaxMutations
})
maxMutations = 2

// Create mutations
m1 := NewMutation()
m1.AddIntToCell("f", "q", 0, 1000)
m2 := NewMutation()
m2.AddIntToCell("f", "q", 0, 2000)

// Perform ApplyBulk operation and compare errors
rowKeys := []string{"row1", "row1", failedRow, failedRow, "row3", "row3"}
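// With maxMutations overridden to 2 and one mutation per entry, these six
// entries form three groups: {row1, row1}, {failedRow, failedRow}, {row3, row3}.
// Only the middle group triggers the interceptor's failErr; the other two
// groups are still sent.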
var wantErr error
wantErrs := []error{nil, nil, failErr, failErr, nil, nil}
gotErrs, gotErr := table.ApplyBulk(ctx, rowKeys, []*Mutation{m1, m2, m1, m2, m1, m2})

// Assert overall error
if !equalErrs(gotErr, wantErr) {
t.Fatalf("ApplyBulk err got: %v, want: %v", gotErr, wantErr)
}

// Assert individual mutation errors
if len(gotErrs) != len(wantErrs) {
t.Fatalf("ApplyBulk errs got: %v, want: %v", gotErrs, wantErrs)
}
for i := range gotErrs {
if !equalErrs(gotErrs[i], wantErrs[i]) {
t.Errorf("#%d ApplyBulk err got: %v, want: %v", i, gotErrs[i], wantErrs[i])
}
}

// Assert number of requests sent
wantReqCount := len(rowKeys) / maxMutations
if reqCount != wantReqCount {
t.Errorf("Number of requests got: %v, want: %v", reqCount, wantReqCount)
}

// Assert individual mutation apply success/failure by reading rows
gotErr = table.ReadRows(ctx, RowList{"row1", failedRow, "row3"}, func(row Row) bool {
rowMutated := bytes.Equal(row["f"][0].Value, binary.BigEndian.AppendUint64([]byte{}, 3000))
if rowMutated && row.Key() == failedRow {
t.Error("Expected mutation to fail for row " + row.Key())
}
if !rowMutated && row.Key() != failedRow {
t.Error("Expected mutation to succeed for row " + row.Key())
}
return true
})
if gotErr != nil {
t.Fatalf("ReadRows failed: %v", gotErr)
}
}
