diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 95b83b105ac..888915f43e2 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -622,7 +622,7 @@ impl Hash for SocketDescriptor { mod tests { use bitcoin::constants::ChainHash; use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey}; - use bitcoin::{Network, Txid}; + use bitcoin::Network; use lightning::ln::msgs::*; use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler, PeerManager}; use lightning::ln::types::ChannelId; @@ -632,7 +632,6 @@ mod tests { use tokio::sync::mpsc; - use std::collections::BTreeMap; use std::mem; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; @@ -726,8 +725,7 @@ mod tests { } fn handle_commitment_signed(&self, _their_node_id: PublicKey, _msg: &CommitmentSigned) {} fn handle_commitment_signed_batch( - &self, _their_node_id: PublicKey, _channel_id: ChannelId, - _batch: BTreeMap<Txid, CommitmentSigned>, + &self, _their_node_id: PublicKey, _channel_id: ChannelId, _batch: Vec<CommitmentSigned>, ) { } fn handle_revoke_and_ack(&self, _their_node_id: PublicKey, _msg: &RevokeAndACK) {} diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 7f33937f2dd..dbe890649e7 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -68,7 +68,7 @@ use crate::util::errors::APIError; use crate::util::config::{UserConfig, ChannelConfig, LegacyChannelConfig, ChannelHandshakeConfig, ChannelHandshakeLimits, MaxDustHTLCExposure}; use crate::util::scid_utils::scid_from_parts; -use alloc::collections::BTreeMap; +use alloc::collections::{btree_map, BTreeMap}; use crate::io; use crate::prelude::*; @@ -4978,7 +4978,7 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider { channel_id: self.channel_id, htlc_signatures: vec![], signature, - batch: None, + funding_txid: funding.get_funding_txo().map(|funding_txo| funding_txo.txid), #[cfg(taproot)] partial_signature_with_nonce: None, }) @@ -5948,10 +5948,6 @@ impl<SP: Deref> FundedChannel<SP> where ))); } - if msg.batch.is_some() { - return Err(ChannelError::close("Peer sent initial commitment_signed with a batch".to_owned())); - } - let holder_commitment_point = &mut self.holder_commitment_point.clone(); self.context.assert_no_commitment_advancement(holder_commitment_point.transaction_number(), "initial commitment_signed"); @@ -5991,18 +5987,35 @@ impl<SP: Deref> FundedChannel<SP> where self.commitment_signed_update_monitor(updates, logger) } - pub fn commitment_signed_batch<L: Deref>(&mut self, batch: &BTreeMap<Txid, msgs::CommitmentSigned>, logger: &L) -> Result<Option<ChannelMonitorUpdate>, ChannelError> + pub fn commitment_signed_batch<L: Deref>(&mut self, batch: Vec<msgs::CommitmentSigned>, logger: &L) -> Result<Option<ChannelMonitorUpdate>, ChannelError> where L::Target: Logger { self.commitment_signed_check_state()?; + let mut messages = BTreeMap::new(); + for msg in batch { + let funding_txid = match msg.funding_txid { + Some(funding_txid) => funding_txid, + None => { + return Err(ChannelError::close("Peer sent batched commitment_signed without a funding_txid".to_string())); + }, + }; + + match messages.entry(funding_txid) { + btree_map::Entry::Vacant(entry) => { entry.insert(msg); }, + btree_map::Entry::Occupied(_) => { + return Err(ChannelError::close(format!("Peer sent batched commitment_signed with duplicate funding_txid {}", funding_txid))); + } + } + } + // Any commitment_signed not associated with a FundingScope is ignored below if a // pending splice transaction has confirmed since receiving the batch. let updates = core::iter::once(&self.funding) .chain(self.pending_funding.iter()) .map(|funding| { let funding_txid = funding.get_funding_txo().unwrap().txid; - let msg = batch + let msg = messages .get(&funding_txid) .ok_or_else(|| ChannelError::close(format!("Peer did not send a commitment_signed for pending splice transaction: {}", funding_txid)))?; self.context @@ -9349,20 +9362,11 @@ impl<SP: Deref> FundedChannel<SP> where } } - let batch = if self.pending_funding.is_empty() { None } else { - Some(msgs::CommitmentSignedBatch { - batch_size: self.pending_funding.len() as u16 + 1, - funding_txid: funding - .get_funding_txo() - .expect("splices should have their funding transactions negotiated before exiting quiescence while un-negotiated splices are discarded on reload") - .txid, - }) - }; Ok(msgs::CommitmentSigned { channel_id: self.context.channel_id, signature, htlc_signatures, - batch, + funding_txid: funding.get_funding_txo().map(|funding_txo| funding_txo.txid), #[cfg(taproot)] partial_signature_with_nonce: None, }) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index b018c6c74cd..bea4275ed01 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -9263,7 +9263,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ } #[rustfmt::skip] - fn internal_commitment_signed_batch(&self, counterparty_node_id: &PublicKey, channel_id: ChannelId, batch: &BTreeMap<Txid, msgs::CommitmentSigned>) -> Result<(), MsgHandleErrInternal> { + fn internal_commitment_signed_batch(&self, counterparty_node_id: &PublicKey, channel_id: ChannelId, batch: Vec<msgs::CommitmentSigned>) -> Result<(), MsgHandleErrInternal> { let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| { @@ -12330,9 +12330,9 @@ where } #[rustfmt::skip] - fn handle_commitment_signed_batch(&self, counterparty_node_id: PublicKey, channel_id: ChannelId, batch: BTreeMap<Txid, msgs::CommitmentSigned>) { + fn handle_commitment_signed_batch(&self, counterparty_node_id: PublicKey, channel_id: ChannelId, batch: Vec<msgs::CommitmentSigned>) { let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let _ = handle_error!(self, self.internal_commitment_signed_batch(&counterparty_node_id, channel_id, &batch), counterparty_node_id); + let _ = handle_error!(self, self.internal_commitment_signed_batch(&counterparty_node_id, channel_id, batch), counterparty_node_id); } #[rustfmt::skip] diff --git a/lightning/src/ln/dual_funding_tests.rs b/lightning/src/ln/dual_funding_tests.rs index 6a7ef317ba8..ed770d06e6d 100644 --- a/lightning/src/ln/dual_funding_tests.rs +++ b/lightning/src/ln/dual_funding_tests.rs @@ -185,7 +185,7 @@ fn do_test_v2_channel_establishment(session: V2ChannelEstablishmentTestSession) ) .unwrap(), htlc_signatures: vec![], - batch: None, + funding_txid: None, #[cfg(taproot)] partial_signature_with_nonce: None, }; diff --git a/lightning/src/ln/htlc_reserve_unit_tests.rs b/lightning/src/ln/htlc_reserve_unit_tests.rs index aee764682a2..111e0b404de 100644 --- a/lightning/src/ln/htlc_reserve_unit_tests.rs +++ b/lightning/src/ln/htlc_reserve_unit_tests.rs @@ -899,7 +899,7 @@ pub fn test_fee_spike_violation_fails_htlc() { channel_id: chan.2, signature: res.0, htlc_signatures: res.1, - batch: None, + funding_txid: None, #[cfg(taproot)] partial_signature_with_nonce: None, }; diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs index 6f452836c15..dcafc27d482 100644 --- a/lightning/src/ln/msgs.rs +++ b/lightning/src/ln/msgs.rs @@ -47,8 +47,6 @@ use crate::types::payment::{PaymentHash, PaymentPreimage, PaymentSecret}; #[allow(unused_imports)] use crate::prelude::*; -use alloc::collections::BTreeMap; - use crate::io::{self, Cursor, Read}; use crate::io_extras::read_to_end; use core::fmt; @@ -686,6 +684,20 @@ pub struct ClosingSigned { pub fee_range: Option<ClosingSignedFeeRange>, } +/// A [`start_batch`] message to be sent to group together multiple channel messages as a single +/// logical message. +/// +/// [`start_batch`]: https://github.com/lightning/bolts/blob/master/02-peer-protocol.md#batching-channel-messages +#[derive(Clone, Debug, Hash, PartialEq, Eq)] +pub struct StartBatch { + /// The channel ID of all messages in the batch. + pub channel_id: ChannelId, + /// The number of messages to follow. + pub batch_size: u16, + /// The type of all messages expected in the batch. + pub message_type: Option<u16>, +} + /// An [`update_add_htlc`] message to be sent to or received from a peer. /// /// [`update_add_htlc`]: https://github.com/lightning/bolts/blob/master/02-peer-protocol.md#adding-an-htlc-update_add_htlc @@ -795,15 +807,6 @@ pub struct UpdateFailMalformedHTLC { pub failure_code: u16, } -/// Optional batch parameters for `commitment_signed` message. -#[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub struct CommitmentSignedBatch { - /// Batch size N: all N `commitment_signed` messages must be received before being processed - pub batch_size: u16, - /// The funding transaction, to discriminate among multiple pending funding transactions (e.g. in case of splicing) - pub funding_txid: Txid, -} - /// A [`commitment_signed`] message to be sent to or received from a peer. /// /// [`commitment_signed`]: https://github.com/lightning/bolts/blob/master/02-peer-protocol.md#committing-updates-so-far-commitment_signed @@ -815,8 +818,8 @@ pub struct CommitmentSigned { pub signature: Signature, /// Signatures on the HTLC transactions pub htlc_signatures: Vec<Signature>, - /// Optional batch size and other parameters - pub batch: Option<CommitmentSignedBatch>, + /// The funding transaction, to discriminate among multiple pending funding transactions (e.g. in case of splicing) + pub funding_txid: Option<Txid>, #[cfg(taproot)] /// The partial Taproot signature on the commitment transaction pub partial_signature_with_nonce: Option<PartialSignatureWithNonce>, @@ -1962,8 +1965,7 @@ pub trait ChannelMessageHandler: BaseMessageHandler { fn handle_commitment_signed(&self, their_node_id: PublicKey, msg: &CommitmentSigned); /// Handle a batch of incoming `commitment_signed` message from the given peer. fn handle_commitment_signed_batch( - &self, their_node_id: PublicKey, channel_id: ChannelId, - batch: BTreeMap<Txid, CommitmentSigned>, + &self, their_node_id: PublicKey, channel_id: ChannelId, batch: Vec<CommitmentSigned>, ); /// Handle an incoming `revoke_and_ack` message from the given peer. fn handle_revoke_and_ack(&self, their_node_id: PublicKey, msg: &RevokeAndACK); @@ -1974,19 +1976,10 @@ pub trait ChannelMessageHandler: BaseMessageHandler { ) { assert!(!batch.is_empty()); if batch.len() == 1 { - assert!(batch[0].batch.is_none()); self.handle_commitment_signed(their_node_id, &batch[0]); } else { let channel_id = batch[0].channel_id; - let batch: BTreeMap<Txid, CommitmentSigned> = batch - .iter() - .cloned() - .map(|mut cs| { - let funding_txid = cs.batch.take().unwrap().funding_txid; - (funding_txid, cs) - }) - .collect(); - self.handle_commitment_signed_batch(their_node_id, channel_id, batch); + self.handle_commitment_signed_batch(their_node_id, channel_id, batch.clone()); } } @@ -2756,18 +2749,14 @@ impl_writeable!(ClosingSignedFeeRange, { max_fee_satoshis }); -impl_writeable_msg!(CommitmentSignedBatch, { - batch_size, - funding_txid, -}, {}); - #[cfg(not(taproot))] impl_writeable_msg!(CommitmentSigned, { channel_id, signature, htlc_signatures }, { - (0, batch, option), + // TOOD(splicing): Change this to 1 once the spec is finalized + (1001, funding_txid, option), }); #[cfg(taproot)] @@ -2776,8 +2765,9 @@ impl_writeable_msg!(CommitmentSigned, { signature, htlc_signatures }, { - (0, batch, option), (2, partial_signature_with_nonce, option), + // TOOD(splicing): Change this to 1 and reorder once the spec is finalized + (1001, funding_txid, option), }); impl_writeable!(DecodedOnionErrorPacket, { @@ -3097,6 +3087,13 @@ impl_writeable_msg!(PeerStorage, { data }, {}); impl_writeable_msg!(PeerStorageRetrieval, { data }, {}); +impl_writeable_msg!(StartBatch, { + channel_id, + batch_size +}, { + (1, message_type, option) +}); + // Note that this is written as a part of ChannelManager objects, and thus cannot change its // serialization format in a way which assumes we know the total serialized length/message end // position. @@ -5632,13 +5629,10 @@ mod tests { channel_id: ChannelId::from_bytes([2; 32]), signature: sig_1, htlc_signatures: if htlcs { vec![sig_2, sig_3, sig_4] } else { Vec::new() }, - batch: Some(msgs::CommitmentSignedBatch { - batch_size: 3, - funding_txid: Txid::from_str( - "c2d4449afa8d26140898dd54d3390b057ba2a5afcf03ba29d7dc0d8b9ffe966e", - ) - .unwrap(), - }), + funding_txid: Some( + Txid::from_str("c2d4449afa8d26140898dd54d3390b057ba2a5afcf03ba29d7dc0d8b9ffe966e") + .unwrap(), + ), #[cfg(taproot)] partial_signature_with_nonce: None, }; @@ -5649,7 +5643,9 @@ mod tests { } else { target_value += "0000"; } - target_value += "002200036e96fe9f8b0ddcd729ba03cfafa5a27b050b39d354dd980814268dfa9a44d4c2"; // batch + target_value += "fd03e9"; // Type (funding_txid) + target_value += "20"; // Length (funding_txid) + target_value += "6e96fe9f8b0ddcd729ba03cfafa5a27b050b39d354dd980814268dfa9a44d4c2"; // Value assert_eq!(encoded_value.as_hex().to_string(), target_value); } diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 5c3bfd48d55..92fc73d64f9 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -17,7 +17,6 @@ //! call into the provided message handlers (probably a ChannelManager and P2PGossipSync) with //! messages they should handle, and encoding/sending response messages. -use bitcoin::Txid; use bitcoin::constants::ChainHash; use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey}; @@ -44,8 +43,6 @@ use crate::util::string::PrintableString; #[allow(unused_imports)] use crate::prelude::*; -use alloc::collections::{btree_map, BTreeMap}; - use crate::io; use crate::sync::{Mutex, MutexGuard, FairRwLock}; use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering}; @@ -337,8 +334,7 @@ impl ChannelMessageHandler for ErroringMessageHandler { ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); } fn handle_commitment_signed_batch( - &self, their_node_id: PublicKey, channel_id: ChannelId, - _batch: BTreeMap<Txid, msgs::CommitmentSigned>, + &self, their_node_id: PublicKey, channel_id: ChannelId, _batch: Vec<msgs::CommitmentSigned>, ) { ErroringMessageHandler::push_error(self, their_node_id, channel_id); } @@ -539,6 +535,24 @@ enum InitSyncTracker{ NodesSyncing(NodeId), } +/// A batch of messages initiated when receiving a `start_batch` message. +struct MessageBatch { + /// The channel associated with all the messages in the batch. + channel_id: ChannelId, + + /// The number of messages expected to be in the batch. + batch_size: usize, + + /// The batch of messages, which should all be of the same type. + messages: MessageBatchImpl, +} + +/// The representation of the message batch, which may different for each message type. +enum MessageBatchImpl { + /// A batch of `commitment_signed` messages used when there are pending splices. + CommitmentSigned(Vec<msgs::CommitmentSigned>), +} + /// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop /// forwarding gossip messages to peers altogether. const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2; @@ -620,7 +634,7 @@ struct Peer { inbound_connection: bool, - commitment_signed_batch: Option<(ChannelId, BTreeMap<Txid, msgs::CommitmentSigned>)>, + message_batch: Option<MessageBatch>, } impl Peer { @@ -873,7 +887,7 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: D enum LogicalMessage<T: core::fmt::Debug + wire::Type + wire::TestEq> { FromWire(wire::Message<T>), - CommitmentSignedBatch(ChannelId, BTreeMap<Txid, msgs::CommitmentSigned>), + CommitmentSignedBatch(ChannelId, Vec<msgs::CommitmentSigned>), } enum MessageHandlingError { @@ -1159,7 +1173,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM received_channel_announce_since_backlogged: false, inbound_connection: false, - commitment_signed_batch: None, + message_batch: None, })); Ok(res) } @@ -1217,7 +1231,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM received_channel_announce_since_backlogged: false, inbound_connection: true, - commitment_signed_batch: None, + message_batch: None, })); Ok(()) } @@ -1772,46 +1786,120 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM // During splicing, commitment_signed messages need to be collected into a single batch // before they are handled. - if let wire::Message::CommitmentSigned(msg) = message { - if let Some(ref batch) = msg.batch { - let (channel_id, buffer) = peer_lock - .commitment_signed_batch - .get_or_insert_with(|| (msg.channel_id, BTreeMap::new())); + if let wire::Message::StartBatch(msg) = message { + if peer_lock.message_batch.is_some() { + let error = format!("Peer {} sent start_batch for channel {} before previous batch completed", log_pubkey!(their_node_id), &msg.channel_id); + log_debug!(logger, "{}", error); + return Err(LightningError { + err: error.clone(), + action: msgs::ErrorAction::DisconnectPeerWithWarning { + msg: msgs::WarningMessage { + channel_id: msg.channel_id, + data: error, + }, + }, + }.into()); + } - if msg.channel_id != *channel_id { - log_debug!(logger, "Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", log_pubkey!(their_node_id), channel_id, &msg.channel_id); - return Err(PeerHandleError { }.into()); - } + let batch_size = msg.batch_size as usize; + if batch_size <= 1 { + let error = format!("Peer {} sent start_batch for channel {} not strictly greater than 1", log_pubkey!(their_node_id), &msg.channel_id); + log_debug!(logger, "{}", error); + return Err(LightningError { + err: error.clone(), + action: msgs::ErrorAction::SendWarningMessage { + msg: msgs::WarningMessage { + channel_id: msg.channel_id, + data: error, + }, + log_level: Level::Debug, + }, + }.into()); + } - const COMMITMENT_SIGNED_BATCH_LIMIT: usize = 100; - if buffer.len() == COMMITMENT_SIGNED_BATCH_LIMIT { - log_debug!(logger, "Peer {} sent batched commitment_signed for channel {} exceeding the limit", log_pubkey!(their_node_id), channel_id); - return Err(PeerHandleError { }.into()); - } + const COMMITMENT_SIGNED_BATCH_LIMIT: usize = 20; + if batch_size > COMMITMENT_SIGNED_BATCH_LIMIT { + let error = format!("Peer {} sent start_batch for channel {} exceeding the limit", log_pubkey!(their_node_id), &msg.channel_id); + log_debug!(logger, "{}", error); + return Err(LightningError { + err: error.clone(), + action: msgs::ErrorAction::DisconnectPeerWithWarning { + msg: msgs::WarningMessage { + channel_id: msg.channel_id, + data: error, + }, + }, + }.into()); + } - let batch_size = batch.batch_size as usize; - match buffer.entry(batch.funding_txid) { - btree_map::Entry::Vacant(entry) => { entry.insert(msg); }, - btree_map::Entry::Occupied(_) => { - log_debug!(logger, "Peer {} sent batched commitment_signed with duplicate funding_txid {} for channel {}", log_pubkey!(their_node_id), channel_id, &batch.funding_txid); - return Err(PeerHandleError { }.into()); - } + let messages = match msg.message_type { + Some(message_type) if message_type == msgs::CommitmentSigned::TYPE => { + let messages = Vec::with_capacity(batch_size); + MessageBatchImpl::CommitmentSigned(messages) + }, + _ => { + let error = format!("Peer {} sent start_batch for channel {} without a known message type", log_pubkey!(their_node_id), &msg.channel_id); + log_debug!(logger, "{}", error); + return Err(LightningError { + err: error.clone(), + action: msgs::ErrorAction::DisconnectPeerWithWarning { + msg: msgs::WarningMessage { + channel_id: msg.channel_id, + data: error, + }, + }, + }.into()); + }, + }; + + let message_batch = MessageBatch { + channel_id: msg.channel_id, + batch_size, + messages, + }; + peer_lock.message_batch = Some(message_batch); + + return Ok(None); + } + + if let wire::Message::CommitmentSigned(msg) = message { + if let Some(message_batch) = &mut peer_lock.message_batch { + let MessageBatchImpl::CommitmentSigned(ref mut messages) = &mut message_batch.messages; + + if msg.channel_id != message_batch.channel_id { + let error = format!("Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", log_pubkey!(their_node_id), message_batch.channel_id, &msg.channel_id); + log_debug!(logger, "{}", error); + return Err(LightningError { + err: error.clone(), + action: msgs::ErrorAction::DisconnectPeerWithWarning { + msg: msgs::WarningMessage { + channel_id: msg.channel_id, + data: error, + }, + }, + }.into()); } - if buffer.len() >= batch_size { - let (channel_id, batch) = peer_lock.commitment_signed_batch.take().expect("batch should have been inserted"); + messages.push(msg); + + if messages.len() == message_batch.batch_size { + let MessageBatch { channel_id, batch_size: _, messages } = peer_lock.message_batch.take().expect("batch should have been inserted"); + let MessageBatchImpl::CommitmentSigned(batch) = messages; + return Ok(Some(LogicalMessage::CommitmentSignedBatch(channel_id, batch))); } else { return Ok(None); } - } else if peer_lock.commitment_signed_batch.is_some() { - log_debug!(logger, "Peer {} sent non-batched commitment_signed for channel {} when expecting batched commitment_signed", log_pubkey!(their_node_id), &msg.channel_id); - return Err(PeerHandleError { }.into()); } else { return Ok(Some(LogicalMessage::FromWire(wire::Message::CommitmentSigned(msg)))); } - } else if peer_lock.commitment_signed_batch.is_some() { - log_debug!(logger, "Peer {} sent non-commitment_signed message when expecting batched commitment_signed", log_pubkey!(their_node_id)); + } else if let Some(message_batch) = &peer_lock.message_batch { + match message_batch.messages { + MessageBatchImpl::CommitmentSigned(_) => { + log_debug!(logger, "Peer {} sent an unexpected message for a commitment_signed batch", log_pubkey!(their_node_id)); + }, + } + return Err(PeerHandleError { }.into()); } @@ -1901,6 +1989,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM }, // Channel messages: + wire::Message::StartBatch(_msg) => { + debug_assert!(false); + }, wire::Message::OpenChannel(msg) => { self.message_handler.chan_handler.handle_open_channel(their_node_id, &msg); }, @@ -2404,6 +2495,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM if let &Some(ref msg) = update_fee { self.enqueue_message(&mut *peer, msg); } + if commitment_signed.len() > 1 { + let msg = msgs::StartBatch { + channel_id: *channel_id, + batch_size: commitment_signed.len() as u16, + message_type: Some(msgs::CommitmentSigned::TYPE), + }; + self.enqueue_message(&mut *peer, &msg); + } for msg in commitment_signed { self.enqueue_message(&mut *peer, msg); } diff --git a/lightning/src/ln/update_fee_tests.rs b/lightning/src/ln/update_fee_tests.rs index 1512444c830..27a1035e639 100644 --- a/lightning/src/ln/update_fee_tests.rs +++ b/lightning/src/ln/update_fee_tests.rs @@ -520,7 +520,7 @@ pub fn do_test_update_fee_that_funder_cannot_afford(channel_type_features: Chann channel_id: chan.2, signature: res.0, htlc_signatures: res.1, - batch: None, + funding_txid: None, #[cfg(taproot)] partial_signature_with_nonce: None, }; @@ -621,7 +621,7 @@ pub fn test_update_fee_that_saturates_subs() { channel_id: chan_id, signature: res.0, htlc_signatures: res.1, - batch: None, + funding_txid: None, #[cfg(taproot)] partial_signature_with_nonce: None, }; diff --git a/lightning/src/ln/wire.rs b/lightning/src/ln/wire.rs index 1bb7c7448a8..2dc54f852b5 100644 --- a/lightning/src/ln/wire.rs +++ b/lightning/src/ln/wire.rs @@ -82,6 +82,7 @@ pub(crate) enum Message<T: core::fmt::Debug + Type + TestEq> { Shutdown(msgs::Shutdown), ClosingSigned(msgs::ClosingSigned), OnionMessage(msgs::OnionMessage), + StartBatch(msgs::StartBatch), UpdateAddHTLC(msgs::UpdateAddHTLC), UpdateFulfillHTLC(msgs::UpdateFulfillHTLC), UpdateFailHTLC(msgs::UpdateFailHTLC), @@ -142,6 +143,7 @@ impl<T: core::fmt::Debug + Type + TestEq> Writeable for Message<T> { &Message::Shutdown(ref msg) => msg.write(writer), &Message::ClosingSigned(ref msg) => msg.write(writer), &Message::OnionMessage(ref msg) => msg.write(writer), + &Message::StartBatch(ref msg) => msg.write(writer), &Message::UpdateAddHTLC(ref msg) => msg.write(writer), &Message::UpdateFulfillHTLC(ref msg) => msg.write(writer), &Message::UpdateFailHTLC(ref msg) => msg.write(writer), @@ -202,6 +204,7 @@ impl<T: core::fmt::Debug + Type + TestEq> Type for Message<T> { &Message::Shutdown(ref msg) => msg.type_id(), &Message::ClosingSigned(ref msg) => msg.type_id(), &Message::OnionMessage(ref msg) => msg.type_id(), + &Message::StartBatch(ref msg) => msg.type_id(), &Message::UpdateAddHTLC(ref msg) => msg.type_id(), &Message::UpdateFulfillHTLC(ref msg) => msg.type_id(), &Message::UpdateFailHTLC(ref msg) => msg.type_id(), @@ -350,6 +353,9 @@ where msgs::OnionMessage::TYPE => { Ok(Message::OnionMessage(LengthReadable::read_from_fixed_length_buffer(buffer)?)) }, + msgs::StartBatch::TYPE => { + Ok(Message::StartBatch(LengthReadable::read_from_fixed_length_buffer(buffer)?)) + }, msgs::UpdateAddHTLC::TYPE => { Ok(Message::UpdateAddHTLC(LengthReadable::read_from_fixed_length_buffer(buffer)?)) }, @@ -590,6 +596,10 @@ impl Encode for msgs::OnionMessage { const TYPE: u16 = 513; } +impl Encode for msgs::StartBatch { + const TYPE: u16 = 127; +} + impl Encode for msgs::UpdateAddHTLC { const TYPE: u16 = 128; } diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 00b85cc1ef8..a6f0b38a4fb 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -79,8 +79,6 @@ use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey}; use lightning_invoice::RawBolt11Invoice; -use alloc::collections::BTreeMap; - use crate::io; use crate::prelude::*; use crate::sign::{EntropySource, NodeSigner, RandomBytes, Recipient, SignerProvider}; @@ -1057,7 +1055,7 @@ impl msgs::ChannelMessageHandler for TestChannelMessageHandler { } fn handle_commitment_signed_batch( &self, _their_node_id: PublicKey, _channel_id: ChannelId, - _batch: BTreeMap<Txid, msgs::CommitmentSigned>, + _batch: Vec<msgs::CommitmentSigned>, ) { unreachable!() }