Skip to content

Implement start_batch message batching #3793

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 8, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions lightning-net-tokio/src/lib.rs
Original file line number Diff line number Diff line change
@@ -622,7 +622,7 @@ impl Hash for SocketDescriptor {
mod tests {
use bitcoin::constants::ChainHash;
use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
use bitcoin::{Network, Txid};
use bitcoin::Network;
use lightning::ln::msgs::*;
use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler, PeerManager};
use lightning::ln::types::ChannelId;
@@ -632,7 +632,6 @@ mod tests {

use tokio::sync::mpsc;

use std::collections::BTreeMap;
use std::mem;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
@@ -726,8 +725,7 @@ mod tests {
}
fn handle_commitment_signed(&self, _their_node_id: PublicKey, _msg: &CommitmentSigned) {}
fn handle_commitment_signed_batch(
&self, _their_node_id: PublicKey, _channel_id: ChannelId,
_batch: BTreeMap<Txid, CommitmentSigned>,
&self, _their_node_id: PublicKey, _channel_id: ChannelId, _batch: Vec<CommitmentSigned>,
) {
}
fn handle_revoke_and_ack(&self, _their_node_id: PublicKey, _msg: &RevokeAndACK) {}
40 changes: 22 additions & 18 deletions lightning/src/ln/channel.rs
Original file line number Diff line number Diff line change
@@ -68,7 +68,7 @@ use crate::util::errors::APIError;
use crate::util::config::{UserConfig, ChannelConfig, LegacyChannelConfig, ChannelHandshakeConfig, ChannelHandshakeLimits, MaxDustHTLCExposure};
use crate::util::scid_utils::scid_from_parts;

use alloc::collections::BTreeMap;
use alloc::collections::{btree_map, BTreeMap};

use crate::io;
use crate::prelude::*;
@@ -4978,7 +4978,7 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider {
channel_id: self.channel_id,
htlc_signatures: vec![],
signature,
batch: None,
funding_txid: funding.get_funding_txo().map(|funding_txo| funding_txo.txid),
#[cfg(taproot)]
partial_signature_with_nonce: None,
})
@@ -5948,10 +5948,6 @@ impl<SP: Deref> FundedChannel<SP> where
)));
}

if msg.batch.is_some() {
return Err(ChannelError::close("Peer sent initial commitment_signed with a batch".to_owned()));
}

let holder_commitment_point = &mut self.holder_commitment_point.clone();
self.context.assert_no_commitment_advancement(holder_commitment_point.transaction_number(), "initial commitment_signed");

@@ -5991,18 +5987,35 @@ impl<SP: Deref> FundedChannel<SP> where
self.commitment_signed_update_monitor(updates, logger)
}

pub fn commitment_signed_batch<L: Deref>(&mut self, batch: &BTreeMap<Txid, msgs::CommitmentSigned>, logger: &L) -> Result<Option<ChannelMonitorUpdate>, ChannelError>
pub fn commitment_signed_batch<L: Deref>(&mut self, batch: Vec<msgs::CommitmentSigned>, logger: &L) -> Result<Option<ChannelMonitorUpdate>, ChannelError>
where L::Target: Logger
{
self.commitment_signed_check_state()?;

let mut messages = BTreeMap::new();
for msg in batch {
let funding_txid = match msg.funding_txid {
Some(funding_txid) => funding_txid,
None => {
return Err(ChannelError::close("Peer sent batched commitment_signed without a funding_txid".to_string()));
},
};

match messages.entry(funding_txid) {
btree_map::Entry::Vacant(entry) => { entry.insert(msg); },
btree_map::Entry::Occupied(_) => {
return Err(ChannelError::close(format!("Peer sent batched commitment_signed with duplicate funding_txid {}", funding_txid)));
}
}
}

// Any commitment_signed not associated with a FundingScope is ignored below if a
// pending splice transaction has confirmed since receiving the batch.
let updates = core::iter::once(&self.funding)
.chain(self.pending_funding.iter())
.map(|funding| {
let funding_txid = funding.get_funding_txo().unwrap().txid;
let msg = batch
let msg = messages
.get(&funding_txid)
.ok_or_else(|| ChannelError::close(format!("Peer did not send a commitment_signed for pending splice transaction: {}", funding_txid)))?;
self.context
@@ -9349,20 +9362,11 @@ impl<SP: Deref> FundedChannel<SP> where
}
}

let batch = if self.pending_funding.is_empty() { None } else {
Some(msgs::CommitmentSignedBatch {
batch_size: self.pending_funding.len() as u16 + 1,
funding_txid: funding
.get_funding_txo()
.expect("splices should have their funding transactions negotiated before exiting quiescence while un-negotiated splices are discarded on reload")
.txid,
})
};
Ok(msgs::CommitmentSigned {
channel_id: self.context.channel_id,
signature,
htlc_signatures,
batch,
funding_txid: funding.get_funding_txo().map(|funding_txo| funding_txo.txid),
#[cfg(taproot)]
partial_signature_with_nonce: None,
})
6 changes: 3 additions & 3 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
@@ -9263,7 +9263,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/
}

#[rustfmt::skip]
fn internal_commitment_signed_batch(&self, counterparty_node_id: &PublicKey, channel_id: ChannelId, batch: &BTreeMap<Txid, msgs::CommitmentSigned>) -> Result<(), MsgHandleErrInternal> {
fn internal_commitment_signed_batch(&self, counterparty_node_id: &PublicKey, channel_id: ChannelId, batch: Vec<msgs::CommitmentSigned>) -> Result<(), MsgHandleErrInternal> {
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
.ok_or_else(|| {
@@ -12330,9 +12330,9 @@ where
}

#[rustfmt::skip]
fn handle_commitment_signed_batch(&self, counterparty_node_id: PublicKey, channel_id: ChannelId, batch: BTreeMap<Txid, msgs::CommitmentSigned>) {
fn handle_commitment_signed_batch(&self, counterparty_node_id: PublicKey, channel_id: ChannelId, batch: Vec<msgs::CommitmentSigned>) {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
let _ = handle_error!(self, self.internal_commitment_signed_batch(&counterparty_node_id, channel_id, &batch), counterparty_node_id);
let _ = handle_error!(self, self.internal_commitment_signed_batch(&counterparty_node_id, channel_id, batch), counterparty_node_id);
}

#[rustfmt::skip]
2 changes: 1 addition & 1 deletion lightning/src/ln/dual_funding_tests.rs
Original file line number Diff line number Diff line change
@@ -185,7 +185,7 @@ fn do_test_v2_channel_establishment(session: V2ChannelEstablishmentTestSession)
)
.unwrap(),
htlc_signatures: vec![],
batch: None,
funding_txid: None,
#[cfg(taproot)]
partial_signature_with_nonce: None,
};
2 changes: 1 addition & 1 deletion lightning/src/ln/htlc_reserve_unit_tests.rs
Original file line number Diff line number Diff line change
@@ -899,7 +899,7 @@ pub fn test_fee_spike_violation_fails_htlc() {
channel_id: chan.2,
signature: res.0,
htlc_signatures: res.1,
batch: None,
funding_txid: None,
#[cfg(taproot)]
partial_signature_with_nonce: None,
};
76 changes: 36 additions & 40 deletions lightning/src/ln/msgs.rs
Original file line number Diff line number Diff line change
@@ -47,8 +47,6 @@ use crate::types::payment::{PaymentHash, PaymentPreimage, PaymentSecret};
#[allow(unused_imports)]
use crate::prelude::*;

use alloc::collections::BTreeMap;

use crate::io::{self, Cursor, Read};
use crate::io_extras::read_to_end;
use core::fmt;
@@ -686,6 +684,20 @@ pub struct ClosingSigned {
pub fee_range: Option<ClosingSignedFeeRange>,
}

/// A [`start_batch`] message to be sent to group together multiple channel messages as a single
/// logical message.
///
/// [`start_batch`]: https://github.com/lightning/bolts/blob/master/02-peer-protocol.md#batching-channel-messages
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct StartBatch {
/// The channel ID of all messages in the batch.
pub channel_id: ChannelId,
/// The number of messages to follow.
pub batch_size: u16,
/// The type of all messages expected in the batch.
pub message_type: Option<u16>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this an Option if we require it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The spec doesn't spell out the requirements yet, but I assumed we'd need to send a warning before disconnecting. Also allows for additional logging.

}

/// An [`update_add_htlc`] message to be sent to or received from a peer.
///
/// [`update_add_htlc`]: https://github.com/lightning/bolts/blob/master/02-peer-protocol.md#adding-an-htlc-update_add_htlc
@@ -795,15 +807,6 @@ pub struct UpdateFailMalformedHTLC {
pub failure_code: u16,
}

/// Optional batch parameters for `commitment_signed` message.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct CommitmentSignedBatch {
/// Batch size N: all N `commitment_signed` messages must be received before being processed
pub batch_size: u16,
/// The funding transaction, to discriminate among multiple pending funding transactions (e.g. in case of splicing)
pub funding_txid: Txid,
}

/// A [`commitment_signed`] message to be sent to or received from a peer.
///
/// [`commitment_signed`]: https://github.com/lightning/bolts/blob/master/02-peer-protocol.md#committing-updates-so-far-commitment_signed
@@ -815,8 +818,8 @@ pub struct CommitmentSigned {
pub signature: Signature,
/// Signatures on the HTLC transactions
pub htlc_signatures: Vec<Signature>,
/// Optional batch size and other parameters
pub batch: Option<CommitmentSignedBatch>,
/// The funding transaction, to discriminate among multiple pending funding transactions (e.g. in case of splicing)
pub funding_txid: Option<Txid>,
#[cfg(taproot)]
/// The partial Taproot signature on the commitment transaction
pub partial_signature_with_nonce: Option<PartialSignatureWithNonce>,
@@ -1962,8 +1965,7 @@ pub trait ChannelMessageHandler: BaseMessageHandler {
fn handle_commitment_signed(&self, their_node_id: PublicKey, msg: &CommitmentSigned);
/// Handle a batch of incoming `commitment_signed` messages from the given peer.
fn handle_commitment_signed_batch(
&self, their_node_id: PublicKey, channel_id: ChannelId,
batch: BTreeMap<Txid, CommitmentSigned>,
&self, their_node_id: PublicKey, channel_id: ChannelId, batch: Vec<CommitmentSigned>,
);
/// Handle an incoming `revoke_and_ack` message from the given peer.
fn handle_revoke_and_ack(&self, their_node_id: PublicKey, msg: &RevokeAndACK);
@@ -1974,19 +1976,10 @@ pub trait ChannelMessageHandler: BaseMessageHandler {
) {
assert!(!batch.is_empty());
if batch.len() == 1 {
assert!(batch[0].batch.is_none());
self.handle_commitment_signed(their_node_id, &batch[0]);
} else {
let channel_id = batch[0].channel_id;
let batch: BTreeMap<Txid, CommitmentSigned> = batch
.iter()
.cloned()
.map(|mut cs| {
let funding_txid = cs.batch.take().unwrap().funding_txid;
(funding_txid, cs)
})
.collect();
self.handle_commitment_signed_batch(their_node_id, channel_id, batch);
self.handle_commitment_signed_batch(their_node_id, channel_id, batch.clone());
}
}

@@ -2756,18 +2749,14 @@ impl_writeable!(ClosingSignedFeeRange, {
max_fee_satoshis
});

impl_writeable_msg!(CommitmentSignedBatch, {
batch_size,
funding_txid,
}, {});

#[cfg(not(taproot))]
impl_writeable_msg!(CommitmentSigned, {
channel_id,
signature,
htlc_signatures
}, {
(0, batch, option),
// TODO(splicing): Change this to 1 once the spec is finalized
(1001, funding_txid, option),
});

#[cfg(taproot)]
@@ -2776,8 +2765,9 @@ impl_writeable_msg!(CommitmentSigned, {
signature,
htlc_signatures
}, {
(0, batch, option),
(2, partial_signature_with_nonce, option),
// TODO(splicing): Change this to 1 and reorder once the spec is finalized
(1001, funding_txid, option),
});

impl_writeable!(DecodedOnionErrorPacket, {
@@ -3097,6 +3087,13 @@ impl_writeable_msg!(PeerStorage, { data }, {});

impl_writeable_msg!(PeerStorageRetrieval, { data }, {});

impl_writeable_msg!(StartBatch, {
channel_id,
batch_size
}, {
(1, message_type, option)
});

// Note that this is written as a part of ChannelManager objects, and thus cannot change its
// serialization format in a way which assumes we know the total serialized length/message end
// position.
@@ -5632,13 +5629,10 @@ mod tests {
channel_id: ChannelId::from_bytes([2; 32]),
signature: sig_1,
htlc_signatures: if htlcs { vec![sig_2, sig_3, sig_4] } else { Vec::new() },
batch: Some(msgs::CommitmentSignedBatch {
batch_size: 3,
funding_txid: Txid::from_str(
"c2d4449afa8d26140898dd54d3390b057ba2a5afcf03ba29d7dc0d8b9ffe966e",
)
.unwrap(),
}),
funding_txid: Some(
Txid::from_str("c2d4449afa8d26140898dd54d3390b057ba2a5afcf03ba29d7dc0d8b9ffe966e")
.unwrap(),
),
#[cfg(taproot)]
partial_signature_with_nonce: None,
};
@@ -5649,7 +5643,9 @@ mod tests {
} else {
target_value += "0000";
}
target_value += "002200036e96fe9f8b0ddcd729ba03cfafa5a27b050b39d354dd980814268dfa9a44d4c2"; // batch
target_value += "fd03e9"; // Type (funding_txid)
target_value += "20"; // Length (funding_txid)
target_value += "6e96fe9f8b0ddcd729ba03cfafa5a27b050b39d354dd980814268dfa9a44d4c2"; // Value
assert_eq!(encoded_value.as_hex().to_string(), target_value);
}

Loading