diff --git a/executor.go b/executor.go index cfaa5ea75..2e18053ad 100644 --- a/executor.go +++ b/executor.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "strings" "sync" "sync/atomic" "time" @@ -14,6 +13,7 @@ import ( "github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/sweep" "github.com/lightninglabs/loop/sweepbatcher" + "github.com/lightninglabs/loop/utils" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/queue" ) @@ -67,40 +67,15 @@ func (s *executor) run(mainCtx context.Context, statusChan chan<- SwapInfo, abandonChans map[lntypes.Hash]chan struct{}) error { - var ( - err error - blockEpochChan <-chan int32 - blockErrorChan <-chan error - batcherErrChan chan error + blockEpochChan, blockErrorChan, err := utils.RegisterBlockEpochNtfnWithRetry( + mainCtx, s.lnd.ChainNotifier, ) - - for { - blockEpochChan, blockErrorChan, err = - s.lnd.ChainNotifier.RegisterBlockEpochNtfn(mainCtx) - if err == nil { - break - } - - if strings.Contains(err.Error(), - "in the process of starting") { - - log.Warnf("LND chain notifier server not ready yet, " + - "retrying with delay") - - // Give chain notifier some time to start and try to - // re-attempt block epoch subscription. - select { - case <-time.After(500 * time.Millisecond): - continue - - case <-mainCtx.Done(): - return err - } - } - + if err != nil { return err } + var batcherErrChan chan error + // Before starting, make sure we have an up-to-date block height. // Otherwise, we might reveal a preimage for a swap that is already // expired. diff --git a/instantout/actions.go b/instantout/actions.go index eb8a64152..4c9a85819 100644 --- a/instantout/actions.go +++ b/instantout/actions.go @@ -16,6 +16,7 @@ import ( "github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/swap" "github.com/lightninglabs/loop/swapserverrpc" + "github.com/lightninglabs/loop/utils" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/lntypes" @@ -460,11 +461,10 @@ func (f *FSM) WaitForSweeplessSweepConfirmedAction(ctx context.Context, return f.HandleError(err) } - confChan, confErrChan, err := f.cfg.ChainNotifier. - RegisterConfirmationsNtfn( - ctx, f.InstantOut.SweepTxHash, pkscript, - 1, f.InstantOut.initiationHeight, - ) + confChan, confErrChan, err := utils.RegisterConfirmationsNtfnWithRetry( + ctx, f.cfg.ChainNotifier, f.InstantOut.SweepTxHash, pkscript, + 1, f.InstantOut.initiationHeight, + ) if err != nil { return f.HandleError(err) } @@ -505,12 +505,11 @@ func (f *FSM) PublishHtlcAction(ctx context.Context, f.Debugf("published htlc tx: %v", txHash) // We'll now wait for the htlc to be confirmed. - confChan, confErrChan, err := f.cfg.ChainNotifier. - RegisterConfirmationsNtfn( - ctx, &txHash, - f.InstantOut.finalizedHtlcTx.TxOut[0].PkScript, - 1, f.InstantOut.initiationHeight, - ) + confChan, confErrChan, err := utils.RegisterConfirmationsNtfnWithRetry( + ctx, f.cfg.ChainNotifier, &txHash, + f.InstantOut.finalizedHtlcTx.TxOut[0].PkScript, + 1, f.InstantOut.initiationHeight, + ) if err != nil { return f.HandleError(err) } @@ -575,9 +574,9 @@ func (f *FSM) WaitForHtlcSweepConfirmedAction(ctx context.Context, return f.HandleError(err) } - confChan, confErrChan, err := f.cfg.ChainNotifier.RegisterConfirmationsNtfn( - ctx, f.InstantOut.SweepTxHash, sweepPkScript, - 1, f.InstantOut.initiationHeight, + confChan, confErrChan, err := utils.RegisterConfirmationsNtfnWithRetry( + ctx, f.cfg.ChainNotifier, f.InstantOut.SweepTxHash, + sweepPkScript, 1, f.InstantOut.initiationHeight, ) if err != nil { return f.HandleError(err) diff --git a/instantout/manager.go b/instantout/manager.go index 37ccb681b..3b2cedddf 100644 --- a/instantout/manager.go +++ b/instantout/manager.go @@ -11,6 +11,7 @@ import ( "github.com/lightninglabs/loop/fsm" "github.com/lightninglabs/loop/instantout/reservation" "github.com/lightninglabs/loop/swapserverrpc" + "github.com/lightninglabs/loop/utils" "github.com/lightningnetwork/lnd/lntypes" ) @@ -68,8 +69,8 @@ func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error { return err } - newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier. - RegisterBlockEpochNtfn(ctx) + newBlockChan, newBlockErrChan, err := + utils.RegisterBlockEpochNtfnWithRetry(ctx, m.cfg.ChainNotifier) if err != nil { close(initChan) return err diff --git a/instantout/reservation/actions.go b/instantout/reservation/actions.go index c8fbedfd7..7af05519a 100644 --- a/instantout/reservation/actions.go +++ b/instantout/reservation/actions.go @@ -7,6 +7,7 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/lightninglabs/loop/fsm" "github.com/lightninglabs/loop/swapserverrpc" + "github.com/lightninglabs/loop/utils" "github.com/lightningnetwork/lnd/chainntnfs" ) @@ -94,17 +95,17 @@ func (f *FSM) SubscribeToConfirmationAction(ctx context.Context, "initiation height: %v", f.reservation.ID, pkscript, f.reservation.InitiationHeight) - confChan, errConfChan, err := f.cfg.ChainNotifier.RegisterConfirmationsNtfn( - callCtx, nil, pkscript, DefaultConfTarget, - f.reservation.InitiationHeight, + confChan, errConfChan, err := utils.RegisterConfirmationsNtfnWithRetry( + callCtx, f.cfg.ChainNotifier, nil, pkscript, + DefaultConfTarget, f.reservation.InitiationHeight, ) if err != nil { f.Errorf("unable to subscribe to conf notification: %v", err) return f.HandleError(err) } - blockChan, errBlockChan, err := f.cfg.ChainNotifier.RegisterBlockEpochNtfn( - callCtx, + blockChan, errBlockChan, err := utils.RegisterBlockEpochNtfnWithRetry( + callCtx, f.cfg.ChainNotifier, ) if err != nil { f.Errorf("unable to subscribe to block notifications: %v", err) @@ -158,8 +159,9 @@ func (f *FSM) AsyncWaitForExpiredOrSweptAction(ctx context.Context, notifCtx, cancel := context.WithCancel(ctx) - blockHeightChan, errEpochChan, err := f.cfg.ChainNotifier. - RegisterBlockEpochNtfn(notifCtx) + blockHeightChan, errEpochChan, err := utils.RegisterBlockEpochNtfnWithRetry( + notifCtx, f.cfg.ChainNotifier, + ) if err != nil { cancel() return f.HandleError(err) @@ -171,8 +173,8 @@ func (f *FSM) AsyncWaitForExpiredOrSweptAction(ctx context.Context, return f.HandleError(err) } - spendChan, errSpendChan, err := f.cfg.ChainNotifier.RegisterSpendNtfn( - notifCtx, f.reservation.Outpoint, pkScript, + spendChan, errSpendChan, err := utils.RegisterSpendNtfnWithRetry( + notifCtx, f.cfg.ChainNotifier, f.reservation.Outpoint, pkScript, f.reservation.InitiationHeight, ) if err != nil { diff --git a/instantout/reservation/manager.go b/instantout/reservation/manager.go index 600febfe9..646a5aff8 100644 --- a/instantout/reservation/manager.go +++ b/instantout/reservation/manager.go @@ -11,6 +11,7 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/lightninglabs/loop/fsm" reservationrpc "github.com/lightninglabs/loop/swapserverrpc" + "github.com/lightninglabs/loop/utils" ) // Manager manages the reservation state machines. @@ -49,8 +50,8 @@ func (m *Manager) Run(ctx context.Context, height int32, return err } - newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier. - RegisterBlockEpochNtfn(runCtx) + newBlockChan, newBlockErrChan, err := + utils.RegisterBlockEpochNtfnWithRetry(runCtx, m.cfg.ChainNotifier) if err != nil { return err } diff --git a/loopin.go b/loopin.go index 8f3b82860..04e541c4e 100644 --- a/loopin.go +++ b/loopin.go @@ -702,8 +702,9 @@ func (s *loopInSwap) waitForHtlcConf(globalCtx context.Context) ( return nil, nil, nil } - return notifier.RegisterConfirmationsNtfn( - ctx, s.htlcTxHash, htlc.PkScript, 1, s.InitiationHeight, + return utils.RegisterConfirmationsNtfnWithRetry( + ctx, notifier, s.htlcTxHash, htlc.PkScript, 1, + s.InitiationHeight, ) } @@ -867,8 +868,9 @@ func (s *loopInSwap) waitForSwapComplete(ctx context.Context, // Register the htlc spend notification. rpcCtx, cancel := context.WithCancel(ctx) defer cancel() - spendChan, spendErr, err := s.lnd.ChainNotifier.RegisterSpendNtfn( - rpcCtx, htlcOutpoint, s.htlc.PkScript, s.InitiationHeight, + spendChan, spendErr, err := utils.RegisterSpendNtfnWithRetry( + rpcCtx, s.lnd.ChainNotifier, htlcOutpoint, s.htlc.PkScript, + s.InitiationHeight, ) if err != nil { return fmt.Errorf("register spend ntfn: %v", err) diff --git a/loopout.go b/loopout.go index b3d8b643f..6ff0f0fb4 100644 --- a/loopout.go +++ b/loopout.go @@ -1012,9 +1012,10 @@ func (s *loopOutSwap) waitForConfirmedHtlc(globalCtx context.Context) ( ctx, cancel := context.WithCancel(globalCtx) defer cancel() htlcConfChan, htlcErrChan, err := - s.lnd.ChainNotifier.RegisterConfirmationsNtfn( - ctx, s.htlcTxHash, s.htlc.PkScript, - int32(s.HtlcConfirmations), s.InitiationHeight, + utils.RegisterConfirmationsNtfnWithRetry( + ctx, s.lnd.ChainNotifier, s.htlcTxHash, + s.htlc.PkScript, int32(s.HtlcConfirmations), + s.InitiationHeight, ) if err != nil { return nil, err diff --git a/staticaddr/address/manager.go b/staticaddr/address/manager.go index 1d63bd7f9..bf7dd21f8 100644 --- a/staticaddr/address/manager.go +++ b/staticaddr/address/manager.go @@ -16,6 +16,7 @@ import ( "github.com/lightninglabs/loop/staticaddr/version" "github.com/lightninglabs/loop/swap" staticaddressrpc "github.com/lightninglabs/loop/swapserverrpc" + "github.com/lightninglabs/loop/utils" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/lnwallet" @@ -69,7 +70,7 @@ func NewManager(cfg *ManagerConfig, currentHeight int32) *Manager { // Run runs the address manager. func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error { newBlockChan, newBlockErrChan, err := - m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx) + utils.RegisterBlockEpochNtfnWithRetry(ctx, m.cfg.ChainNotifier) if err != nil { return err diff --git a/staticaddr/deposit/actions.go b/staticaddr/deposit/actions.go index 3347ec03c..026cb2980 100644 --- a/staticaddr/deposit/actions.go +++ b/staticaddr/deposit/actions.go @@ -11,6 +11,7 @@ import ( "github.com/lightninglabs/lndclient" "github.com/lightninglabs/loop/fsm" "github.com/lightninglabs/loop/staticaddr/script" + "github.com/lightninglabs/loop/utils" "github.com/lightningnetwork/lnd/lntypes" ) @@ -123,8 +124,9 @@ func (f *FSM) WaitForExpirySweepAction(ctx context.Context, txID = &f.deposit.ExpirySweepTxid } - spendChan, errSpendChan, err := f.cfg.ChainNotifier.RegisterConfirmationsNtfn( //nolint:lll - ctx, txID, f.deposit.TimeOutSweepPkScript, DefaultConfTarget, + spendChan, errSpendChan, err := utils.RegisterConfirmationsNtfnWithRetry( //nolint:lll + ctx, f.cfg.ChainNotifier, txID, + f.deposit.TimeOutSweepPkScript, DefaultConfTarget, int32(f.deposit.ConfirmationHeight), ) if err != nil { diff --git a/staticaddr/deposit/manager.go b/staticaddr/deposit/manager.go index ccce718ba..a5fd95f8d 100644 --- a/staticaddr/deposit/manager.go +++ b/staticaddr/deposit/manager.go @@ -15,6 +15,7 @@ import ( "github.com/lightninglabs/loop" "github.com/lightninglabs/loop/fsm" staticaddressrpc "github.com/lightninglabs/loop/swapserverrpc" + "github.com/lightninglabs/loop/utils" "github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/lnwallet" ) @@ -101,7 +102,8 @@ func NewManager(cfg *ManagerConfig) *Manager { // Run runs the address manager. func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error { - newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx) //nolint:lll + newBlockChan, newBlockErrChan, err := + utils.RegisterBlockEpochNtfnWithRetry(ctx, m.cfg.ChainNotifier) if err != nil { log.Errorf("unable to register block epoch notifier: %v", err) @@ -328,9 +330,10 @@ func (m *Manager) getBlockHeight(ctx context.Context, } notifChan, errChan, err := - m.cfg.ChainNotifier.RegisterConfirmationsNtfn( - ctx, &utxo.OutPoint.Hash, addressParams.PkScript, - MinConfs, addressParams.InitiationHeight, + utils.RegisterConfirmationsNtfnWithRetry( + ctx, m.cfg.ChainNotifier, &utxo.OutPoint.Hash, + addressParams.PkScript, MinConfs, + addressParams.InitiationHeight, ) if err != nil { return 0, err diff --git a/staticaddr/loopin/actions.go b/staticaddr/loopin/actions.go index bf8963301..dd252e40e 100644 --- a/staticaddr/loopin/actions.go +++ b/staticaddr/loopin/actions.go @@ -20,6 +20,7 @@ import ( "github.com/lightninglabs/loop/staticaddr/version" "github.com/lightninglabs/loop/swap" "github.com/lightninglabs/loop/swapserverrpc" + "github.com/lightninglabs/loop/utils" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/invoices" @@ -481,9 +482,9 @@ func (f *FSM) MonitorInvoiceAndHtlcTxAction(ctx context.Context, registerHtlcConf := func() (chan *chainntnfs.TxConfirmation, chan error, error) { - return f.cfg.ChainNotifier.RegisterConfirmationsNtfn( - ctx, nil, htlc.PkScript, defaultConfTarget, - int32(f.loopIn.InitiationHeight), + return utils.RegisterConfirmationsNtfnWithRetry( + ctx, f.cfg.ChainNotifier, nil, htlc.PkScript, + defaultConfTarget, int32(f.loopIn.InitiationHeight), lndclient.WithReOrgChan(reorgChan), ) } @@ -497,8 +498,9 @@ func (f *FSM) MonitorInvoiceAndHtlcTxAction(ctx context.Context, } // Subscribe to new blocks. - registerBlocks := f.cfg.ChainNotifier.RegisterBlockEpochNtfn - blockChan, blockChanErr, err := registerBlocks(ctx) + blockChan, blockChanErr, err := utils.RegisterBlockEpochNtfnWithRetry( + ctx, f.cfg.ChainNotifier, + ) if err != nil { err = fmt.Errorf("unable to subscribe to new blocks: %w", err) @@ -730,8 +732,8 @@ func (f *FSM) MonitorHtlcTimeoutSweepAction(ctx context.Context, } htlcTimeoutTxidChan, errChan, err := - f.cfg.ChainNotifier.RegisterConfirmationsNtfn( - ctx, f.loopIn.HtlcTimeoutSweepTxHash, + utils.RegisterConfirmationsNtfnWithRetry( + ctx, f.cfg.ChainNotifier, f.loopIn.HtlcTimeoutSweepTxHash, timeoutSweepPkScript, defaultConfTarget, int32(f.loopIn.InitiationHeight), ) diff --git a/staticaddr/loopin/manager.go b/staticaddr/loopin/manager.go index 05f28f1c9..9a35da9e9 100644 --- a/staticaddr/loopin/manager.go +++ b/staticaddr/loopin/manager.go @@ -22,6 +22,7 @@ import ( "github.com/lightninglabs/loop/staticaddr/address" "github.com/lightninglabs/loop/staticaddr/deposit" "github.com/lightninglabs/loop/swapserverrpc" + "github.com/lightninglabs/loop/utils" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lnwallet" @@ -158,8 +159,8 @@ func NewManager(cfg *Config, currentHeight uint32) *Manager { // Run runs the static address loop-in manager. func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error { - registerBlockNtfn := m.cfg.ChainNotifier.RegisterBlockEpochNtfn - newBlockChan, newBlockErrChan, err := registerBlockNtfn(ctx) + newBlockChan, newBlockErrChan, err := + utils.RegisterBlockEpochNtfnWithRetry(ctx, m.cfg.ChainNotifier) if err != nil { log.Errorf("unable to register for block notifications: %v", err) diff --git a/staticaddr/withdraw/manager.go b/staticaddr/withdraw/manager.go index 09a567cfe..399c8fb30 100644 --- a/staticaddr/withdraw/manager.go +++ b/staticaddr/withdraw/manager.go @@ -20,6 +20,7 @@ import ( "github.com/lightninglabs/lndclient" "github.com/lightninglabs/loop/staticaddr/deposit" staticaddressrpc "github.com/lightninglabs/loop/swapserverrpc" + "github.com/lightninglabs/loop/utils" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/lnrpc/walletrpc" @@ -154,7 +155,7 @@ func NewManager(cfg *ManagerConfig, currentHeight uint32) *Manager { // Run runs the deposit withdrawal manager. func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error { newBlockChan, newBlockErrChan, err := - m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx) + utils.RegisterBlockEpochNtfnWithRetry(ctx, m.cfg.ChainNotifier) if err != nil { log.Errorf("unable to register for block epoch "+ @@ -639,8 +640,8 @@ func (m *Manager) handleWithdrawal(ctx context.Context, } d := deposits[0] - spentChan, errChan, err := m.cfg.ChainNotifier.RegisterSpendNtfn( - ctx, &d.OutPoint, addrParams.PkScript, + spentChan, errChan, err := utils.RegisterSpendNtfnWithRetry( + ctx, m.cfg.ChainNotifier, &d.OutPoint, addrParams.PkScript, int32(d.ConfirmationHeight), ) @@ -653,8 +654,9 @@ func (m *Manager) handleWithdrawal(ctx context.Context, // confirmations. var confChan chan *chainntnfs.TxConfirmation confChan, errChan, err = - m.cfg.ChainNotifier.RegisterConfirmationsNtfn( - ctx, spentTx.SpenderTxHash, + utils.RegisterConfirmationsNtfnWithRetry( + ctx, m.cfg.ChainNotifier, + spentTx.SpenderTxHash, withdrawalPkscript, MinConfs, int32(m.initiationHeight.Load()), ) diff --git a/sweepbatcher/sweep_batch.go b/sweepbatcher/sweep_batch.go index 4415d3fc4..58d7e3724 100644 --- a/sweepbatcher/sweep_batch.go +++ b/sweepbatcher/sweep_batch.go @@ -842,8 +842,9 @@ func (b *batch) Run(ctx context.Context) error { clock := b.cfg.clock startTime := clock.Now() - blockChan, blockErrChan, err := - b.chainNotifier.RegisterBlockEpochNtfn(runCtx) + blockChan, blockErrChan, err := utils.RegisterBlockEpochNtfnWithRetry( + runCtx, b.chainNotifier, + ) if err != nil { return fmt.Errorf("block registration error: %w", err) } @@ -1995,9 +1996,9 @@ func (b *batch) monitorSpend(ctx context.Context, primarySweep sweep) error { reorgChan := make(chan struct{}, 1) - spendChan, spendErrChan, err := b.chainNotifier.RegisterSpendNtfn( - ctx, &primarySweep.outpoint, primarySweep.htlc.PkScript, - primarySweep.initiationHeight, + spendChan, spendErrChan, err := utils.RegisterSpendNtfnWithRetry( + ctx, b.chainNotifier, &primarySweep.outpoint, + primarySweep.htlc.PkScript, primarySweep.initiationHeight, lndclient.WithReOrgChan(reorgChan), ) if err != nil { @@ -2029,9 +2030,9 @@ func (b *batch) monitorConfirmations(ctx context.Context) error { confCtx, cancel := context.WithCancel(ctx) - confChan, errChan, err := b.chainNotifier.RegisterConfirmationsNtfn( - confCtx, b.batchTxid, b.batchPkScript, batchConfHeight, - primarySweep.initiationHeight, + confChan, errChan, err := utils.RegisterConfirmationsNtfnWithRetry( + confCtx, b.chainNotifier, b.batchTxid, b.batchPkScript, + batchConfHeight, primarySweep.initiationHeight, ) if err != nil { cancel() diff --git a/sweepbatcher/sweep_batcher.go b/sweepbatcher/sweep_batcher.go index ec5453a8e..635315835 100644 --- a/sweepbatcher/sweep_batcher.go +++ b/sweepbatcher/sweep_batcher.go @@ -1284,8 +1284,8 @@ func (b *Batcher) monitorSpendAndNotify(ctx context.Context, sweeps []*sweep, sweep := sweeps[0] - spendChan, spendErr, err := b.chainNotifier.RegisterSpendNtfn( - spendCtx, &sweep.outpoint, sweep.htlc.PkScript, + spendChan, spendErr, err := utils.RegisterSpendNtfnWithRetry( + spendCtx, b.chainNotifier, &sweep.outpoint, sweep.htlc.PkScript, sweep.initiationHeight, ) if err != nil { @@ -1412,9 +1412,10 @@ func (b *Batcher) monitorConfAndNotify(ctx context.Context, sweep *sweep, confCtx, cancel := context.WithCancel(ctx) - confChan, errChan, err := b.chainNotifier.RegisterConfirmationsNtfn( - confCtx, &batchTxid, batchPkScript, batchConfHeight, - sweep.initiationHeight, lndclient.WithReOrgChan(reorgChan), + confChan, errChan, err := utils.RegisterConfirmationsNtfnWithRetry( + confCtx, b.chainNotifier, &batchTxid, batchPkScript, + batchConfHeight, sweep.initiationHeight, + lndclient.WithReOrgChan(reorgChan), ) if err != nil { cancel() diff --git a/utils/chainnotifier.go b/utils/chainnotifier.go new file mode 100644 index 000000000..389c1e93f --- /dev/null +++ b/utils/chainnotifier.go @@ -0,0 +1,155 @@ +package utils + +import ( + "context" + "strings" + "time" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/lndclient" + "github.com/lightningnetwork/lnd/chainntnfs" + "google.golang.org/grpc/status" +) + +const ( + // chainNotifierStartupMessage is returned by lnd while the chain + // notifier RPC sub-server is initialising. + chainNotifierStartupMessage = "chain notifier RPC is still in the " + + "process of starting" + + // chainNotifierRetryBackoff is the delay used between subscription + // attempts while the chain notifier is still starting. + chainNotifierRetryBackoff = 500 * time.Millisecond +) + +// BlockEpochRegistrar represents the ability to subscribe to block epoch +// notifications. +type BlockEpochRegistrar interface { + RegisterBlockEpochNtfn(ctx context.Context) (chan int32, chan error, + error) +} + +// ConfirmationsRegistrar represents the ability to subscribe to confirmation +// notifications. +type ConfirmationsRegistrar interface { + RegisterConfirmationsNtfn(ctx context.Context, txid *chainhash.Hash, + pkScript []byte, numConfs, heightHint int32, + opts ...lndclient.NotifierOption) (chan *chainntnfs.TxConfirmation, + chan error, error) +} + +// SpendRegistrar represents the ability to subscribe to spend notifications. +type SpendRegistrar interface { + RegisterSpendNtfn(ctx context.Context, + outpoint *wire.OutPoint, pkScript []byte, heightHint int32, + optFuncs ...lndclient.NotifierOption) (chan *chainntnfs.SpendDetail, + chan error, error) +} + +// RegisterBlockEpochNtfnWithRetry keeps retrying block epoch subscriptions as +// long as lnd reports that the chain notifier sub-server is still starting. +func RegisterBlockEpochNtfnWithRetry(ctx context.Context, + registrar BlockEpochRegistrar) (chan int32, chan error, error) { + + for { + blockChan, errChan, err := registrar.RegisterBlockEpochNtfn(ctx) + if err == nil { + return blockChan, errChan, nil + } + + if !isChainNotifierStartingErr(err) { + return nil, nil, err + } + + log.Warnf("Chain notifier RPC not ready yet, retrying: %v", + err) + + select { + case <-time.After(chainNotifierRetryBackoff): + continue + + case <-ctx.Done(): + return nil, nil, ctx.Err() + } + } +} + +// RegisterConfirmationsNtfnWithRetry keeps retrying confirmation subscriptions +// while lnd reports that the chain notifier sub-server is still starting. +func RegisterConfirmationsNtfnWithRetry(ctx context.Context, + registrar ConfirmationsRegistrar, txid *chainhash.Hash, pkScript []byte, + numConfs, heightHint int32, opts ...lndclient.NotifierOption) ( + chan *chainntnfs.TxConfirmation, chan error, error) { + + for { + confChan, errChan, err := registrar.RegisterConfirmationsNtfn( + ctx, txid, pkScript, numConfs, heightHint, opts..., + ) + if err == nil { + return confChan, errChan, nil + } + + if !isChainNotifierStartingErr(err) { + return nil, nil, err + } + + log.Warnf("Chain notifier RPC not ready yet, retrying: %v", + err) + + select { + case <-time.After(chainNotifierRetryBackoff): + continue + + case <-ctx.Done(): + return nil, nil, ctx.Err() + } + } +} + +// RegisterSpendNtfnWithRetry keeps retrying spend subscriptions while lnd +// reports that the chain notifier sub-server is still starting. +func RegisterSpendNtfnWithRetry(ctx context.Context, + registrar SpendRegistrar, outpoint *wire.OutPoint, pkScript []byte, + heightHint int32, optFuncs ...lndclient.NotifierOption) ( + chan *chainntnfs.SpendDetail, chan error, error) { + + for { + spendChan, errChan, err := registrar.RegisterSpendNtfn( + ctx, outpoint, pkScript, heightHint, optFuncs..., + ) + if err == nil { + return spendChan, errChan, nil + } + + if !isChainNotifierStartingErr(err) { + return nil, nil, err + } + + log.Warnf("Chain notifier RPC not ready yet, retrying: %v", + err) + + select { + case <-time.After(chainNotifierRetryBackoff): + continue + + case <-ctx.Done(): + return nil, nil, ctx.Err() + } + } +} + +// isChainNotifierStartingErr checks whether an error indicates that lnd's chain +// notifier has not started yet. +func isChainNotifierStartingErr(err error) bool { + if err == nil { + return false + } + + st, ok := status.FromError(err) + if ok && strings.Contains(st.Message(), chainNotifierStartupMessage) { + return true + } + + return strings.Contains(err.Error(), chainNotifierStartupMessage) +} diff --git a/utils/chainnotifier_test.go b/utils/chainnotifier_test.go new file mode 100644 index 000000000..3e014c543 --- /dev/null +++ b/utils/chainnotifier_test.go @@ -0,0 +1,227 @@ +package utils + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/lndclient" + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// stubBlockEpochRegistrar implements BlockEpochRegistrar and records the +// number of attempts before succeeding. +type stubBlockEpochRegistrar struct { + mu sync.Mutex + attempts int + succeedAfter int +} + +// RegisterBlockEpochNtfn simulates a chain notifier that returns a startup +// error until the configured number of attempts has been exhausted. +func (s *stubBlockEpochRegistrar) RegisterBlockEpochNtfn( + context.Context) (chan int32, chan error, error) { + + s.mu.Lock() + defer s.mu.Unlock() + + s.attempts++ + if s.attempts <= s.succeedAfter { + return nil, nil, status.Error( + codes.Unknown, chainNotifierStartupMessage, + ) + } + + return make(chan int32), make(chan error), nil +} + +// Attempts returns the total number of registration attempts made. +func (s *stubBlockEpochRegistrar) Attempts() int { + s.mu.Lock() + defer s.mu.Unlock() + + return s.attempts +} + +// stubConfirmationsRegistrar implements ConfirmationsRegistrar. +type stubConfirmationsRegistrar struct { + mu sync.Mutex + attempts int + succeedAfter int +} + +func (s *stubConfirmationsRegistrar) RegisterConfirmationsNtfn( + context.Context, *chainhash.Hash, []byte, int32, int32, + ...lndclient.NotifierOption) (chan *chainntnfs.TxConfirmation, chan error, + error) { + + s.mu.Lock() + defer s.mu.Unlock() + + s.attempts++ + if s.attempts <= s.succeedAfter { + return nil, nil, status.Error( + codes.Unknown, chainNotifierStartupMessage, + ) + } + + return make(chan *chainntnfs.TxConfirmation), make(chan error), nil +} + +func (s *stubConfirmationsRegistrar) Attempts() int { + s.mu.Lock() + defer s.mu.Unlock() + + return s.attempts +} + +// stubSpendRegistrar implements SpendRegistrar. +type stubSpendRegistrar struct { + mu sync.Mutex + attempts int + succeedAfter int +} + +func (s *stubSpendRegistrar) RegisterSpendNtfn(context.Context, + *wire.OutPoint, []byte, int32, ...lndclient.NotifierOption) ( + chan *chainntnfs.SpendDetail, chan error, error) { + + s.mu.Lock() + defer s.mu.Unlock() + + s.attempts++ + if s.attempts <= s.succeedAfter { + return nil, nil, status.Error( + codes.Unknown, chainNotifierStartupMessage, + ) + } + + return make(chan *chainntnfs.SpendDetail), make(chan error), nil +} + +func (s *stubSpendRegistrar) Attempts() int { + s.mu.Lock() + defer s.mu.Unlock() + + return s.attempts +} + +// TestRegisterBlockEpochNtfnWithRetry ensures we retry until the notifier +// becomes available. +func TestRegisterBlockEpochNtfnWithRetry(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(t.Context(), time.Second) + t.Cleanup(cancel) + + stub := &stubBlockEpochRegistrar{ + succeedAfter: 1, + } + + blockChan, errChan, err := RegisterBlockEpochNtfnWithRetry(ctx, stub) + require.NoError(t, err) + require.NotNil(t, blockChan) + require.NotNil(t, errChan) + require.Equal(t, 2, stub.Attempts()) +} + +// TestRegisterBlockEpochNtfnWithRetryContextCancel ensures we propagate the +// caller's context error if the notifier never becomes ready. +func TestRegisterBlockEpochNtfnWithRetryContextCancel(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(t.Context(), 50*time.Millisecond) + t.Cleanup(cancel) + + stub := &stubBlockEpochRegistrar{ + succeedAfter: 100, + } + + _, _, err := RegisterBlockEpochNtfnWithRetry(ctx, stub) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.GreaterOrEqual(t, stub.Attempts(), 1) +} + +// TestRegisterConfirmationsNtfnWithRetry ensures confirmation subscriptions +// retry until the notifier becomes available. +func TestRegisterConfirmationsNtfnWithRetry(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(t.Context(), time.Second) + t.Cleanup(cancel) + + stub := &stubConfirmationsRegistrar{ + succeedAfter: 2, + } + + confChan, errChan, err := RegisterConfirmationsNtfnWithRetry( + ctx, stub, nil, nil, 1, 0, + ) + require.NoError(t, err) + require.NotNil(t, confChan) + require.NotNil(t, errChan) + require.Equal(t, 3, stub.Attempts()) +} + +// TestRegisterConfirmationsNtfnWithRetryContextCancel ensures context +// cancellation is propagated while retrying confirmation subscriptions. +func TestRegisterConfirmationsNtfnWithRetryContextCancel(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(t.Context(), 50*time.Millisecond) + t.Cleanup(cancel) + + stub := &stubConfirmationsRegistrar{ + succeedAfter: 100, + } + + _, _, err := RegisterConfirmationsNtfnWithRetry( + ctx, stub, nil, nil, 1, 0, + ) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.GreaterOrEqual(t, stub.Attempts(), 1) +} + +// TestRegisterSpendNtfnWithRetry ensures spend subscriptions retry until the +// notifier becomes available. +func TestRegisterSpendNtfnWithRetry(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(t.Context(), time.Second) + t.Cleanup(cancel) + + stub := &stubSpendRegistrar{ + succeedAfter: 1, + } + + spendChan, errChan, err := RegisterSpendNtfnWithRetry( + ctx, stub, nil, nil, 0, + ) + require.NoError(t, err) + require.NotNil(t, spendChan) + require.NotNil(t, errChan) + require.Equal(t, 2, stub.Attempts()) +} + +// TestRegisterSpendNtfnWithRetryContextCancel ensures spend subscription retry +// honours context cancellation. +func TestRegisterSpendNtfnWithRetryContextCancel(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(t.Context(), 50*time.Millisecond) + t.Cleanup(cancel) + + stub := &stubSpendRegistrar{ + succeedAfter: 100, + } + + _, _, err := RegisterSpendNtfnWithRetry(ctx, stub, nil, nil, 0) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.GreaterOrEqual(t, stub.Attempts(), 1) +} diff --git a/utils/log.go b/utils/log.go new file mode 100644 index 000000000..03fa858c7 --- /dev/null +++ b/utils/log.go @@ -0,0 +1,24 @@ +package utils + +import ( + "github.com/btcsuite/btclog/v2" + "github.com/lightningnetwork/lnd/build" +) + +// Subsystem defines the sub system name of this package. +const Subsystem = "UTILS" + +// log is a logger that is initialized with no output filters. This means the +// package will not perform any logging by default until the caller requests it. +var log btclog.Logger + +// The default amount of logging is none. +func init() { + UseLogger(build.NewSubLogger(Subsystem, nil)) +} + +// UseLogger uses a specified Logger to output package logging info. This should +// be used in preference to SetLogWriter if the caller is also using btclog. +func UseLogger(logger btclog.Logger) { + log = logger +}