Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 69 additions & 41 deletions sweepbatcher/sweep_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,14 +271,6 @@ type addSweepsRequest struct {
// Notifier is a notifier that is used to notify the requester of this
// sweep that the sweep was successful.
notifier *SpendNotifier

// completed is set if the sweep is spent and the spending transaction
// is reorg-safely confirmed.
completed bool

// parentBatch is the parent batch of this sweep. It is loaded ony if
// completed is true.
parentBatch *dbBatch
}

// SpendDetail is a notification that is send to the user of sweepbatcher when
Expand Down Expand Up @@ -687,10 +679,7 @@ func (b *Batcher) Run(ctx context.Context) error {
for {
select {
case req := <-b.addSweepsChan:
err = b.handleSweeps(
runCtx, req.sweeps, req.notifier, req.completed,
req.parentBatch,
)
err = b.handleSweeps(runCtx, req.sweeps, req.notifier)
if err != nil {
warnf("handleSweeps failed: %v.", err)

Expand Down Expand Up @@ -803,15 +792,12 @@ func (b *Batcher) AddSweep(ctx context.Context, sweepReq *SweepRequest) error {
return fmt.Errorf("failed to get the status of sweep %v: %w",
sweep.outpoint, err)
}
var (
parentBatch *dbBatch
fullyConfirmed bool
)
var fullyConfirmed bool
if completed {
// Verify that the parent batch is confirmed. Note that a batch
// is only considered confirmed after it has received three
// on-chain confirmations to prevent issues caused by reorgs.
parentBatch, err = b.store.GetParentBatch(ctx, sweep.outpoint)
parentBatch, err := b.store.GetParentBatch(ctx, sweep.outpoint)
if err != nil {
return fmt.Errorf("unable to get parent batch for "+
"sweep %x: %w", sweep.swapHash[:6], err)
Expand Down Expand Up @@ -847,10 +833,8 @@ func (b *Batcher) AddSweep(ctx context.Context, sweepReq *SweepRequest) error {
sweep.swapHash[:6], sweep.presigned, completed)

req := &addSweepsRequest{
sweeps: sweeps,
notifier: sweepReq.Notifier,
completed: completed,
parentBatch: parentBatch,
sweeps: sweeps,
notifier: sweepReq.Notifier,
}

select {
Expand Down Expand Up @@ -895,33 +879,17 @@ func (b *Batcher) testRunInEventLoop(ctx context.Context, handler func()) {
// handleSweeps handles a sweep request by either placing the group of sweeps in
// an existing batch, or by spinning up a new batch for it.
func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep,
notifier *SpendNotifier, completed bool, parentBatch *dbBatch) error {
notifier *SpendNotifier) error {

// Since the whole group is added to the same batch and belongs to
// the same transaction, we use sweeps[0] below where we need any sweep.
sweep := sweeps[0]

// If the sweep has already been completed in a confirmed batch then we
// can't attach its notifier to the batch as that is no longer running.
// Instead we directly detect and return the spend here.
if completed && parentBatch.Confirmed {
return b.monitorSpendAndNotify(
ctx, sweeps, parentBatch.ID, notifier,
)
}

sweep.notifier = notifier

// This is a check to see if a batch is completed. In that case we just
// lazily delete it.
for _, batch := range b.batches {
if batch.isComplete() {
delete(b.batches, batch.id)
}
}

// Check if the sweep is already in a batch. If that is the case, we
// provide the sweep to that batch and return.
readded := false
for _, batch := range b.batches {
if batch.sweepExists(sweep.outpoint) {
accepted, err := batch.addSweeps(ctx, sweeps)
Expand All @@ -936,12 +904,72 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep,
}

// The sweep was updated in the batch, our job is done.
return nil
readded = true
break
}
}

// Check whether any batch is already completed and lazily delete it. Do
// this after attempting the re-add so we do not remove the batch that
// would have accepted the sweep again; otherwise we could miss the
// re-addition and spin up a new batch for an already finished sweep.
for _, batch := range b.batches {
if batch.isComplete() {
delete(b.batches, batch.id)
}
}

// If the batch was re-added to an existing batch, our job is done.
if readded {
return nil
}

// If the sweep has already been completed in a confirmed batch then we
// can't attach its notifier to the batch as that is no longer running.
// Instead we directly detect and return the spend here. We cannot reuse
// the values gathered in AddSweep because the sweep status may change in
// the meantime. If the status flips while handleSweeps is running, the
// re-add path above will handle it. A batch is removed from b.batches
// only after the code below finds the sweep fully confirmed and switches
// to the monitorSpendAndNotify path.
completed, err := b.store.GetSweepStatus(ctx, sweep.outpoint)
if err != nil {
return fmt.Errorf("failed to get the status of sweep %v: %w",
sweep.outpoint, err)
}
debugf("Status of the sweep group of %d sweeps with primarySweep %x: "+
"presigned=%v, fully_confirmed=%v", len(sweeps),
sweep.swapHash[:6], sweep.presigned, completed)
if completed {
// Verify that the parent batch is confirmed. Note that a batch
// is only considered confirmed after it has received three
// on-chain confirmations to prevent issues caused by reorgs.
parentBatch, err := b.store.GetParentBatch(ctx, sweep.outpoint)
if err != nil {
return fmt.Errorf("unable to get parent batch for "+
"sweep %x: %w", sweep.swapHash[:6], err)
}

debugf("Status of the parent batch of the sweep group of %d "+
"sweeps with primarySweep %x: confirmed=%v",
len(sweeps), sweep.swapHash[:6], parentBatch.Confirmed)

// Note that sweeps are marked completed after the batch is
// marked confirmed because here we check the sweep status
// first and then check the batch status.
if parentBatch.Confirmed {
debugf("Sweep group of %d sweeps with primarySweep %x "+
"is fully confirmed, switching directly to "+
"monitoring", len(sweeps), sweep.swapHash[:6])

return b.monitorSpendAndNotify(
ctx, sweeps, parentBatch.ID, notifier,
)
}
}

// Try to run the greedy algorithm of batch selection to minimize costs.
err := b.greedyAddSweeps(ctx, sweeps)
err = b.greedyAddSweeps(ctx, sweeps)
if err == nil {
// The greedy algorithm succeeded.
return nil
Expand Down
Loading