Skip to content

Commit 4d1f585

Browse files
Support atomic writes in the docstore (#3500)
For #3501.
1 parent eaaf976 commit 4d1f585

File tree

11 files changed

+5795
-45
lines changed

11 files changed

+5795
-45
lines changed

docstore/awsdynamodb/dynamo.go

+18-11
Original file line numberDiff line numberDiff line change
@@ -162,12 +162,15 @@ func (c *collection) RevisionField() string { return c.opts.RevisionField }
162162

163163
func (c *collection) RunActions(ctx context.Context, actions []*driver.Action, opts *driver.RunActionsOptions) driver.ActionListError {
164164
errs := make([]error, len(actions))
165-
beforeGets, gets, writes, afterGets := driver.GroupActions(actions)
165+
beforeGets, gets, writes, writesTx, afterGets := driver.GroupActions(actions)
166166
c.runGets(ctx, beforeGets, errs, opts)
167167
ch := make(chan struct{})
168+
ch2 := make(chan struct{})
168169
go func() { defer close(ch); c.runWrites(ctx, writes, errs, opts) }()
170+
go func() { defer close(ch2); c.transactWrite(ctx, writesTx, errs, opts) }()
169171
c.runGets(ctx, gets, errs, opts)
170172
<-ch
173+
<-ch2
171174
c.runGets(ctx, afterGets, errs, opts)
172175
return driver.NewActionListError(errs)
173176
}
@@ -613,25 +616,29 @@ func revisionPrecondition(doc driver.Document, revField string) (*expression.Con
613616
return &cb, nil
614617
}
615618

616-
// TODO(jba): use this if/when we support atomic writes.
617-
func (c *collection) transactWrite(ctx context.Context, actions []*driver.Action, errs []error, opts *driver.RunActionsOptions, start, end int) {
619+
// transactWrite executes the write actions atomically: either they all succeed or they all fail together.
620+
func (c *collection) transactWrite(ctx context.Context, actions []*driver.Action, errs []error, opts *driver.RunActionsOptions) {
621+
if len(actions) == 0 {
622+
return
623+
}
624+
// all actions will fail atomically even if a single action fails
618625
setErr := func(err error) {
619-
for i := start; i <= end; i++ {
620-
errs[actions[i].Index] = err
626+
for _, a := range actions {
627+
errs[a.Index] = err
621628
}
622629
}
623630

631+
tws := make([]*dyn.TransactWriteItem, 0, len(actions))
624632
var ops []*writeOp
625-
tws := make([]*dyn.TransactWriteItem, 0, end-start+1)
626-
for i := start; i <= end; i++ {
627-
a := actions[i]
628-
op, err := c.newWriteOp(a, opts)
633+
for _, w := range actions {
634+
op, err := c.newWriteOp(w, opts)
629635
if err != nil {
630636
setErr(err)
631637
return
638+
} else {
639+
ops = append(ops, op)
640+
tws = append(tws, op.writeItem)
632641
}
633-
ops = append(ops, op)
634-
tws = append(tws, op.writeItem)
635642
}
636643

637644
in := &dyn.TransactWriteItemsInput{

0 commit comments

Comments
 (0)