Skip to content

sweepbatcher: fix reorg detection #975

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ require (
github.com/stretchr/testify v1.10.0
github.com/urfave/cli v1.22.14
go.etcd.io/bbolt v1.3.11
golang.org/x/net v0.38.0
golang.org/x/sync v0.12.0
google.golang.org/grpc v1.64.1
google.golang.org/protobuf v1.34.2
Expand Down Expand Up @@ -184,6 +183,7 @@ require (
golang.org/x/crypto v0.36.0 // indirect
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.38.0 // indirect
golang.org/x/sys v0.31.0 // indirect
golang.org/x/term v0.30.0 // indirect
golang.org/x/text v0.23.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions loopout.go
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,11 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context,
s.height = notification.(int32)
timerChan = s.timerFactory(repushDelay)

s.log.Infof("Received block %d", s.height)

case <-timerChan:
s.log.Infof("Checking the sweep")

// canSweep will return false if the preimage is
// not revealed yet but the conf target is closer than
// 20 blocks. In this case to be sure we won't attempt
Expand All @@ -1268,6 +1272,7 @@ func (s *loopOutSwap) waitForHtlcSpendConfirmedV2(globalCtx context.Context,
}

// Send the sweep to the sweeper.
s.log.Infof("(Re)adding the sweep to sweepbatcher")
err := s.batcher.AddSweep(ctx, &sweepReq)
if err != nil {
return nil, err
Expand Down
146 changes: 80 additions & 66 deletions sweepbatcher/sweep_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,10 @@ type batchConfig struct {
// initial delay completion and publishing the batch transaction.
batchPublishDelay time.Duration

// noBumping instructs sweepbatcher not to fee bump itself and rely on
// external source of fee rates (FeeRateProvider).
noBumping bool
// customFeeRate provides custom min fee rate per swap. The batch uses
// max of the fee rates of its swaps. In this mode confTarget is
// ignored and fee bumping by sweepbatcher is disabled.
customFeeRate FeeRateProvider

// txLabeler is a function generating a transaction label. It is called
// before publishing a batch transaction. Batch ID is passed to it.
Expand Down Expand Up @@ -232,6 +233,10 @@ type batch struct {
// spendChan is the channel over which spend notifications are received.
spendChan chan *chainntnfs.SpendDetail

// spendErrChan is the channel over which spend notifier errors are
// received.
spendErrChan chan error

// confChan is the channel over which confirmation notifications are
// received.
confChan chan *chainntnfs.TxConfirmation
Expand Down Expand Up @@ -378,9 +383,7 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
id: -1,
state: Open,
sweeps: make(map[wire.OutPoint]sweep),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
testReqs: make(chan *testRequest),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
Expand Down Expand Up @@ -423,9 +426,7 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
state: bk.state,
primarySweepID: bk.primaryID,
sweeps: bk.sweeps,
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
testReqs: make(chan *testRequest),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
Expand Down Expand Up @@ -723,6 +724,9 @@ func (b *batch) addSweeps(ctx context.Context, sweeps []*sweep) (bool, error) {
// lower that minFeeRate of other sweeps (so it is
// applied).
if b.rbfCache.FeeRate < s.minFeeRate {
b.Infof("Increasing feerate of the batch "+
"from %v to %v", b.rbfCache.FeeRate,
s.minFeeRate)
b.rbfCache.FeeRate = s.minFeeRate
}
}
Expand Down Expand Up @@ -769,6 +773,9 @@ func (b *batch) addSweeps(ctx context.Context, sweeps []*sweep) (bool, error) {
// Update FeeRate. Max(s.minFeeRate) for all the sweeps of
// the batch is the basis for fee bumps.
if b.rbfCache.FeeRate < s.minFeeRate {
b.Infof("Increasing feerate of the batch "+
"from %v to %v", b.rbfCache.FeeRate,
s.minFeeRate)
b.rbfCache.FeeRate = s.minFeeRate
b.rbfCache.SkipNextBump = true
}
Expand Down Expand Up @@ -968,6 +975,45 @@ func (b *batch) Run(ctx context.Context) error {
continue
}

// Update feerate of sweeps. This is normally done by
// AddSweep, but it may not be called after the sweep
// is confirmed, but fresh feerate is still needed to
// keep publishing in case of reorg.
for outpoint, s := range b.sweeps {
minFeeRate, err := minimumSweepFeeRate(
ctx, b.cfg.customFeeRate, b.wallet,
s.swapHash, s.outpoint, s.confTarget,
)
if err != nil {
b.Warnf("failed to determine feerate "+
"for sweep %v of swap %x, "+
"confTarget %d: %w", s.outpoint,
s.swapHash[:6], s.confTarget,
err)
continue
}

if minFeeRate <= s.minFeeRate {
continue
}

b.Infof("Increasing feerate of sweep %v of "+
"swap %x from %v to %v", s.outpoint,
s.swapHash[:6], s.minFeeRate,
minFeeRate)
s.minFeeRate = minFeeRate
b.sweeps[outpoint] = s

if s.minFeeRate <= b.rbfCache.FeeRate {
continue
}

b.Infof("Increasing feerate of the batch "+
"from %v to %v", b.rbfCache.FeeRate,
s.minFeeRate)
b.rbfCache.FeeRate = s.minFeeRate
}

err := b.publish(ctx)
if err != nil {
return fmt.Errorf("publish error: %w", err)
Expand All @@ -979,23 +1025,26 @@ func (b *batch) Run(ctx context.Context) error {
return fmt.Errorf("handleSpend error: %w", err)
}

case err := <-b.spendErrChan:
b.writeToSpendErrChan(ctx, err)

return fmt.Errorf("spend notifier failed: %w", err)

case conf := <-b.confChan:
if err := b.handleConf(runCtx, conf); err != nil {
return fmt.Errorf("handleConf error: %w", err)
}

return nil

// A re-org has been detected. We set the batch state back to
// open since our batch transaction is no longer present in any
// block. We can accept more sweeps and try to publish.
case <-b.reorgChan:
b.state = Open
b.Warnf("reorg detected, batch is able to " +
"accept new sweeps")

err := b.monitorSpend(ctx, b.sweeps[b.primarySweepID])
if err != nil {
return fmt.Errorf("monitorSpend error: %w", err)
}

case testReq := <-b.testReqs:
testReq.handler()
close(testReq.quit)
Expand Down Expand Up @@ -1790,7 +1839,7 @@ func (b *batch) updateRbfRate(ctx context.Context) error {

// Set the initial value for our fee rate.
b.rbfCache.FeeRate = rate
} else if !b.cfg.noBumping {
} else if noBumping := b.cfg.customFeeRate != nil; !noBumping {
if b.rbfCache.SkipNextBump {
// Skip fee bumping, unset the flag, to bump next time.
b.rbfCache.SkipNextBump = false
Expand All @@ -1812,44 +1861,31 @@ func (b *batch) updateRbfRate(ctx context.Context) error {
// of the batch transaction gets confirmed, due to the uncertainty of RBF
// replacements and network propagation, we can always detect the transaction.
func (b *batch) monitorSpend(ctx context.Context, primarySweep sweep) error {
spendCtx, cancel := context.WithCancel(ctx)
if b.spendChan != nil || b.spendErrChan != nil || b.reorgChan != nil {
return fmt.Errorf("an attempt to run monitorSpend multiple " +
"times per batch")
}

spendChan, spendErr, err := b.chainNotifier.RegisterSpendNtfn(
spendCtx, &primarySweep.outpoint, primarySweep.htlc.PkScript,
reorgChan := make(chan struct{}, 1)

spendChan, spendErrChan, err := b.chainNotifier.RegisterSpendNtfn(
ctx, &primarySweep.outpoint, primarySweep.htlc.PkScript,
primarySweep.initiationHeight,
lndclient.WithReOrgChan(reorgChan),
)
if err != nil {
cancel()

return err
return fmt.Errorf("failed to register spend notifier for "+
"primary sweep %v, pkscript %x, height %d: %w",
primarySweep.outpoint, primarySweep.htlc.PkScript,
primarySweep.initiationHeight, err)
}

b.wg.Add(1)
go func() {
defer cancel()
defer b.wg.Done()

b.Infof("monitoring spend for outpoint %s",
primarySweep.outpoint.String())

select {
case spend := <-spendChan:
select {
case b.spendChan <- spend:

case <-ctx.Done():
}

case err := <-spendErr:
b.writeToSpendErrChan(ctx, err)

b.writeToErrChan(
fmt.Errorf("spend error: %w", err),
)
b.Infof("monitoring spend for outpoint %s",
primarySweep.outpoint.String())

case <-ctx.Done():
}
}()
b.spendChan = spendChan
b.spendErrChan = spendErrChan
b.reorgChan = reorgChan

return nil
}
Expand All @@ -1862,14 +1898,11 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
return fmt.Errorf("can't find primarySweep")
}

reorgChan := make(chan struct{})

confCtx, cancel := context.WithCancel(ctx)

confChan, errChan, err := b.chainNotifier.RegisterConfirmationsNtfn(
confCtx, b.batchTxid, b.batchPkScript, batchConfHeight,
primarySweep.initiationHeight,
lndclient.WithReOrgChan(reorgChan),
)
if err != nil {
cancel()
Expand All @@ -1895,18 +1928,6 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
b.writeToErrChan(fmt.Errorf("confirmations "+
"monitoring error: %w", err))

case <-reorgChan:
// A re-org has been detected. We set the batch
// state back to open since our batch
// transaction is no longer present in any
// block. We can accept more sweeps and try to
// publish new transactions, at this point we
// need to monitor again for a new spend.
select {
case b.reorgChan <- struct{}{}:
case <-ctx.Done():
}

case <-ctx.Done():
}
}()
Expand Down Expand Up @@ -2395,12 +2416,6 @@ func (b *batch) writeToErrChan(err error) {

// writeToSpendErrChan sends an error to spend error channels of all the sweeps.
func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) {
done, err := b.scheduleNextCall()
if err != nil {
done()

return
}
notifiers := make([]*SpendNotifier, 0, len(b.sweeps))
for _, s := range b.sweeps {
// If the sweep's notifier is empty then this means that a swap
Expand All @@ -2412,7 +2427,6 @@ func (b *batch) writeToSpendErrChan(ctx context.Context, spendErr error) {

notifiers = append(notifiers, s.notifier)
}
done()

for _, notifier := range notifiers {
select {
Expand Down
72 changes: 49 additions & 23 deletions sweepbatcher/sweep_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1522,28 +1522,12 @@ func (b *Batcher) loadSweep(ctx context.Context, swapHash lntypes.Hash,

// Find minimum fee rate for the sweep. Use customFeeRate if it is
// provided, otherwise use wallet's EstimateFeeRate.
var minFeeRate chainfee.SatPerKWeight
if b.customFeeRate != nil {
minFeeRate, err = b.customFeeRate(ctx, swapHash, outpoint)
if err != nil {
return nil, fmt.Errorf("failed to fetch min fee rate "+
"for %x: %w", swapHash[:6], err)
}
if minFeeRate < chainfee.AbsoluteFeePerKwFloor {
return nil, fmt.Errorf("min fee rate too low (%v) for "+
"%x", minFeeRate, swapHash[:6])
}
} else {
if s.ConfTarget == 0 {
warnf("Fee estimation was requested for zero "+
"confTarget for sweep %x.", swapHash[:6])
}
minFeeRate, err = b.wallet.EstimateFeeRate(ctx, s.ConfTarget)
if err != nil {
return nil, fmt.Errorf("failed to estimate fee rate "+
"for %x, confTarget=%d: %w", swapHash[:6],
s.ConfTarget, err)
}
minFeeRate, err := minimumSweepFeeRate(
ctx, b.customFeeRate, b.wallet,
swapHash, outpoint, s.ConfTarget,
)
if err != nil {
return nil, err
}

return &sweep{
Expand All @@ -1567,11 +1551,53 @@ func (b *Batcher) loadSweep(ctx context.Context, swapHash lntypes.Hash,
}, nil
}

// feeRateEstimator determines feerate by confTarget.
type feeRateEstimator interface {
// EstimateFeeRate returns feerate corresponding to the confTarget.
EstimateFeeRate(ctx context.Context,
confTarget int32) (chainfee.SatPerKWeight, error)
}

// minimumSweepFeeRate determines minimum feerate for a sweep.
func minimumSweepFeeRate(ctx context.Context, customFeeRate FeeRateProvider,
wallet feeRateEstimator, swapHash lntypes.Hash, outpoint wire.OutPoint,
sweepConfTarget int32) (chainfee.SatPerKWeight, error) {

// Find minimum fee rate for the sweep. Use customFeeRate if it is
// provided, otherwise use wallet's EstimateFeeRate.
if customFeeRate != nil {
minFeeRate, err := customFeeRate(ctx, swapHash, outpoint)
if err != nil {
return 0, fmt.Errorf("failed to fetch min fee rate "+
"for %x: %w", swapHash[:6], err)
}
if minFeeRate < chainfee.AbsoluteFeePerKwFloor {
return 0, fmt.Errorf("min fee rate too low (%v) for "+
"%x", minFeeRate, swapHash[:6])
}

return minFeeRate, nil
}

if sweepConfTarget == 0 {
warnf("Fee estimation was requested for zero "+
"confTarget for sweep %x.", swapHash[:6])
}
minFeeRate, err := wallet.EstimateFeeRate(ctx, sweepConfTarget)
if err != nil {
return 0, fmt.Errorf("failed to estimate fee rate "+
"for %x, confTarget=%d: %w", swapHash[:6],
sweepConfTarget, err)
}

return minFeeRate, nil
}

// newBatchConfig creates new batch config.
func (b *Batcher) newBatchConfig(maxTimeoutDistance int32) batchConfig {
return batchConfig{
maxTimeoutDistance: maxTimeoutDistance,
noBumping: b.customFeeRate != nil,
customFeeRate: b.customFeeRate,
txLabeler: b.txLabeler,
customMuSig2Signer: b.customMuSig2Signer,
presignedHelper: b.presignedHelper,
Expand Down
Loading