Skip to content

Async BumpTransactionEventHandler #3752

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
305 changes: 168 additions & 137 deletions lightning/src/events/bump_transaction.rs

Large diffs are not rendered by default.

243 changes: 243 additions & 0 deletions lightning/src/events/bump_transaction_sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
//! This module provides synchronous wrappers around bump transaction event handler and related types.

use core::future::Future;
use core::ops::Deref;
use core::task;

use crate::chain::chaininterface::BroadcasterInterface;
use crate::chain::ClaimId;
use crate::prelude::*;
use crate::sign::SignerProvider;
use crate::sync::Arc;
use crate::util::async_poll::{dummy_waker, AsyncResult, MaybeSend, MaybeSync};
use crate::util::logger::Logger;

use bitcoin::{Psbt, ScriptBuf, Transaction, TxOut};

use super::bump_transaction::{
BumpTransactionEventHandler, CoinSelection, CoinSelectionSource, Input, Utxo, Wallet,
WalletSource,
};
use super::BumpTransactionEvent;

/// A synchronous version of the [`WalletSource`] trait. Implementations of this trait should be wrapped in
/// WalletSourceSyncWrapper for it to be used within rust-lightning.
pub trait WalletSourceSync {
/// A synchronous version of [`WalletSource::list_confirmed_utxos`].
fn list_confirmed_utxos(&self) -> Result<Vec<Utxo>, ()>;
/// A synchronous version of [`WalletSource::get_change_script`].
fn get_change_script(&self) -> Result<ScriptBuf, ()>;
/// A Synchronous version of [`WalletSource::sign_psbt`].
fn sign_psbt(&self, psbt: Psbt) -> Result<Transaction, ()>;
}

/// A wrapper around [`WalletSourceSync`] to allow for async calls.
///
/// This wrapper isn't intended to be used directly, because that would risk blocking an async context. Instead, it is
/// built for you in [`WalletSync::new`].
#[doc(hidden)]
pub(crate) struct WalletSourceSyncWrapper<T: Deref>(T)
where
T::Target: WalletSourceSync;

impl<T: Deref> WalletSourceSyncWrapper<T>
where
T::Target: WalletSourceSync,
{
/// Creates a new [`WalletSourceSyncWrapper`].
pub fn new(source: T) -> Self {
Self(source)
}
}
impl<T: Deref> WalletSource for WalletSourceSyncWrapper<T>
where
T::Target: WalletSourceSync,
{
/// Returns all UTXOs, with at least 1 confirmation each, that are available to spend. Wraps
/// [`WalletSourceSync::list_confirmed_utxos`].
fn list_confirmed_utxos<'a>(&'a self) -> AsyncResult<'a, Vec<Utxo>> {
let utxos = self.0.list_confirmed_utxos();
Box::pin(async move { utxos })
}

/// Returns a script to use for change above dust resulting from a successful coin selection attempt. Wraps
/// [`WalletSourceSync::get_change_script`].
fn get_change_script<'a>(&'a self) -> AsyncResult<'a, ScriptBuf> {
let script = self.0.get_change_script();
Box::pin(async move { script })
}

/// Signs and provides the full [`TxIn::script_sig`] and [`TxIn::witness`] for all inputs within the transaction
/// known to the wallet (i.e., any provided via [`WalletSource::list_confirmed_utxos`]). Wraps
/// [`WalletSourceSync::sign_psbt`].
fn sign_psbt<'a>(&'a self, psbt: Psbt) -> AsyncResult<'a, Transaction> {
let signed_psbt = self.0.sign_psbt(psbt);
Box::pin(async move { signed_psbt })
}
}

/// A synchronous wrapper around [`Wallet`] to be used in contexts where async is not available.
pub struct WalletSync<W: Deref + MaybeSync + MaybeSend, L: Deref + MaybeSync + MaybeSend>
where
W::Target: WalletSourceSync + MaybeSend,
L::Target: Logger + MaybeSend,
{
wallet: Wallet<Arc<WalletSourceSyncWrapper<W>>, L>,
}

impl<W: Deref + MaybeSync + MaybeSend, L: Deref + MaybeSync + MaybeSend> WalletSync<W, L>
where
W::Target: WalletSourceSync + MaybeSend,
L::Target: Logger + MaybeSend,
{
/// Constructs a new [`WalletSync`] instance.
pub fn new(source: W, logger: L) -> Self {
Self { wallet: Wallet::new(Arc::new(WalletSourceSyncWrapper::new(source)), logger) }
}
}

impl<W: Deref + MaybeSync + MaybeSend, L: Deref + MaybeSync + MaybeSend> CoinSelectionSourceSync
for WalletSync<W, L>
where
W::Target: WalletSourceSync + MaybeSend + MaybeSync,
L::Target: Logger + MaybeSend + MaybeSync,
{
fn select_confirmed_utxos(
&self, claim_id: ClaimId, must_spend: Vec<Input>, must_pay_to: &[TxOut],
target_feerate_sat_per_1000_weight: u32,
) -> Result<CoinSelection, ()> {
let mut fut = self.wallet.select_confirmed_utxos(
claim_id,
must_spend,
must_pay_to,
target_feerate_sat_per_1000_weight,
);
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
match fut.as_mut().poll(&mut ctx) {
task::Poll::Ready(result) => result,
task::Poll::Pending => {
unreachable!(
"Wallet::select_confirmed_utxos should not be pending in a sync context"
);
},
}
}

fn sign_psbt(&self, psbt: Psbt) -> Result<Transaction, ()> {
let mut fut = self.wallet.sign_psbt(psbt);
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
match fut.as_mut().poll(&mut ctx) {
task::Poll::Ready(result) => result,
task::Poll::Pending => {
unreachable!("Wallet::sign_psbt should not be pending in a sync context");
},
}
}
}

/// A synchronous helper trait that can be used to implement [`CoinSelectionSource`] in a synchronous
/// context.
pub trait CoinSelectionSourceSync {
/// A synchronous version of [`CoinSelectionSource::select_confirmed_utxos`].
fn select_confirmed_utxos(
&self, claim_id: ClaimId, must_spend: Vec<Input>, must_pay_to: &[TxOut],
target_feerate_sat_per_1000_weight: u32,
) -> Result<CoinSelection, ()>;

/// A synchronous version of [`CoinSelectionSource::sign_psbt`].
fn sign_psbt(&self, psbt: Psbt) -> Result<Transaction, ()>;
}

/// A wrapper around [`CoinSelectionSourceSync`] to allow for async calls.
///
/// This wrapper isn't intended to be used directly, because that would risk blocking an async context. Instead, it is
/// built for you in [`BumpTransactionEventHandlerSync::new`].
#[doc(hidden)]
#[cfg(any(test, feature = "_test_utils"))]
pub struct CoinSelectionSourceSyncWrapper<T: Deref>(T)
where
T::Target: CoinSelectionSourceSync;
#[cfg(not(any(test, feature = "_test_utils")))]
pub(crate) struct CoinSelectionSourceSyncWrapper<T: Deref>(T)
where
T::Target: CoinSelectionSourceSync;

impl<T: Deref> CoinSelectionSourceSyncWrapper<T>
where
T::Target: CoinSelectionSourceSync,
{
#[allow(dead_code)]
pub fn new(source: T) -> Self {
Self(source)
}
}

impl<T: Deref> CoinSelectionSource for CoinSelectionSourceSyncWrapper<T>
where
T::Target: CoinSelectionSourceSync,
{
fn select_confirmed_utxos<'a>(
&'a self, claim_id: ClaimId, must_spend: Vec<Input>, must_pay_to: &'a [TxOut],
target_feerate_sat_per_1000_weight: u32,
) -> AsyncResult<'a, CoinSelection> {
let coins = self.0.select_confirmed_utxos(
claim_id,
must_spend,
must_pay_to,
target_feerate_sat_per_1000_weight,
);
Box::pin(async move { coins })
}

fn sign_psbt<'a>(&'a self, psbt: Psbt) -> AsyncResult<'a, Transaction> {
let psbt = self.0.sign_psbt(psbt);
Box::pin(async move { psbt })
}
}

/// A synchronous wrapper around [`BumpTransactionEventHandler`] to be used in contexts where async is not available.
pub struct BumpTransactionEventHandlerSync<B: Deref, C: Deref, SP: Deref, L: Deref>
where
B::Target: BroadcasterInterface,
C::Target: CoinSelectionSourceSync,
SP::Target: SignerProvider,
L::Target: Logger,
{
bump_transaction_event_handler:
Arc<BumpTransactionEventHandler<B, Arc<CoinSelectionSourceSyncWrapper<C>>, SP, L>>,
}

impl<B: Deref, C: Deref, SP: Deref, L: Deref> BumpTransactionEventHandlerSync<B, C, SP, L>
where
B::Target: BroadcasterInterface,
C::Target: CoinSelectionSourceSync,
SP::Target: SignerProvider,
L::Target: Logger,
{
/// Constructs a new instance of [`BumpTransactionEventHandlerSync`].
pub fn new(broadcaster: B, utxo_source: C, signer_provider: SP, logger: L) -> Self {
let bump_transaction_event_handler = Arc::new(BumpTransactionEventHandler::new(
broadcaster,
Arc::new(CoinSelectionSourceSyncWrapper(utxo_source)),
signer_provider,
logger,
));
Self { bump_transaction_event_handler }
}

/// Handles all variants of [`BumpTransactionEvent`].
pub fn handle_event(&self, event: &BumpTransactionEvent) {
let mut fut = Box::pin(self.bump_transaction_event_handler.handle_event(event));
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
match fut.as_mut().poll(&mut ctx) {
task::Poll::Ready(result) => result,
task::Poll::Pending => {
// In a sync context, we can't wait for the future to complete.
unreachable!("BumpTransactionEventHandlerSync::handle_event should not be pending in a sync context");
},
}
}
}
1 change: 1 addition & 0 deletions lightning/src/events/mod.rs
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@
//! few other things.

pub mod bump_transaction;
pub mod bump_transaction_sync;

pub use bump_transaction::BumpTransactionEvent;

2 changes: 1 addition & 1 deletion lightning/src/ln/async_signer_tests.rs
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ use bitcoin::transaction::Version;

use crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
use crate::chain::ChannelMonitorUpdateStatus;
use crate::events::bump_transaction::WalletSource;
use crate::events::bump_transaction_sync::WalletSourceSync;
use crate::events::{ClosureReason, Event};
use crate::ln::chan_utils::ClosingTransaction;
use crate::ln::channel::DISCONNECT_PEER_AWAITING_RESPONSE_TICKS;
14 changes: 8 additions & 6 deletions lightning/src/ln/functional_test_utils.rs
Original file line number Diff line number Diff line change
@@ -16,7 +16,8 @@ use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Confirm, Listen, Watch
use crate::chain::channelmonitor::ChannelMonitor;
use crate::chain::transaction::OutPoint;
use crate::events::{ClaimedHTLC, ClosureReason, Event, HTLCHandlingFailureType, PaidBolt12Invoice, PathFailure, PaymentFailureReason, PaymentPurpose};
use crate::events::bump_transaction::{BumpTransactionEvent, BumpTransactionEventHandler, Wallet, WalletSource};
use crate::events::bump_transaction::{BumpTransactionEvent};
use crate::events::bump_transaction_sync::{BumpTransactionEventHandlerSync, WalletSourceSync, WalletSync};
use crate::ln::types::ChannelId;
use crate::types::features::ChannelTypeFeatures;
use crate::types::payment::{PaymentPreimage, PaymentHash, PaymentSecret};
@@ -472,9 +473,9 @@ pub struct Node<'chan_man, 'node_cfg: 'chan_man, 'chan_mon_cfg: 'node_cfg> {
pub connect_style: Rc<RefCell<ConnectStyle>>,
pub override_init_features: Rc<RefCell<Option<InitFeatures>>>,
pub wallet_source: Arc<test_utils::TestWalletSource>,
pub bump_tx_handler: BumpTransactionEventHandler<
pub bump_tx_handler: BumpTransactionEventHandlerSync<
&'chan_mon_cfg test_utils::TestBroadcaster,
Arc<Wallet<Arc<test_utils::TestWalletSource>, &'chan_mon_cfg test_utils::TestLogger>>,
Arc<WalletSync<Arc<test_utils::TestWalletSource>, &'chan_mon_cfg test_utils::TestLogger>>,
&'chan_mon_cfg test_utils::TestKeysInterface,
&'chan_mon_cfg test_utils::TestLogger,
>,
@@ -3424,6 +3425,7 @@ pub fn create_network<'a, 'b: 'a, 'c: 'b>(node_count: usize, cfgs: &'b Vec<NodeC
);
let gossip_sync = P2PGossipSync::new(cfgs[i].network_graph.as_ref(), None, cfgs[i].logger);
let wallet_source = Arc::new(test_utils::TestWalletSource::new(SecretKey::from_slice(&[i as u8 + 1; 32]).unwrap()));
let wallet = Arc::new(WalletSync::new(wallet_source.clone(), cfgs[i].logger));
nodes.push(Node{
chain_source: cfgs[i].chain_source, tx_broadcaster: cfgs[i].tx_broadcaster,
fee_estimator: cfgs[i].fee_estimator, router: &cfgs[i].router,
@@ -3435,9 +3437,9 @@ pub fn create_network<'a, 'b: 'a, 'c: 'b>(node_count: usize, cfgs: &'b Vec<NodeC
blocks: Arc::clone(&cfgs[i].tx_broadcaster.blocks),
connect_style: Rc::clone(&connect_style),
override_init_features: Rc::clone(&cfgs[i].override_init_features),
wallet_source: Arc::clone(&wallet_source),
bump_tx_handler: BumpTransactionEventHandler::new(
cfgs[i].tx_broadcaster, Arc::new(Wallet::new(Arc::clone(&wallet_source), cfgs[i].logger)),
wallet_source: wallet_source.clone(),
bump_tx_handler: BumpTransactionEventHandlerSync::new(
cfgs[i].tx_broadcaster, wallet,
&cfgs[i].keys_manager, cfgs[i].logger,
),
})
3 changes: 2 additions & 1 deletion lightning/src/ln/functional_tests.rs
Original file line number Diff line number Diff line change
@@ -20,7 +20,6 @@ use crate::chain::channelmonitor::{
};
use crate::chain::transaction::OutPoint;
use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Listen, Watch};
use crate::events::bump_transaction::WalletSource;
use crate::events::{
ClosureReason, Event, FundingInfo, HTLCHandlingFailureType, PathFailure, PaymentFailureReason,
PaymentPurpose,
@@ -1474,6 +1473,8 @@ pub fn claim_htlc_outputs() {
// This is a regression test for https://github.com/lightningdevkit/rust-lightning/issues/3537.
#[xtest(feature = "_externalize_tests")]
pub fn test_multiple_package_conflicts() {
use crate::events::bump_transaction_sync::WalletSourceSync;

let chanmon_cfgs = create_chanmon_cfgs(3);
let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
let mut user_cfg = test_default_channel_config();
3 changes: 2 additions & 1 deletion lightning/src/ln/monitor_tests.rs
Original file line number Diff line number Diff line change
@@ -11,11 +11,12 @@

//! Further functional tests which test blockchain reorganizations.

use crate::events::bump_transaction_sync::WalletSourceSync;
use crate::sign::{ecdsa::EcdsaChannelSigner, OutputSpender, SignerProvider, SpendableOutputDescriptor};
use crate::chain::channelmonitor::{ANTI_REORG_DELAY, ARCHIVAL_DELAY_BLOCKS,LATENCY_GRACE_PERIOD_BLOCKS, COUNTERPARTY_CLAIMABLE_WITHIN_BLOCKS_PINNABLE, Balance, BalanceSource, ChannelMonitorUpdateStep};
use crate::chain::transaction::OutPoint;
use crate::chain::chaininterface::{ConfirmationTarget, LowerBoundedFeeEstimator, compute_feerate_sat_per_1000_weight};
use crate::events::bump_transaction::{BumpTransactionEvent, WalletSource};
use crate::events::bump_transaction::{BumpTransactionEvent};
use crate::events::{Event, ClosureReason, HTLCHandlingFailureType};
use crate::ln::channel;
use crate::ln::types::ChannelId;
20 changes: 20 additions & 0 deletions lightning/src/util/async_poll.rs
Original file line number Diff line number Diff line change
@@ -97,4 +97,24 @@ pub(crate) fn dummy_waker() -> Waker {
}

/// A type alias for a future that returns a result of type T.
#[cfg(feature = "std")]
pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = Result<T, ()>> + 'a + Send>>;
#[cfg(not(feature = "std"))]
pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = Result<T, ()>> + 'a>>;

// Marker trait to optionally implement `Sync` under std.
#[cfg(feature = "std")]
pub use core::marker::Sync as MaybeSync;

#[cfg(not(feature = "std"))]
pub trait MaybeSync {}
#[cfg(not(feature = "std"))]
impl<T> MaybeSync for T where T: ?Sized {}

// Marker trait to optionally implement `Send` under std.
#[cfg(feature = "std")]
pub use core::marker::Send as MaybeSend;
#[cfg(not(feature = "std"))]
pub trait MaybeSend {}
#[cfg(not(feature = "std"))]
impl<T> MaybeSend for T where T: ?Sized {}
Loading