Skip to content

Commit

Permalink
Introduce flow usage in channelmanager
Browse files Browse the repository at this point in the history
  • Loading branch information
shaavan committed Mar 5, 2025
1 parent 9658d90 commit 286ccd7
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 109 deletions.
131 changes: 28 additions & 103 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1752,6 +1752,7 @@ where
/// # tx_broadcaster: &dyn lightning::chain::chaininterface::BroadcasterInterface,
/// # router: &lightning::routing::router::DefaultRouter<&NetworkGraph<&'a L>, &'a L, &ES, &S, SP, SL>,
/// # message_router: &lightning::onion_message::messenger::DefaultMessageRouter<&NetworkGraph<&'a L>, &'a L, &ES>,
/// # flow: &lightning::offers::flow::OffersMessageFlow<&ES, &lightning::onion_message::messenger::DefaultMessageRouter<&NetworkGraph<&'a L>, &'a L, &ES>>,
/// # logger: &L,
/// # entropy_source: &ES,
/// # node_signer: &dyn lightning::sign::NodeSigner,
Expand All @@ -1767,15 +1768,15 @@ where
/// };
/// let default_config = UserConfig::default();
/// let channel_manager = ChannelManager::new(
/// fee_estimator, chain_monitor, tx_broadcaster, router, message_router, logger,
/// fee_estimator, chain_monitor, tx_broadcaster, router, message_router, flow, logger,
/// entropy_source, node_signer, signer_provider, default_config, params, current_timestamp,
/// );
///
/// // Restart from deserialized data
/// let mut channel_monitors = read_channel_monitors();
/// let args = ChannelManagerReadArgs::new(
/// entropy_source, node_signer, signer_provider, fee_estimator, chain_monitor, tx_broadcaster,
/// router, message_router, logger, default_config, channel_monitors.iter().collect(),
/// router, message_router, flow, logger, default_config, channel_monitors.iter().collect(),
/// );
/// let (block_hash, channel_manager) =
/// <(BlockHash, ChannelManager<_, _, _, _, _, _, _, _, _, _>)>::read(&mut reader, args)?;
Expand Down Expand Up @@ -10122,18 +10123,15 @@ macro_rules! create_offer_builder { ($self: ident, $builder: ty) => {
pub fn create_offer_builder(
&$self, absolute_expiry: Option<Duration>
) -> Result<$builder, Bolt12SemanticError> {
let node_id = $self.get_our_node_id();
let expanded_key = &$self.inbound_payment_key;
let entropy = &*$self.entropy_source;
let secp_ctx = &$self.secp_ctx;

let nonce = Nonce::from_entropy_source(entropy);
let context = OffersContext::InvoiceRequest { nonce };
let path = $self.create_blinded_paths_using_absolute_expiry(context, absolute_expiry)
let path = $self.flow.create_blinded_paths_using_absolute_expiry($self.peer_for_blinded_path(), context, absolute_expiry)
.and_then(|paths| paths.into_iter().next().ok_or(()))
.map_err(|_| Bolt12SemanticError::MissingPaths)?;
let builder = OfferBuilder::deriving_signing_pubkey(node_id, expanded_key, nonce, secp_ctx)
.chain_hash($self.chain_hash)

let builder = $self.flow.create_offer_builder(nonce)?
.path(path);

let builder = match absolute_expiry {
Expand Down Expand Up @@ -10195,23 +10193,16 @@ macro_rules! create_refund_builder { ($self: ident, $builder: ty) => {
&$self, amount_msats: u64, absolute_expiry: Duration, payment_id: PaymentId,
retry_strategy: Retry, route_params_config: RouteParametersConfig
) -> Result<$builder, Bolt12SemanticError> {
let node_id = $self.get_our_node_id();
let expanded_key = &$self.inbound_payment_key;
let entropy = &*$self.entropy_source;
let secp_ctx = &$self.secp_ctx;

let nonce = Nonce::from_entropy_source(entropy);
let context = OffersContext::OutboundPayment { payment_id, nonce, hmac: None };
let path = $self.create_blinded_paths_using_absolute_expiry(context, Some(absolute_expiry))
let path = $self.flow.create_blinded_paths_using_absolute_expiry($self.peer_for_blinded_path(), context, Some(absolute_expiry))
.and_then(|paths| paths.into_iter().next().ok_or(()))
.map_err(|_| Bolt12SemanticError::MissingPaths)?;

let builder = RefundBuilder::deriving_signing_pubkey(
node_id, expanded_key, nonce, secp_ctx, amount_msats, payment_id
)?
.chain_hash($self.chain_hash)
.absolute_expiry(absolute_expiry)
.path(path);
let builder = $self.flow.create_refund_builder(amount_msats, absolute_expiry, payment_id, nonce)?
.path(path);

let _persistence_guard = PersistenceNotifierGuard::notify_on_drop($self);

Expand Down Expand Up @@ -10272,15 +10263,10 @@ where
return Err(Bolt12SemanticError::MissingPaths)
}

let node_id = self.get_our_node_id();
let expanded_key = &self.inbound_payment_key;
let entropy = &*self.entropy_source;
let secp_ctx = &self.secp_ctx;

let nonce = Nonce::from_entropy_source(entropy);
let mut builder = OfferBuilder::deriving_signing_pubkey(
node_id, expanded_key, nonce, secp_ctx
).chain_hash(self.chain_hash);
let mut builder = self.flow.create_offer_builder(nonce)?;

for path in message_paths_to_always_online_node {
builder = builder.path(path);
Expand Down Expand Up @@ -10420,44 +10406,23 @@ where
) -> Result<(), Bolt12SemanticError> {
let expanded_key = &self.inbound_payment_key;
let entropy = &*self.entropy_source;
let secp_ctx = &self.secp_ctx;

let nonce = Nonce::from_entropy_source(entropy);
let builder: InvoiceRequestBuilder<secp256k1::All> = offer
.request_invoice(expanded_key, nonce, secp_ctx, payment_id)?
.into();
let builder = builder.chain_hash(self.chain_hash)?;

let builder = match quantity {
None => builder,
Some(quantity) => builder.quantity(quantity)?,
};
let builder = match amount_msats {
None => builder,
Some(amount_msats) => builder.amount_msats(amount_msats)?,
};
let builder = match payer_note {
None => builder,
Some(payer_note) => builder.payer_note(payer_note),
};
let builder = match human_readable_name {
None => builder,
Some(hrn) => builder.sourced_from_human_readable_name(hrn),
};
let builder = self.flow.create_invoice_request_builder(offer, nonce, quantity, amount_msats, payer_note, human_readable_name, payment_id)?;
let invoice_request = builder.build_and_sign()?;

let hmac = payment_id.hmac_for_offer_payment(nonce, expanded_key);
let context = MessageContext::Offers(
OffersContext::OutboundPayment { payment_id, nonce, hmac: Some(hmac) }
);
let reply_paths = self.create_blinded_paths(context)
let reply_paths = self.flow.create_blinded_paths(self.peer_for_blinded_path(), context)
.map_err(|_| Bolt12SemanticError::MissingPaths)?;

let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);

create_pending_payment(&invoice_request, nonce)?;

self.enqueue_invoice_request(invoice_request, reply_paths)
self.flow.enqueue_invoice_request(invoice_request, reply_paths)
}

fn enqueue_invoice_request(
Expand Down Expand Up @@ -10542,53 +10507,18 @@ where
)
.map_err(|_| Bolt12SemanticError::MissingPaths)?;

#[cfg(feature = "std")]
let builder = refund.respond_using_derived_keys(
payment_paths, payment_hash, expanded_key, entropy
)?;
#[cfg(not(feature = "std"))]
let created_at = Duration::from_secs(
self.highest_seen_timestamp.load(Ordering::Acquire) as u64
);
#[cfg(not(feature = "std"))]
let builder = refund.respond_using_derived_keys_no_std(
payment_paths, payment_hash, created_at, expanded_key, entropy
)?;
let builder: InvoiceBuilder<DerivedSigningPubkey> = builder.into();
let builder = self.flow.create_invoice_builder(refund, payment_paths, payment_hash)?;
let invoice = builder.allow_mpp().build_and_sign(secp_ctx)?;

let nonce = Nonce::from_entropy_source(entropy);
let hmac = payment_hash.hmac_for_offer_payment(nonce, expanded_key);
let context = MessageContext::Offers(OffersContext::InboundPayment {
payment_hash: invoice.payment_hash(), nonce, hmac
});
let reply_paths = self.create_blinded_paths(context)
let reply_paths = self.flow.create_blinded_paths(self.peer_for_blinded_path(), context)
.map_err(|_| Bolt12SemanticError::MissingPaths)?;

let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
if refund.paths().is_empty() {
for reply_path in reply_paths {
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
destination: Destination::Node(refund.payer_signing_pubkey()),
reply_path,
};
let message = OffersMessage::Invoice(invoice.clone());
pending_offers_messages.push((message, instructions));
}
} else {
reply_paths
.iter()
.flat_map(|reply_path| refund.paths().iter().map(move |path| (path, reply_path)))
.take(OFFERS_MESSAGE_REQUEST_LIMIT)
.for_each(|(path, reply_path)| {
let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
destination: Destination::BlindedPath(path.clone()),
reply_path: reply_path.clone(),
};
let message = OffersMessage::Invoice(invoice.clone());
pending_offers_messages.push((message, instructions));
});
}
self.flow.enqueue_invoice(invoice.clone(), refund, reply_paths)?;

Ok(invoice)
},
Expand Down Expand Up @@ -10644,22 +10574,10 @@ where
) -> Result<(), ()> {
let (onion_message, context) =
self.hrn_resolver.resolve_name(payment_id, name, &*self.entropy_source)?;
let reply_paths = self.create_blinded_paths(MessageContext::DNSResolver(context))?;
let reply_paths = self.flow.create_blinded_paths(self.peer_for_blinded_path(), MessageContext::DNSResolver(context))?;
let expiration = StaleExpiration::TimerTicks(1);
self.pending_outbound_payments.add_new_awaiting_offer(payment_id, expiration, retry_strategy, route_params_config, amount_msats)?;
let message_params = dns_resolvers
.iter()
.flat_map(|destination| reply_paths.iter().map(move |path| (path, destination)))
.take(OFFERS_MESSAGE_REQUEST_LIMIT);
for (reply_path, destination) in message_params {
self.pending_dns_onion_messages.lock().unwrap().push((
DNSResolverMessage::DNSSECQuery(onion_message.clone()),
MessageSendInstructions::WithSpecifiedReplyPath {
destination: destination.clone(),
reply_path: reply_path.clone(),
},
));
}
self.flow.enqueue_dns_onion_message(onion_message, dns_resolvers, reply_paths).map_err(|_| ())?;
Ok(())
}

Expand Down Expand Up @@ -11239,6 +11157,7 @@ where

self.transactions_confirmed(header, txdata, height);
self.best_block_updated(header, height);
self.flow.filtered_block_connected(header, txdata, height);
}

fn block_disconnected(&self, header: &Header, height: u32) {
Expand Down Expand Up @@ -12609,7 +12528,9 @@ where
}

fn release_pending_messages(&self) -> Vec<(OffersMessage, MessageSendInstructions)> {
core::mem::take(&mut self.pending_offers_messages.lock().unwrap())
let mut messages = core::mem::take(&mut *self.pending_offers_messages.lock().unwrap());
messages.extend(self.flow.get_and_clear_pending_offers_messages());
messages
}
}

Expand Down Expand Up @@ -12665,7 +12586,9 @@ where
}

fn release_pending_messages(&self) -> Vec<(AsyncPaymentsMessage, MessageSendInstructions)> {
core::mem::take(&mut self.pending_async_payments_messages.lock().unwrap())
let mut messages = core::mem::take(&mut *self.pending_async_payments_messages.lock().unwrap());
messages.extend(self.flow.get_and_clear_pending_async_messages());
messages
}
}

Expand Down Expand Up @@ -12730,7 +12653,9 @@ where
}

fn release_pending_messages(&self) -> Vec<(DNSResolverMessage, MessageSendInstructions)> {
core::mem::take(&mut self.pending_dns_onion_messages.lock().unwrap())
let mut messages = core::mem::take(&mut *self.pending_dns_onion_messages.lock().unwrap());
messages.extend(self.flow.get_and_clear_pending_dns_messages());
messages
}
}

Expand Down
12 changes: 6 additions & 6 deletions lightning/src/ln/offers_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1402,7 +1402,7 @@ fn fails_authentication_when_handling_invoice_request() {
expect_recent_payment!(david, RecentPaymentDetails::AwaitingInvoice, payment_id);

connect_peers(david, alice);
match &mut david.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
match &mut david.node.flow.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
*destination = Destination::Node(alice_id),
_ => panic!(),
Expand All @@ -1427,7 +1427,7 @@ fn fails_authentication_when_handling_invoice_request() {
.unwrap();
expect_recent_payment!(david, RecentPaymentDetails::AwaitingInvoice, payment_id);

match &mut david.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
match &mut david.node.flow.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
*destination = Destination::BlindedPath(invalid_path),
_ => panic!(),
Expand Down Expand Up @@ -1507,7 +1507,7 @@ fn fails_authentication_when_handling_invoice_for_offer() {

// Don't send the invoice request, but grab its reply path to use with a different request.
let invalid_reply_path = {
let mut pending_offers_messages = david.node.pending_offers_messages.lock().unwrap();
let mut pending_offers_messages = david.node.flow.pending_offers_messages.lock().unwrap();
let pending_invoice_request = pending_offers_messages.pop().unwrap();
pending_offers_messages.clear();
match pending_invoice_request.1 {
Expand All @@ -1524,7 +1524,7 @@ fn fails_authentication_when_handling_invoice_for_offer() {
// Swap out the reply path to force authentication to fail when handling the invoice since it
// will be sent over the wrong blinded path.
{
let mut pending_offers_messages = david.node.pending_offers_messages.lock().unwrap();
let mut pending_offers_messages = david.node.flow.pending_offers_messages.lock().unwrap();
let mut pending_invoice_request = pending_offers_messages.first_mut().unwrap();
match &mut pending_invoice_request.1 {
MessageSendInstructions::WithSpecifiedReplyPath { reply_path, .. } =>
Expand Down Expand Up @@ -1611,7 +1611,7 @@ fn fails_authentication_when_handling_invoice_for_refund() {
let expected_invoice = alice.node.request_refund_payment(&refund).unwrap();

connect_peers(david, alice);
match &mut alice.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
match &mut alice.node.flow.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
*destination = Destination::Node(david_id),
_ => panic!(),
Expand Down Expand Up @@ -1642,7 +1642,7 @@ fn fails_authentication_when_handling_invoice_for_refund() {

let expected_invoice = alice.node.request_refund_payment(&refund).unwrap();

match &mut alice.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
match &mut alice.node.flow.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
*destination = Destination::BlindedPath(invalid_path),
_ => panic!(),
Expand Down

0 comments on commit 286ccd7

Please sign in to comment.