Skip to content

Commit f78025d

Browse files
jkczyzclaude
andcommitted
Persist payment transaction data without blocking LDK
Previously the BroadcasterInterface implementation wrote the payment record synchronously when LDK invoked it. With a remote KV store this could block LDK's message handling for hundreds of milliseconds per call, noticeably during force-close bursts or splice broadcasts. Persistence now happens asynchronously and must complete before the transaction is sent to the chain client. If persistence fails, the broadcast is dropped: a payment record must exist for every on-chain tx we emit, otherwise a crash could leave the tx confirmed with no matching record. Generated with assistance from Claude Code. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 88cfd95 commit f78025d

5 files changed

Lines changed: 66 additions & 32 deletions

File tree

src/chain/bitcoind.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -571,16 +571,18 @@ impl BitcoindChainSource {
571571
Ok(())
572572
}
573573

574-
pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
574+
pub(crate) async fn process_broadcast_package(
575+
&self, txs: impl IntoIterator<Item = Transaction>,
576+
) {
575577
// While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
576578
// features, we should eventually switch to use `submitpackage` via the
577579
// `rust-bitcoind-json-rpc` crate rather than just broadcasting individual
578580
// transactions.
579-
for tx in &package {
581+
for tx in txs {
580582
let txid = tx.compute_txid();
581583
let timeout_fut = tokio::time::timeout(
582584
Duration::from_secs(DEFAULT_TX_BROADCAST_TIMEOUT_SECS),
583-
self.api_client.broadcast_transaction(tx),
585+
self.api_client.broadcast_transaction(&tx),
584586
);
585587
match timeout_fut.await {
586588
Ok(res) => match res {

src/chain/electrum.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,9 @@ impl ElectrumChainSource {
294294
Ok(())
295295
}
296296

297-
pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
297+
pub(crate) async fn process_broadcast_package(
298+
&self, txs: impl IntoIterator<Item = Transaction>,
299+
) {
298300
let electrum_client: Arc<ElectrumRuntimeClient> = if let Some(client) =
299301
self.electrum_runtime_status.read().expect("lock").client().as_ref()
300302
{
@@ -304,7 +306,7 @@ impl ElectrumChainSource {
304306
return;
305307
};
306308

307-
for tx in package {
309+
for tx in txs {
308310
electrum_client.broadcast(tx).await;
309311
}
310312
}

src/chain/esplora.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -355,12 +355,14 @@ impl EsploraChainSource {
355355
Ok(())
356356
}
357357

358-
pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
359-
for tx in &package {
358+
pub(crate) async fn process_broadcast_package(
359+
&self, txs: impl IntoIterator<Item = Transaction>,
360+
) {
361+
for tx in txs {
360362
let txid = tx.compute_txid();
361363
let timeout_fut = tokio::time::timeout(
362364
Duration::from_secs(self.sync_config.timeouts_config.tx_broadcast_timeout_secs),
363-
self.esplora_client.broadcast(tx),
365+
self.esplora_client.broadcast(&tx),
364366
);
365367
match timeout_fut.await {
366368
Ok(res) => match res {

src/chain/mod.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::config::{
2424
WALLET_SYNC_INTERVAL_MINIMUM_SECS,
2525
};
2626
use crate::fee_estimator::OnchainFeeEstimator;
27-
use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger};
27+
use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
2828
use crate::runtime::Runtime;
2929
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
3030
use crate::{Error, PersistedNodeMetrics};
@@ -453,15 +453,27 @@ impl ChainSource {
453453
return;
454454
}
455455
Some(next_package) = receiver.recv() => {
456+
let package = match self.tx_broadcaster.classify_package(next_package).await {
457+
Ok(p) => p,
458+
Err(e) => {
459+
log_error!(
460+
tx_bcast_logger,
461+
"Skipping broadcast: failed to persist payment records: {:?}",
462+
e,
463+
);
464+
continue;
465+
},
466+
};
467+
let txs = package.into_iter().map(|(tx, _)| tx);
456468
match &self.kind {
457469
ChainSourceKind::Esplora(esplora_chain_source) => {
458-
esplora_chain_source.process_broadcast_package(next_package).await
470+
esplora_chain_source.process_broadcast_package(txs).await
459471
},
460472
ChainSourceKind::Electrum(electrum_chain_source) => {
461-
electrum_chain_source.process_broadcast_package(next_package).await
473+
electrum_chain_source.process_broadcast_package(txs).await
462474
},
463475
ChainSourceKind::Bitcoind(bitcoind_chain_source) => {
464-
bitcoind_chain_source.process_broadcast_package(next_package).await
476+
bitcoind_chain_source.process_broadcast_package(txs).await
465477
},
466478
}
467479
}

src/tx_broadcaster.rs

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,26 @@ use tokio::sync::{mpsc, Mutex, MutexGuard};
1414

1515
use crate::logger::{log_error, LdkLogger};
1616
use crate::types::Wallet;
17+
use crate::Error;
1718

1819
const BCAST_PACKAGE_QUEUE_SIZE: usize = 50;
1920

21+
/// A package of transactions that LDK handed to the broadcaster in one
22+
/// `broadcast_transactions` call, along with each transaction's type. Queued until the
23+
/// background task classifies and broadcasts it.
24+
pub(crate) type BroadcastPackage = Vec<(Transaction, TransactionType)>;
25+
2026
pub(crate) struct TransactionBroadcaster<L: Deref>
2127
where
2228
L::Target: LdkLogger,
2329
{
24-
queue_sender: mpsc::Sender<Vec<Transaction>>,
25-
queue_receiver: Mutex<mpsc::Receiver<Vec<Transaction>>>,
30+
queue_sender: mpsc::Sender<BroadcastPackage>,
31+
queue_receiver: Mutex<mpsc::Receiver<BroadcastPackage>>,
2632
/// Weak handle to the [`Wallet`] that performs classification of funding broadcasts
2733
/// (channel opens and splices) into payment records. Remains `None` while the
28-
/// builder is wiring the node up, during which broadcasts are still forwarded to
29-
/// the queue but no payment record is written. [`Self::set_wallet`] installs the
30-
/// handle once the [`Wallet`] exists.
34+
/// builder is wiring the node up, during which broadcasts are forwarded to the
35+
/// queue but no payment record is written. [`Self::set_wallet`] installs the handle
36+
/// once the [`Wallet`] exists.
3137
wallet: StdMutex<Option<Weak<Wallet>>>,
3238
logger: L,
3339
}
@@ -55,30 +61,40 @@ where
5561

5662
pub(crate) async fn get_broadcast_queue(
5763
&self,
58-
) -> MutexGuard<'_, mpsc::Receiver<Vec<Transaction>>> {
64+
) -> MutexGuard<'_, mpsc::Receiver<BroadcastPackage>> {
5965
self.queue_receiver.lock().await
6066
}
67+
68+
/// Classifies a queued package into payment records and returns the package ready
69+
/// for the chain client. Returns `Err` if any classification fails; callers must
70+
/// not broadcast the package in that case, since a crash would leave the tx
71+
/// on-chain without a record.
72+
pub(crate) async fn classify_package(
73+
&self, package: BroadcastPackage,
74+
) -> Result<BroadcastPackage, Error> {
75+
let wallet_opt = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade);
76+
if let Some(wallet) = wallet_opt {
77+
tokio::task::spawn_blocking(move || {
78+
for (tx, tx_type) in &package {
79+
wallet.classify_broadcast(tx, tx_type)?;
80+
}
81+
Ok::<_, Error>(package)
82+
})
83+
.await
84+
.map_err(|_| Error::PersistenceFailed)?
85+
} else {
86+
Ok(package)
87+
}
88+
}
6189
}
6290

6391
impl<L: Deref> BroadcasterInterface for TransactionBroadcaster<L>
6492
where
6593
L::Target: LdkLogger,
6694
{
6795
fn broadcast_transactions(&self, txs: &[(&Transaction, TransactionType)]) {
68-
let wallet = self.wallet.lock().expect("lock").as_ref().and_then(Weak::upgrade);
69-
if let Some(wallet) = wallet {
70-
for (tx, tx_type) in txs {
71-
if let Err(e) = wallet.classify_broadcast(tx, tx_type) {
72-
log_error!(
73-
self.logger,
74-
"Failed to classify broadcast tx {}: {:?}",
75-
tx.compute_txid(),
76-
e,
77-
);
78-
}
79-
}
80-
}
81-
let package = txs.iter().map(|(t, _)| (*t).clone()).collect::<Vec<Transaction>>();
96+
let package: BroadcastPackage =
97+
txs.iter().map(|(tx, tx_type)| ((*tx).clone(), tx_type.clone())).collect();
8298
self.queue_sender.try_send(package).unwrap_or_else(|e| {
8399
log_error!(self.logger, "Failed to broadcast transactions: {}", e);
84100
});

0 commit comments

Comments
 (0)