Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 6 additions & 31 deletions executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/sweep"
"github.com/lightninglabs/loop/sweepbatcher"
"github.com/lightninglabs/loop/utils"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/queue"
)
Expand Down Expand Up @@ -67,40 +67,15 @@ func (s *executor) run(mainCtx context.Context,
statusChan chan<- SwapInfo,
abandonChans map[lntypes.Hash]chan struct{}) error {

var (
err error
blockEpochChan <-chan int32
blockErrorChan <-chan error
batcherErrChan chan error
blockEpochChan, blockErrorChan, err := utils.RegisterBlockEpochNtfnWithRetry(
mainCtx, s.lnd.ChainNotifier,
)

for {
blockEpochChan, blockErrorChan, err =
s.lnd.ChainNotifier.RegisterBlockEpochNtfn(mainCtx)
if err == nil {
break
}

if strings.Contains(err.Error(),
"in the process of starting") {

log.Warnf("LND chain notifier server not ready yet, " +
"retrying with delay")

// Give chain notifier some time to start and try to
// re-attempt block epoch subscription.
select {
case <-time.After(500 * time.Millisecond):
continue

case <-mainCtx.Done():
return err
}
}

if err != nil {
return err
}

var batcherErrChan chan error

// Before starting, make sure we have an up-to-date block height.
// Otherwise, we might reveal a preimage for a swap that is already
// expired.
Expand Down
27 changes: 13 additions & 14 deletions instantout/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/swap"
"github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightninglabs/loop/utils"
"github.com/lightningnetwork/lnd/lnrpc"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
"github.com/lightningnetwork/lnd/lntypes"
Expand Down Expand Up @@ -460,11 +461,10 @@ func (f *FSM) WaitForSweeplessSweepConfirmedAction(ctx context.Context,
return f.HandleError(err)
}

confChan, confErrChan, err := f.cfg.ChainNotifier.
RegisterConfirmationsNtfn(
ctx, f.InstantOut.SweepTxHash, pkscript,
1, f.InstantOut.initiationHeight,
)
confChan, confErrChan, err := utils.RegisterConfirmationsNtfnWithRetry(
ctx, f.cfg.ChainNotifier, f.InstantOut.SweepTxHash, pkscript,
1, f.InstantOut.initiationHeight,
)
if err != nil {
return f.HandleError(err)
}
Expand Down Expand Up @@ -505,12 +505,11 @@ func (f *FSM) PublishHtlcAction(ctx context.Context,
f.Debugf("published htlc tx: %v", txHash)

// We'll now wait for the htlc to be confirmed.
confChan, confErrChan, err := f.cfg.ChainNotifier.
RegisterConfirmationsNtfn(
ctx, &txHash,
f.InstantOut.finalizedHtlcTx.TxOut[0].PkScript,
1, f.InstantOut.initiationHeight,
)
confChan, confErrChan, err := utils.RegisterConfirmationsNtfnWithRetry(
ctx, f.cfg.ChainNotifier, &txHash,
f.InstantOut.finalizedHtlcTx.TxOut[0].PkScript,
1, f.InstantOut.initiationHeight,
)
if err != nil {
return f.HandleError(err)
}
Expand Down Expand Up @@ -575,9 +574,9 @@ func (f *FSM) WaitForHtlcSweepConfirmedAction(ctx context.Context,
return f.HandleError(err)
}

confChan, confErrChan, err := f.cfg.ChainNotifier.RegisterConfirmationsNtfn(
ctx, f.InstantOut.SweepTxHash, sweepPkScript,
1, f.InstantOut.initiationHeight,
confChan, confErrChan, err := utils.RegisterConfirmationsNtfnWithRetry(
ctx, f.cfg.ChainNotifier, f.InstantOut.SweepTxHash,
sweepPkScript, 1, f.InstantOut.initiationHeight,
)
if err != nil {
return f.HandleError(err)
Expand Down
5 changes: 3 additions & 2 deletions instantout/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/lightninglabs/loop/fsm"
"github.com/lightninglabs/loop/instantout/reservation"
"github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightninglabs/loop/utils"
"github.com/lightningnetwork/lnd/lntypes"
)

Expand Down Expand Up @@ -68,8 +69,8 @@ func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error {
return err
}

newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.
RegisterBlockEpochNtfn(ctx)
newBlockChan, newBlockErrChan, err :=
utils.RegisterBlockEpochNtfnWithRetry(ctx, m.cfg.ChainNotifier)
if err != nil {
close(initChan)
return err
Expand Down
20 changes: 11 additions & 9 deletions instantout/reservation/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/btcsuite/btcd/btcutil"
"github.com/lightninglabs/loop/fsm"
"github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightninglabs/loop/utils"
"github.com/lightningnetwork/lnd/chainntnfs"
)

Expand Down Expand Up @@ -94,17 +95,17 @@ func (f *FSM) SubscribeToConfirmationAction(ctx context.Context,
"initiation height: %v", f.reservation.ID, pkscript,
f.reservation.InitiationHeight)

confChan, errConfChan, err := f.cfg.ChainNotifier.RegisterConfirmationsNtfn(
callCtx, nil, pkscript, DefaultConfTarget,
f.reservation.InitiationHeight,
confChan, errConfChan, err := utils.RegisterConfirmationsNtfnWithRetry(
callCtx, f.cfg.ChainNotifier, nil, pkscript,
DefaultConfTarget, f.reservation.InitiationHeight,
)
if err != nil {
f.Errorf("unable to subscribe to conf notification: %v", err)
return f.HandleError(err)
}

blockChan, errBlockChan, err := f.cfg.ChainNotifier.RegisterBlockEpochNtfn(
callCtx,
blockChan, errBlockChan, err := utils.RegisterBlockEpochNtfnWithRetry(
callCtx, f.cfg.ChainNotifier,
)
if err != nil {
f.Errorf("unable to subscribe to block notifications: %v", err)
Expand Down Expand Up @@ -158,8 +159,9 @@ func (f *FSM) AsyncWaitForExpiredOrSweptAction(ctx context.Context,

notifCtx, cancel := context.WithCancel(ctx)

blockHeightChan, errEpochChan, err := f.cfg.ChainNotifier.
RegisterBlockEpochNtfn(notifCtx)
blockHeightChan, errEpochChan, err := utils.RegisterBlockEpochNtfnWithRetry(
notifCtx, f.cfg.ChainNotifier,
)
if err != nil {
cancel()
return f.HandleError(err)
Expand All @@ -171,8 +173,8 @@ func (f *FSM) AsyncWaitForExpiredOrSweptAction(ctx context.Context,
return f.HandleError(err)
}

spendChan, errSpendChan, err := f.cfg.ChainNotifier.RegisterSpendNtfn(
notifCtx, f.reservation.Outpoint, pkScript,
spendChan, errSpendChan, err := utils.RegisterSpendNtfnWithRetry(
notifCtx, f.cfg.ChainNotifier, f.reservation.Outpoint, pkScript,
f.reservation.InitiationHeight,
)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions instantout/reservation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/btcsuite/btcd/btcutil"
"github.com/lightninglabs/loop/fsm"
reservationrpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightninglabs/loop/utils"
)

// Manager manages the reservation state machines.
Expand Down Expand Up @@ -49,8 +50,8 @@ func (m *Manager) Run(ctx context.Context, height int32,
return err
}

newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.
RegisterBlockEpochNtfn(runCtx)
newBlockChan, newBlockErrChan, err :=
utils.RegisterBlockEpochNtfnWithRetry(runCtx, m.cfg.ChainNotifier)
if err != nil {
return err
}
Expand Down
10 changes: 6 additions & 4 deletions loopin.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,9 @@ func (s *loopInSwap) waitForHtlcConf(globalCtx context.Context) (
return nil, nil, nil
}

return notifier.RegisterConfirmationsNtfn(
ctx, s.htlcTxHash, htlc.PkScript, 1, s.InitiationHeight,
return utils.RegisterConfirmationsNtfnWithRetry(
ctx, notifier, s.htlcTxHash, htlc.PkScript, 1,
s.InitiationHeight,
)
}

Expand Down Expand Up @@ -867,8 +868,9 @@ func (s *loopInSwap) waitForSwapComplete(ctx context.Context,
// Register the htlc spend notification.
rpcCtx, cancel := context.WithCancel(ctx)
defer cancel()
spendChan, spendErr, err := s.lnd.ChainNotifier.RegisterSpendNtfn(
rpcCtx, htlcOutpoint, s.htlc.PkScript, s.InitiationHeight,
spendChan, spendErr, err := utils.RegisterSpendNtfnWithRetry(
rpcCtx, s.lnd.ChainNotifier, htlcOutpoint, s.htlc.PkScript,
s.InitiationHeight,
)
if err != nil {
return fmt.Errorf("register spend ntfn: %v", err)
Expand Down
7 changes: 4 additions & 3 deletions loopout.go
Original file line number Diff line number Diff line change
Expand Up @@ -1012,9 +1012,10 @@ func (s *loopOutSwap) waitForConfirmedHtlc(globalCtx context.Context) (
ctx, cancel := context.WithCancel(globalCtx)
defer cancel()
htlcConfChan, htlcErrChan, err :=
s.lnd.ChainNotifier.RegisterConfirmationsNtfn(
ctx, s.htlcTxHash, s.htlc.PkScript,
int32(s.HtlcConfirmations), s.InitiationHeight,
utils.RegisterConfirmationsNtfnWithRetry(
ctx, s.lnd.ChainNotifier, s.htlcTxHash,
s.htlc.PkScript, int32(s.HtlcConfirmations),
s.InitiationHeight,
)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion staticaddr/address/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/lightninglabs/loop/staticaddr/version"
"github.com/lightninglabs/loop/swap"
staticaddressrpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightninglabs/loop/utils"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnwallet"
Expand Down Expand Up @@ -69,7 +70,7 @@ func NewManager(cfg *ManagerConfig, currentHeight int32) *Manager {
// Run runs the address manager.
func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error {
newBlockChan, newBlockErrChan, err :=
m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx)
utils.RegisterBlockEpochNtfnWithRetry(ctx, m.cfg.ChainNotifier)

if err != nil {
return err
Expand Down
6 changes: 4 additions & 2 deletions staticaddr/deposit/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop/fsm"
"github.com/lightninglabs/loop/staticaddr/script"
"github.com/lightninglabs/loop/utils"
"github.com/lightningnetwork/lnd/lntypes"
)

Expand Down Expand Up @@ -123,8 +124,9 @@ func (f *FSM) WaitForExpirySweepAction(ctx context.Context,
txID = &f.deposit.ExpirySweepTxid
}

spendChan, errSpendChan, err := f.cfg.ChainNotifier.RegisterConfirmationsNtfn( //nolint:lll
ctx, txID, f.deposit.TimeOutSweepPkScript, DefaultConfTarget,
spendChan, errSpendChan, err := utils.RegisterConfirmationsNtfnWithRetry( //nolint:lll
ctx, f.cfg.ChainNotifier, txID,
f.deposit.TimeOutSweepPkScript, DefaultConfTarget,
int32(f.deposit.ConfirmationHeight),
)
if err != nil {
Expand Down
11 changes: 7 additions & 4 deletions staticaddr/deposit/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/fsm"
staticaddressrpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightninglabs/loop/utils"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
"github.com/lightningnetwork/lnd/lnwallet"
)
Expand Down Expand Up @@ -101,7 +102,8 @@ func NewManager(cfg *ManagerConfig) *Manager {

// Run runs the address manager.
func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error {
newBlockChan, newBlockErrChan, err := m.cfg.ChainNotifier.RegisterBlockEpochNtfn(ctx) //nolint:lll
newBlockChan, newBlockErrChan, err :=
utils.RegisterBlockEpochNtfnWithRetry(ctx, m.cfg.ChainNotifier)
if err != nil {
log.Errorf("unable to register block epoch notifier: %v", err)

Expand Down Expand Up @@ -328,9 +330,10 @@ func (m *Manager) getBlockHeight(ctx context.Context,
}

notifChan, errChan, err :=
m.cfg.ChainNotifier.RegisterConfirmationsNtfn(
ctx, &utxo.OutPoint.Hash, addressParams.PkScript,
MinConfs, addressParams.InitiationHeight,
utils.RegisterConfirmationsNtfnWithRetry(
ctx, m.cfg.ChainNotifier, &utxo.OutPoint.Hash,
addressParams.PkScript, MinConfs,
addressParams.InitiationHeight,
)
if err != nil {
return 0, err
Expand Down
16 changes: 9 additions & 7 deletions staticaddr/loopin/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/lightninglabs/loop/staticaddr/version"
"github.com/lightninglabs/loop/swap"
"github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightninglabs/loop/utils"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/invoices"
Expand Down Expand Up @@ -481,9 +482,9 @@ func (f *FSM) MonitorInvoiceAndHtlcTxAction(ctx context.Context,
registerHtlcConf := func() (chan *chainntnfs.TxConfirmation, chan error,
error) {

return f.cfg.ChainNotifier.RegisterConfirmationsNtfn(
ctx, nil, htlc.PkScript, defaultConfTarget,
int32(f.loopIn.InitiationHeight),
return utils.RegisterConfirmationsNtfnWithRetry(
ctx, f.cfg.ChainNotifier, nil, htlc.PkScript,
defaultConfTarget, int32(f.loopIn.InitiationHeight),
lndclient.WithReOrgChan(reorgChan),
)
}
Expand All @@ -497,8 +498,9 @@ func (f *FSM) MonitorInvoiceAndHtlcTxAction(ctx context.Context,
}

// Subscribe to new blocks.
registerBlocks := f.cfg.ChainNotifier.RegisterBlockEpochNtfn
blockChan, blockChanErr, err := registerBlocks(ctx)
blockChan, blockChanErr, err := utils.RegisterBlockEpochNtfnWithRetry(
ctx, f.cfg.ChainNotifier,
)
if err != nil {
err = fmt.Errorf("unable to subscribe to new blocks: %w", err)

Expand Down Expand Up @@ -730,8 +732,8 @@ func (f *FSM) MonitorHtlcTimeoutSweepAction(ctx context.Context,
}

htlcTimeoutTxidChan, errChan, err :=
f.cfg.ChainNotifier.RegisterConfirmationsNtfn(
ctx, f.loopIn.HtlcTimeoutSweepTxHash,
utils.RegisterConfirmationsNtfnWithRetry(
ctx, f.cfg.ChainNotifier, f.loopIn.HtlcTimeoutSweepTxHash,
timeoutSweepPkScript, defaultConfTarget,
int32(f.loopIn.InitiationHeight),
)
Expand Down
5 changes: 3 additions & 2 deletions staticaddr/loopin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/lightninglabs/loop/staticaddr/address"
"github.com/lightninglabs/loop/staticaddr/deposit"
"github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightninglabs/loop/utils"
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet"
Expand Down Expand Up @@ -158,8 +159,8 @@ func NewManager(cfg *Config, currentHeight uint32) *Manager {

// Run runs the static address loop-in manager.
func (m *Manager) Run(ctx context.Context, initChan chan struct{}) error {
registerBlockNtfn := m.cfg.ChainNotifier.RegisterBlockEpochNtfn
newBlockChan, newBlockErrChan, err := registerBlockNtfn(ctx)
newBlockChan, newBlockErrChan, err :=
utils.RegisterBlockEpochNtfnWithRetry(ctx, m.cfg.ChainNotifier)
if err != nil {
log.Errorf("unable to register for block notifications: %v",
err)
Expand Down
Loading