diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go
index df33d67c4..777d5c2c2 100644
--- a/sweepbatcher/sweep_batcher.go
+++ b/sweepbatcher/sweep_batcher.go
@@ -271,14 +271,6 @@ type addSweepsRequest struct {
 	// Notifier is a notifier that is used to notify the requester of this
 	// sweep that the sweep was successful.
 	notifier *SpendNotifier
-
-	// completed is set if the sweep is spent and the spending transaction
-	// is reorg-safely confirmed.
-	completed bool
-
-	// parentBatch is the parent batch of this sweep. It is loaded ony if
-	// completed is true.
-	parentBatch *dbBatch
 }
 
 // SpendDetail is a notification that is send to the user of sweepbatcher when
@@ -687,10 +679,7 @@ func (b *Batcher) Run(ctx context.Context) error {
 	for {
 		select {
 		case req := <-b.addSweepsChan:
-			err = b.handleSweeps(
-				runCtx, req.sweeps, req.notifier, req.completed,
-				req.parentBatch,
-			)
+			err = b.handleSweeps(runCtx, req.sweeps, req.notifier)
 			if err != nil {
 				warnf("handleSweeps failed: %v.", err)
@@ -803,15 +792,12 @@ func (b *Batcher) AddSweep(ctx context.Context, sweepReq *SweepRequest) error {
 		return fmt.Errorf("failed to get the status of sweep %v: %w",
 			sweep.outpoint, err)
 	}
-	var (
-		parentBatch    *dbBatch
-		fullyConfirmed bool
-	)
+	var fullyConfirmed bool
 	if completed {
 		// Verify that the parent batch is confirmed. Note that a batch
 		// is only considered confirmed after it has received three
 		// on-chain confirmations to prevent issues caused by reorgs.
-		parentBatch, err = b.store.GetParentBatch(ctx, sweep.outpoint)
+		parentBatch, err := b.store.GetParentBatch(ctx, sweep.outpoint)
 		if err != nil {
 			return fmt.Errorf("unable to get parent batch for "+
 				"sweep %x: %w", sweep.swapHash[:6], err)
@@ -847,10 +833,8 @@ func (b *Batcher) AddSweep(ctx context.Context, sweepReq *SweepRequest) error {
 		sweep.swapHash[:6], sweep.presigned, completed)
 
 	req := &addSweepsRequest{
-		sweeps:      sweeps,
-		notifier:    sweepReq.Notifier,
-		completed:   completed,
-		parentBatch: parentBatch,
+		sweeps:   sweeps,
+		notifier: sweepReq.Notifier,
 	}
 
 	select {
@@ -895,33 +879,17 @@ func (b *Batcher) testRunInEventLoop(ctx context.Context, handler func()) {
 // handleSweeps handles a sweep request by either placing the group of sweeps in
 // an existing batch, or by spinning up a new batch for it.
 func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep,
-	notifier *SpendNotifier, completed bool, parentBatch *dbBatch) error {
+	notifier *SpendNotifier) error {
 
 	// Since the whole group is added to the same batch and belongs to
 	// the same transaction, we use sweeps[0] below where we need any sweep.
 	sweep := sweeps[0]
 
-	// If the sweep has already been completed in a confirmed batch then we
-	// can't attach its notifier to the batch as that is no longer running.
-	// Instead we directly detect and return the spend here.
-	if completed && parentBatch.Confirmed {
-		return b.monitorSpendAndNotify(
-			ctx, sweeps, parentBatch.ID, notifier,
-		)
-	}
-
 	sweep.notifier = notifier
 
-	// This is a check to see if a batch is completed. In that case we just
-	// lazily delete it.
-	for _, batch := range b.batches {
-		if batch.isComplete() {
-			delete(b.batches, batch.id)
-		}
-	}
-
 	// Check if the sweep is already in a batch. If that is the case, we
 	// provide the sweep to that batch and return.
+	readded := false
 	for _, batch := range b.batches {
 		if batch.sweepExists(sweep.outpoint) {
 			accepted, err := batch.addSweeps(ctx, sweeps)
@@ -936,12 +904,72 @@ func (b *Batcher) handleSweeps(ctx context.Context, sweeps []*sweep,
 			}
 
 			// The sweep was updated in the batch, our job is done.
-			return nil
+			readded = true
+			break
 		}
 	}
+
+	// Check whether any batch is already completed and lazily delete it. Do
+	// this after attempting the re-add so we do not remove the batch that
+	// would have accepted the sweep again; otherwise we could miss the
+	// re-addition and spin up a new batch for an already finished sweep.
+	for _, batch := range b.batches {
+		if batch.isComplete() {
+			delete(b.batches, batch.id)
+		}
+	}
+
+	// If the sweep was re-added to an existing batch, our job is done.
+	if readded {
+		return nil
+	}
+
+	// If the sweep has already been completed in a confirmed batch then we
+	// can't attach its notifier to the batch as that is no longer running.
+	// Instead we directly detect and return the spend here. We cannot reuse
+	// the values gathered in AddSweep because the sweep status may change in
+	// the meantime. If the status flips while handleSweeps is running, the
+	// re-add path above will handle it. A batch is removed from b.batches
+	// only after the code below finds the sweep fully confirmed and switches
+	// to the monitorSpendAndNotify path.
+	completed, err := b.store.GetSweepStatus(ctx, sweep.outpoint)
+	if err != nil {
+		return fmt.Errorf("failed to get the status of sweep %v: %w",
+			sweep.outpoint, err)
+	}
+
+	debugf("Status of the sweep group of %d sweeps with primarySweep %x: "+
+		"presigned=%v, fully_confirmed=%v", len(sweeps),
+		sweep.swapHash[:6], sweep.presigned, completed)
+
+	if completed {
+		// Verify that the parent batch is confirmed. Note that a batch
+		// is only considered confirmed after it has received three
+		// on-chain confirmations to prevent issues caused by reorgs.
+		parentBatch, err := b.store.GetParentBatch(ctx, sweep.outpoint)
+		if err != nil {
+			return fmt.Errorf("unable to get parent batch for "+
+				"sweep %x: %w", sweep.swapHash[:6], err)
+		}
+
+		debugf("Status of the parent batch of the sweep group of %d "+
+			"sweeps with primarySweep %x: confirmed=%v",
+			len(sweeps), sweep.swapHash[:6], parentBatch.Confirmed)
+
+		// Note that sweeps are marked completed only after the batch
+		// is marked confirmed, so it is safe to check the sweep status
+		// first and the batch status second.
+		if parentBatch.Confirmed {
+			debugf("Sweep group of %d sweeps with primarySweep %x "+
+				"is fully confirmed, switching directly to "+
+				"monitoring", len(sweeps), sweep.swapHash[:6])
+
+			return b.monitorSpendAndNotify(
+				ctx, sweeps, parentBatch.ID, notifier,
+			)
+		}
+	}
 
 	// Try to run the greedy algorithm of batch selection to minimize costs.
-	err := b.greedyAddSweeps(ctx, sweeps)
+	err = b.greedyAddSweeps(ctx, sweeps)
 	if err == nil {
 		// The greedy algorithm succeeded.
 		return nil
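The core of the sweep_batcher.go change is that handleSweeps no longer trusts the completed/parentBatch snapshot taken in AddSweep: it re-reads the sweep status inside the single-threaded event loop. The following standalone sketch shows the time-of-check-to-time-of-use hazard the old plumbing had; every name in it (store, request, markCompleted and so on) is a hypothetical illustration, not loop code.

```go
package main

import (
	"fmt"
	"sync"
)

// store mimics a sweep store whose status can change at any time.
type store struct {
	mu        sync.Mutex
	completed bool
}

func (s *store) getStatus() bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	return s.completed
}

func (s *store) markCompleted() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.completed = true
}

// request carries a status snapshot across a channel, the way the old
// addSweepsRequest carried completed and parentBatch.
type request struct {
	completedAtSendTime bool
}

func main() {
	s := &store{}
	reqChan := make(chan request, 1)

	// Producer side (the old AddSweep): snapshot the status, then send.
	reqChan <- request{completedAtSendTime: s.getStatus()}

	// The status flips after the snapshot is taken but before the event
	// loop gets around to the request.
	s.markCompleted()

	// Consumer side (the old handleSweeps): the snapshot is stale. Only a
	// fresh read inside the handler, which is what the patch does,
	// observes the truth.
	req := <-reqChan
	fmt.Printf("snapshot: completed=%v, store: completed=%v\n",
		req.completedAtSendTime, s.getStatus())
}
```

Because the event loop is the only goroutine that mutates b.batches and processes requests, a status read performed inside handleSweeps cannot be invalidated by another handler running concurrently, which is what the re-read relies on.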
During +// the race the handler gets stale completion data, incorrectly spins up a new +// batch and rewrites the sweep's parent batch. This test verifies that no +// extra batch is created and the sweep stays associated with its original +// batch. +func testSweepBatcherHandleSweepRace(t *testing.T, store testStore, + batcherStore testBatcherStore) { + + defer test.Guard(t)() + + lnd := test.NewMockLnd() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sweepStore, err := NewSweepFetcherFromSwapStore(store, lnd.ChainParams) + require.NoError(t, err) + + batcher := NewBatcher( + lnd.WalletKit, lnd.ChainNotifier, lnd.Signer, + testMuSig2SignSweep, testVerifySchnorrSig, lnd.ChainParams, + batcherStore, sweepStore, + ) + + var wg sync.WaitGroup + var runErr error + wg.Add(1) + go func() { + defer wg.Done() + runErr = batcher.Run(ctx) + }() + + <-batcher.initDone + + const ( + sweepValue btcutil.Amount = 1_000_000 + confHeight = 605 + ) + + sweepOutpoint := wire.OutPoint{ + Hash: chainhash.Hash{0, 0, 0, 1}, + Index: 5, + } + + spendChan := make(chan *SpendDetail, 10) + spendErrChan := make(chan error, 1) + confChan := make(chan *ConfDetail, 10) + confErrChan := make(chan error, 1) + notifier := &SpendNotifier{ + SpendChan: spendChan, + SpendErrChan: spendErrChan, + ConfChan: confChan, + ConfErrChan: confErrChan, + QuitChan: make(chan bool), + } + + swapHash := lntypes.Hash{7, 7, 7} + sweepReq := SweepRequest{ + SwapHash: swapHash, + Inputs: []Input{{ + Value: sweepValue, + Outpoint: sweepOutpoint, + }}, + Notifier: notifier, + } + + swap := &loopdb.LoopOutContract{ + SwapContract: loopdb.SwapContract{ + CltvExpiry: 144, + AmountRequested: sweepValue, + ProtocolVersion: loopdb.ProtocolVersionMuSig2, + HtlcKeys: htlcKeys, + Preimage: lntypes.Preimage{7}, + }, + DestAddr: destAddr, + SwapInvoice: swapInvoice, + SweepConfTarget: confTarget, + } + + err = store.CreateLoopOut(ctx, swapHash, swap) + require.NoError(t, err) + store.AssertLoopOutStored() + + require.NoError(t, batcher.AddSweep(ctx, &sweepReq)) + + // Make sure the batch starts monitoring the primary sweep. + <-lnd.RegisterSpendChannel + + publishedTx := <-lnd.TxPublishChannel + + var originalBatchID int32 + require.Eventually(t, func() bool { + batch := tryGetOnlyBatch(ctx, batcher) + if batch == nil { + return false + } + + originalBatchID = batch.snapshot(ctx).id + return true + }, test.Timeout, eventuallyCheckFrequency) + + var addWG sync.WaitGroup + addErrChan := make(chan error, 2) + + addCtx, addCancel := context.WithCancel(ctx) + defer addCancel() + + confCtx, confCancel := context.WithCancel(ctx) + defer confCancel() + + addWG.Add(1) + go func() { + defer addWG.Done() + + // After this goroutine completes, stop the goroutine that handles + // registrations as well. Give it one second to finish the last + // AddSweep to prevent goroutine leaks. + defer time.AfterFunc(time.Second, confCancel) + + for { + select { + case <-addCtx.Done(): + return + default: + } + + err := batcher.AddSweep(ctx, &sweepReq) + if err != nil { + addErrChan <- err + + return + } + } + }() + + // Wait a bit so the AddSweep loop runs and keeps handleSweep busy. + time.Sleep(100 * time.Millisecond) + + // This goroutine handles spending and confirmation registrations. + // One spending registration has been created above, so the loop starts + // with the next step - notifying about spending. 
+	addWG.Add(1)
+	go func() {
+		defer addWG.Done()
+
+		for {
+			spendingTx := publishedTx
+			spendingHash := spendingTx.TxHash()
+			spendDetail := &chainntnfs.SpendDetail{
+				SpentOutPoint:     &sweepOutpoint,
+				SpendingTx:        spendingTx,
+				SpenderTxHash:     &spendingHash,
+				SpenderInputIndex: 0,
+			}
+			lnd.SpendChannel <- spendDetail
+
+			select {
+			case <-spendChan:
+			case <-time.After(test.Timeout):
+				addErrChan <- fmt.Errorf("expected spend " +
+					"notification")
+
+				return
+			}
+
+			<-lnd.RegisterConfChannel
+
+			require.NoError(t, lnd.NotifyHeight(confHeight))
+
+			lnd.ConfChannel <- &chainntnfs.TxConfirmation{
+				BlockHeight: confHeight,
+				Tx:          spendingTx,
+			}
+
+			select {
+			case <-confChan:
+			case <-time.After(test.Timeout):
+				addErrChan <- fmt.Errorf("expected " +
+					"confirmation notification")
+
+				return
+			}
+
+			select {
+			// If another spending registration is issued, it means
+			// handleSweeps chose the monitorSpendAndNotify path, so
+			// any race has already occurred. Stop calling AddSweep.
+			case <-lnd.RegisterSpendChannel:
+				addCancel()
+
+			case <-confCtx.Done():
+				return
+			}
+		}
+	}()
+
+	addWG.Wait()
+
+	select {
+	case err := <-addErrChan:
+		require.NoError(t, err, "error from a goroutine")
+	default:
+	}
+
+	require.Eventually(t, func() bool {
+		running, err := batcherStore.FetchUnconfirmedSweepBatches(ctx)
+		if err != nil {
+			return false
+		}
+
+		return len(running) == 0
+	}, test.Timeout, eventuallyCheckFrequency)
+
+	// Make sure the sweep belongs to the original batch. If another batch
+	// had launched, the sweep would have been reassigned to the new batch.
+	sweeps, err := batcherStore.FetchBatchSweeps(ctx, originalBatchID)
+	require.NoError(t, err)
+	require.Len(t, sweeps, 1)
+	require.Equal(t, sweepOutpoint, sweeps[0].Outpoint)
+
+	parentBatch, err := batcherStore.GetParentBatch(ctx, sweepOutpoint)
+	require.NoError(t, err)
+	require.Equal(t, originalBatchID, parentBatch.ID)
+
+	cancel()
+	wg.Wait()
+	checkBatcherError(t, runErr)
+}
+
 // testCustomSignMuSig2 tests the operation with custom musig2 signer.
 func testCustomSignMuSig2(t *testing.T, store testStore,
 	batcherStore testBatcherStore) {
@@ -4975,6 +5202,12 @@ func TestSweepBatcherCloseDuringAdding(t *testing.T) {
 	runTests(t, testSweepBatcherCloseDuringAdding)
 }
 
+// TestSweepBatcherHandleSweepRace reproduces the race where a sweep is
+// re-added while its original batch is confirming.
+func TestSweepBatcherHandleSweepRace(t *testing.T) {
+	runTests(t, testSweepBatcherHandleSweepRace)
+}
+
 // TestCustomSignMuSig2 tests the operation with custom musig2 signer.
 func TestCustomSignMuSig2(t *testing.T) {
 	runTests(t, testCustomSignMuSig2)
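The other behavioral change in handleSweeps, running the lazy deletion of completed batches only after the re-add attempt, is what keeps the test above from ever observing a second batch. The sketch below isolates that ordering decision; the batch type and handle function are hypothetical stand-ins under assumed semantics, not the loop implementation.

```go
package main

import "fmt"

// batch is a toy stand-in: it is complete and still owns a sweep.
type batch struct {
	id       int
	complete bool
	sweeps   map[string]bool
}

func (b *batch) sweepExists(op string) bool { return b.sweeps[op] }

// prune drops completed batches, like the lazy-deletion loop in the patch.
func prune(batches map[int]*batch) {
	for id, b := range batches {
		if b.complete {
			delete(batches, id)
		}
	}
}

// handle mimics the two possible orderings of pruning vs. the re-add check.
func handle(batches map[int]*batch, op string, pruneFirst bool) string {
	if pruneFirst {
		prune(batches) // old order: the owning batch disappears here
	}

	for _, b := range batches {
		if b.sweepExists(op) {
			return fmt.Sprintf("re-added to batch %d", b.id)
		}
	}

	if !pruneFirst {
		prune(batches) // new order: prune only after the re-add check
	}

	return "spun up a new batch (wrong for a finished sweep)"
}

func main() {
	mk := func() map[int]*batch {
		return map[int]*batch{
			1: {id: 1, complete: true, sweeps: map[string]bool{"op": true}},
		}
	}

	fmt.Println("prune-first: ", handle(mk(), "op", true))
	fmt.Println("re-add-first:", handle(mk(), "op", false))
}
```

With the old order the owning batch is pruned before the ownership check, so the sweep looks new and a fresh batch is spun up; with the new order the sweep is re-added first and pruning happens afterwards. The regression test exercises the same window repeatedly and is worth running under the race detector (go test -race).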