Skip to content

Commit d90b4c2

Browse files
committed
sweepbatcher: harden AddSweep race handling
Re-ordered the cleanup in handleSweeps so completed batches are deleted only after any re-add attempt, preventing a finished batch from being dropped before it can absorb the re-add. Added a fresh status/parent-batch lookup inside handleSweeps so the logic now reacts to swaps that complete between AddSweep and the event-loop handler; fully confirmed sweeps are routed straight to monitorSpendAndNotify instead of spawning a duplicate batch. Added regression test testSweepBatcherHandleSweepRace, which replays the AddSweep/confirmation race and verifies that only the original batch remains and the sweep keeps its parent assignment.
1 parent 9f8f4e6 commit d90b4c2

File tree

2 files changed

+302
-41
lines changed

2 files changed

+302
-41
lines changed

sweepbatcher/sweep_batcher.go

Lines changed: 69 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -271,14 +271,6 @@ type addSweepsRequest struct {
271271
// Notifier is a notifier that is used to notify the requester of this
272272
// sweep that the sweep was successful.
273273
notifier *SpendNotifier
274-
275-
// completed is set if the sweep is spent and the spending transaction
276-
// is reorg-safely confirmed.
277-
completed bool
278-
279-
// parentBatch is the parent batch of this sweep. It is loaded ony if
280-
// completed is true.
281-
parentBatch *dbBatch
282274
}
283275

284276
// SpendDetail is a notification that is send to the user of sweepbatcher when
@@ -687,10 +679,7 @@ func (b *Batcher) Run(ctx context.Context) error {
687679
for {
688680
select {
689681
case req := <-b.addSweepsChan:
690-
err = b.handleSweeps(
691-
runCtx, req.sweeps, req.notifier, req.completed,
692-
req.parentBatch,
693-
)
682+
err = b.handleSweeps(runCtx, req.sweeps, req.notifier)
694683
if err != nil {
695684
warnf("handleSweeps failed: %v.", err)
696685

@@ -803,15 +792,12 @@ func (b *Batcher) AddSweep(ctx context.Context, sweepReq *SweepRequest) error {
803792
return fmt.Errorf("failed to get the status of sweep %v: %w",
804793
sweep.outpoint, err)
805794
}
806-
var (
807-
parentBatch *dbBatch
808-
fullyConfirmed bool
809-
)
795+
var fullyConfirmed bool
810796
if completed {
811797
// Verify that the parent batch is confirmed. Note that a batch
812798
// is only considered confirmed after it has received three
813799
// on-chain confirmations to prevent issues caused by reorgs.
814-
parentBatch, err = b.store.GetParentBatch(ctx, sweep.outpoint)
800+
parentBatch, err := b.store.GetParentBatch(ctx, sweep.outpoint)
815801
if err != nil {
816802
return fmt.Errorf("unable to get parent batch for "+
817803
"sweep %x: %w", sweep.swapHash[:6], err)
@@ -847,10 +833,8 @@ func (b *Batcher) AddSweep(ctx context.Context, sweepReq *SweepRequest) error {
847833
sweep.swapHash[:6], sweep.presigned, completed)
848834

849835
req := &addSweepsRequest{
850-
sweeps: sweeps,
851-
notifier: sweepReq.Notifier,
852-
completed: completed,
853-
parentBatch: parentBatch,
836+
sweeps: sweeps,
837+
notifier: sweepReq.Notifier,
854838
}
855839

856840
select {
@@ -895,33 +879,17 @@ func (b *Batcher) testRunInEventLoop(ctx context.Context, handler func()) {
895879
// handleSweeps handles a sweep request by either placing the group of sweeps in
896880
// an existing batch, or by spinning up a new batch for it.
897881
func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep,
898-
notifier *SpendNotifier, completed bool, parentBatch *dbBatch) error {
882+
notifier *SpendNotifier) error {
899883

900884
// Since the whole group is added to the same batch and belongs to
901885
// the same transaction, we use sweeps[0] below where we need any sweep.
902886
sweep := sweeps[0]
903887

904-
// If the sweep has already been completed in a confirmed batch then we
905-
// can't attach its notifier to the batch as that is no longer running.
906-
// Instead we directly detect and return the spend here.
907-
if completed && parentBatch.Confirmed {
908-
return b.monitorSpendAndNotify(
909-
ctx, sweeps, parentBatch.ID, notifier,
910-
)
911-
}
912-
913888
sweep.notifier = notifier
914889

915-
// This is a check to see if a batch is completed. In that case we just
916-
// lazily delete it.
917-
for _, batch := range b.batches {
918-
if batch.isComplete() {
919-
delete(b.batches, batch.id)
920-
}
921-
}
922-
923890
// Check if the sweep is already in a batch. If that is the case, we
924891
// provide the sweep to that batch and return.
892+
readded := false
925893
for _, batch := range b.batches {
926894
if batch.sweepExists(sweep.outpoint) {
927895
accepted, err := batch.addSweeps(ctx, sweeps)
@@ -936,12 +904,72 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep,
936904
}
937905

938906
// The sweep was updated in the batch, our job is done.
939-
return nil
907+
readded = true
908+
break
909+
}
910+
}
911+
912+
// Check whether any batch is already completed and lazily delete it. Do
913+
// this after attempting the re-add so we do not remove the batch that
914+
// would have accepted the sweep again; otherwise we could miss the
915+
// re-addition and spin up a new batch for an already finished sweep.
916+
for _, batch := range b.batches {
917+
if batch.isComplete() {
918+
delete(b.batches, batch.id)
919+
}
920+
}
921+
922+
// If the batch was re-added to an existing batch, our job is done.
923+
if readded {
924+
return nil
925+
}
926+
927+
// If the sweep has already been completed in a confirmed batch then we
928+
// can't attach its notifier to the batch as that is no longer running.
929+
// Instead we directly detect and return the spend here. We cannot reuse
930+
// the values gathered in AddSweep because the sweep status may change in
931+
// the meantime. If the status flips while handleSweeps is running, the
932+
// re-add path above will handle it. A batch is removed from b.batches
933+
// only after the code below finds the sweep fully confirmed and switches
934+
// to the monitorSpendAndNotify path.
935+
completed, err := b.store.GetSweepStatus(ctx, sweep.outpoint)
936+
if err != nil {
937+
return fmt.Errorf("failed to get the status of sweep %v: %w",
938+
sweep.outpoint, err)
939+
}
940+
debugf("Status of the sweep group of %d sweeps with primarySweep %x: "+
941+
"presigned=%v, fully_confirmed=%v", len(sweeps),
942+
sweep.swapHash[:6], sweep.presigned, completed)
943+
if completed {
944+
// Verify that the parent batch is confirmed. Note that a batch
945+
// is only considered confirmed after it has received three
946+
// on-chain confirmations to prevent issues caused by reorgs.
947+
parentBatch, err := b.store.GetParentBatch(ctx, sweep.outpoint)
948+
if err != nil {
949+
return fmt.Errorf("unable to get parent batch for "+
950+
"sweep %x: %w", sweep.swapHash[:6], err)
951+
}
952+
953+
debugf("Status of the parent batch of the sweep group of %d "+
954+
"sweeps with primarySweep %x: confirmed=%v",
955+
len(sweeps), sweep.swapHash[:6], parentBatch.Confirmed)
956+
957+
// Note that sweeps are marked completed after the batch is
958+
// marked confirmed because here we check the sweep status
959+
// first and then check the batch status.
960+
if parentBatch.Confirmed {
961+
debugf("Sweep group of %d sweeps with primarySweep %x "+
962+
"is fully confirmed, switching directly to "+
963+
"monitoring", len(sweeps), sweep.swapHash[:6])
964+
965+
return b.monitorSpendAndNotify(
966+
ctx, sweeps, parentBatch.ID, notifier,
967+
)
940968
}
941969
}
942970

943971
// Try to run the greedy algorithm of batch selection to minimize costs.
944-
err := b.greedyAddSweeps(ctx, sweeps)
972+
err = b.greedyAddSweeps(ctx, sweeps)
945973
if err == nil {
946974
// The greedy algorithm succeeded.
947975
return nil

0 commit comments

Comments
 (0)