diff --git a/lightning/src/blinded_path/message.rs b/lightning/src/blinded_path/message.rs
index 0e3977ab68e..164cfcfb1ad 100644
--- a/lightning/src/blinded_path/message.rs
+++ b/lightning/src/blinded_path/message.rs
@@ -238,6 +238,12 @@ pub enum NextMessageHop {
 }
 
 /// An intermediate node, and possibly a short channel id leading to the next node.
+///
+/// Note:
+/// [`MessageForwardNode`] must represent a node that supports [`supports_onion_messages`]
+/// in order to be included in valid blinded paths for onion messaging.
+///
+/// [`supports_onion_messages`]: crate::types::features::Features::supports_onion_messages
 #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
 pub struct MessageForwardNode {
 	/// This node's pubkey.
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 3bf672f7e13..cc1a7e770b5 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -35,10 +35,10 @@ use bitcoin::{secp256k1, Sequence};
 use bitcoin::{TxIn, Weight};
 
 use crate::events::{FundingInfo, PaidBolt12Invoice};
-use crate::blinded_path::message::{AsyncPaymentsContext, MessageContext, OffersContext};
+use crate::blinded_path::message::{AsyncPaymentsContext, OffersContext};
 use crate::blinded_path::NodeIdLookUp;
-use crate::blinded_path::message::{BlindedMessagePath, MessageForwardNode};
-use crate::blinded_path::payment::{AsyncBolt12OfferContext, BlindedPaymentPath, Bolt12OfferContext, Bolt12RefundContext, PaymentConstraints, PaymentContext, UnauthenticatedReceiveTlvs};
+use crate::blinded_path::message::MessageForwardNode;
+use crate::blinded_path::payment::{AsyncBolt12OfferContext, Bolt12OfferContext, PaymentContext, UnauthenticatedReceiveTlvs};
 use crate::chain;
 use crate::chain::{Confirm, ChannelMonitorUpdateStatus, Watch, BestBlock};
 use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator, LowerBoundedFeeEstimator};
@@ -49,6 +49,7 @@ use crate::events::{self, Event, EventHandler, EventsProvider, InboundChannelFun
 // construct one themselves.
 use crate::ln::inbound_payment;
 use crate::ln::types::ChannelId;
+use crate::offers::flow::OffersMessageFlow;
 use crate::types::payment::{PaymentHash, PaymentPreimage, PaymentSecret};
 use crate::ln::channel::{self, Channel, ChannelError, ChannelUpdateStatus, FundedChannel, ShutdownResult, UpdateFulfillCommitFetch, OutboundV1Channel, ReconnectionMsg, InboundV1Channel, WithChannelContext};
 use crate::ln::channel::PendingV2Channel;
@@ -65,19 +66,18 @@ use crate::ln::msgs::{BaseMessageHandler, ChannelMessageHandler, CommitmentUpdat
 #[cfg(test)]
 use crate::ln::outbound_payment;
 use crate::ln::outbound_payment::{Bolt11PaymentError, OutboundPayments, PendingOutboundPayment, RetryableInvoiceRequest, SendAlongPathArgs, StaleExpiration};
-use crate::offers::invoice::{Bolt12Invoice, DEFAULT_RELATIVE_EXPIRY, DerivedSigningPubkey, ExplicitSigningPubkey, InvoiceBuilder, UnsignedBolt12Invoice};
+use crate::offers::invoice::{Bolt12Invoice, DerivedSigningPubkey, InvoiceBuilder, DEFAULT_RELATIVE_EXPIRY};
 use crate::offers::invoice_error::InvoiceError;
-use crate::offers::invoice_request::{InvoiceRequest, InvoiceRequestBuilder};
+use crate::offers::invoice_request::InvoiceRequest;
 use crate::offers::nonce::Nonce;
-use crate::offers::offer::{Offer, OfferBuilder};
+use crate::offers::offer::Offer;
 use crate::offers::parse::Bolt12SemanticError;
-use crate::offers::refund::{Refund, RefundBuilder};
+use crate::offers::refund::Refund;
 use crate::offers::signer;
 use crate::onion_message::async_payments::{AsyncPaymentsMessage, HeldHtlcAvailable, ReleaseHeldHtlc, AsyncPaymentsMessageHandler};
 use crate::onion_message::dns_resolution::HumanReadableName;
-use crate::onion_message::messenger::{Destination, MessageRouter, Responder, ResponseInstruction, MessageSendInstructions};
+use crate::onion_message::messenger::{MessageRouter, Responder, ResponseInstruction, MessageSendInstructions};
 use crate::onion_message::offers::{OffersMessage, OffersMessageHandler};
-use crate::onion_message::packet::OnionMessageContents;
 use crate::sign::{EntropySource, NodeSigner, Recipient, SignerProvider};
 use crate::sign::ecdsa::EcdsaChannelSigner;
 use crate::util::config::{ChannelConfig, ChannelConfigUpdate, ChannelConfigOverrides, UserConfig};
@@ -89,18 +89,24 @@ use crate::util::logger::{Level, Logger, WithContext};
 use crate::util::errors::APIError;
 
 #[cfg(async_payments)] use {
+	crate::blinded_path::message::BlindedMessagePath,
 	crate::offers::offer::Amount,
 	crate::offers::static_invoice::{DEFAULT_RELATIVE_EXPIRY as STATIC_INVOICE_DEFAULT_RELATIVE_EXPIRY, StaticInvoice, StaticInvoiceBuilder},
 };
+#[cfg(all(test, async_payments))]
+use crate::blinded_path::payment::BlindedPaymentPath;
 
 #[cfg(feature = "dnssec")]
-use crate::blinded_path::message::DNSResolverContext;
-#[cfg(feature = "dnssec")]
-use crate::onion_message::dns_resolution::{DNSResolverMessage, DNSResolverMessageHandler, DNSSECQuery, DNSSECProof, OMNameResolver};
+use {
+	crate::onion_message::dns_resolution::{DNSResolverMessage, DNSResolverMessageHandler, DNSSECQuery, DNSSECProof},
+	crate::onion_message::messenger::Destination,
+	crate::blinded_path::message::DNSResolverContext,
+};
 
 #[cfg(not(c_bindings))]
 use {
-	crate::offers::offer::DerivedMetadata,
+	crate::offers::offer::{DerivedMetadata, OfferBuilder},
+	crate::offers::refund::RefundBuilder,
 	crate::onion_message::messenger::DefaultMessageRouter,
 	crate::routing::router::DefaultRouter,
 	crate::routing::gossip::NetworkGraph,
@@ -123,7 +129,7 @@ use core::{cmp, mem};
 use core::borrow::Borrow;
 use core::cell::RefCell;
 use crate::io::Read;
-use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState};
+use crate::sync::{Arc, FairRwLock, LockHeldState, LockTestExt, Mutex, RwLock, RwLockReadGuard};
 use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
 use core::time::Duration;
 use core::ops::Deref;
@@ -281,6 +287,8 @@ pub struct BlindedForward {
 	/// Overrides the next hop's [`msgs::UpdateAddHTLC::blinding_point`]. Set if this HTLC is being
 	/// forwarded within a [`BlindedPaymentPath`] that was concatenated to another blinded path that
 	/// starts at the next hop.
+	///
+	/// [`BlindedPaymentPath`]: crate::blinded_path::payment::BlindedPaymentPath
 	pub next_blinding_override: Option<PublicKey>,
 }
 
@@ -2458,7 +2466,11 @@ where
 	chain_monitor: M,
 	tx_broadcaster: T,
 	router: R,
-	message_router: MR,
+
+	#[cfg(test)]
+	pub(super) flow: OffersMessageFlow<MR>,
+	#[cfg(not(test))]
+	flow: OffersMessageFlow<MR>,
 
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	#[cfg(any(test, feature = "_test_utils"))]
@@ -2642,12 +2654,6 @@ where
 	event_persist_notifier: Notifier,
 	needs_persist_flag: AtomicBool,
 
-	#[cfg(not(any(test, feature = "_test_utils")))]
-	pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,
-	#[cfg(any(test, feature = "_test_utils"))]
-	pub(crate) pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,
-	pending_async_payments_messages: Mutex<Vec<(AsyncPaymentsMessage, MessageSendInstructions)>>,
-
 	/// Tracks the message events that are to be broadcasted when we are connected to some peer.
 	pending_broadcast_messages: Mutex<Vec<MessageSendEvent>>,
 
@@ -2666,11 +2672,6 @@ where
 	/// [`ConfirmationTarget::MinAllowedNonAnchorChannelRemoteFee`] estimate.
 	last_days_feerates: Mutex<VecDeque<(u32, u32)>>,
 
-	#[cfg(feature = "dnssec")]
-	hrn_resolver: OMNameResolver,
-	#[cfg(feature = "dnssec")]
-	pending_dns_onion_messages: Mutex<Vec<(DNSResolverMessage, MessageSendInstructions)>>,
-
 	#[cfg(feature = "_test_utils")]
 	/// In testing, it is useful be able to forge a name -> offer mapping so that we can pay an
 	/// offer generated in the test.
@@ -2827,7 +2828,7 @@ pub const MIN_CLTV_EXPIRY_DELTA: u16 = 6*8;
 // scale them up to suit its security policy. At the network-level, we shouldn't constrain them too much,
 // while avoiding to introduce a DoS vector. Further, a low CTLV_FAR_FAR_AWAY could be a source of
 // routing failure for any HTLC sender picking up an LDK node among the first hops.
-pub(super) const CLTV_FAR_FAR_AWAY: u32 = 14 * 24 * 6;
+pub(crate) const CLTV_FAR_FAR_AWAY: u32 = 14 * 24 * 6;
 
 /// Minimum CLTV difference between the current block height and received inbound payments.
 /// Invoices generated for payment to us must set their `min_final_cltv_expiry_delta` field to at least
@@ -2908,6 +2909,8 @@ const MAX_NO_CHANNEL_PEERS: usize = 250;
 /// Using compact [`BlindedMessagePath`]s may provide better privacy as the [`MessageRouter`] could select
 /// more hops. However, since they use short channel ids instead of pubkeys, they are more likely to
 /// become invalid over time as channels are closed. Thus, they are only suitable for short-term use.
+///
+/// [`BlindedMessagePath`]: crate::blinded_path::message::BlindedMessagePath
 pub const MAX_SHORT_LIVED_RELATIVE_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24);
 
 /// Used by [`ChannelManager::list_recent_payments`] to express the status of recent payments.
@@ -3576,7 +3579,16 @@ where
 	) -> Self {
 		let mut secp_ctx = Secp256k1::new();
 		secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes());
+
 		let expanded_inbound_key = node_signer.get_inbound_payment_key();
+		let our_network_pubkey = node_signer.get_node_id(Recipient::Node).unwrap();
+
+		let flow = OffersMessageFlow::new(
+			ChainHash::using_genesis_block(params.network), params.best_block,
+			our_network_pubkey, current_timestamp, expanded_inbound_key,
+			secp_ctx.clone(), message_router
+		);
+
 		ChannelManager {
 			default_configuration: config.clone(),
 			chain_hash: ChainHash::using_genesis_block(params.network),
@@ -3584,7 +3596,7 @@ where
 			chain_monitor,
 			tx_broadcaster,
 			router,
-			message_router,
+			flow,
 
 			best_block: RwLock::new(params.best_block),
 
@@ -3596,7 +3608,7 @@ where
 			pending_intercepted_htlcs: Mutex::new(new_hash_map()),
 			short_to_chan_info: FairRwLock::new(new_hash_map()),
 
-			our_network_pubkey: node_signer.get_node_id(Recipient::Node).unwrap(),
+			our_network_pubkey,
 			secp_ctx,
 
 			inbound_payment_key: expanded_inbound_key,
@@ -3621,8 +3633,6 @@ where
 			needs_persist_flag: AtomicBool::new(false),
 			funding_batch_states: Mutex::new(BTreeMap::new()),
 
-			pending_offers_messages: Mutex::new(Vec::new()),
-			pending_async_payments_messages: Mutex::new(Vec::new()),
 			pending_broadcast_messages: Mutex::new(Vec::new()),
 
 			last_days_feerates: Mutex::new(VecDeque::new()),
@@ -3633,11 +3643,6 @@ where
 
 			logger,
 
-			#[cfg(feature = "dnssec")]
-			hrn_resolver: OMNameResolver::new(current_timestamp, params.best_block.height),
-			#[cfg(feature = "dnssec")]
-			pending_dns_onion_messages: Mutex::new(Vec::new()),
-
 			#[cfg(feature = "_test_utils")]
 			testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()),
 		}
@@ -4966,27 +4971,14 @@ where
 				}
 			};
 
-			let nonce = Nonce::from_entropy_source(&*self.entropy_source);
-			let hmac = payment_id.hmac_for_async_payment(nonce, &self.inbound_payment_key);
-			let reply_paths = match self.create_blinded_paths(
-				MessageContext::AsyncPayments(
-					AsyncPaymentsContext::OutboundPayment { payment_id, nonce, hmac }
-				)
-			) {
-				Ok(paths) => paths,
-				Err(()) => {
-					self.abandon_payment_with_reason(payment_id, PaymentFailureReason::BlindedPathCreationFailed);
+			let entropy = &*self.entropy_source;
+
+			if self.flow.enqueue_held_htlc_available(entropy, invoice, payment_id, self.get_peers_for_blinded_path()).is_err() {
+				self.abandon_payment_with_reason(payment_id, PaymentFailureReason::BlindedPathCreationFailed);
 					res = Err(Bolt12PaymentError::BlindedPathCreationFailed);
 					return NotifyOption::DoPersist
-				}
 			};
 
-			let mut pending_async_payments_messages = self.pending_async_payments_messages.lock().unwrap();
-			let message = AsyncPaymentsMessage::HeldHtlcAvailable(HeldHtlcAvailable {});
-			enqueue_onion_message_with_reply_paths(
-				message, invoice.message_paths(), reply_paths, &mut pending_async_payments_messages
-			);
-
 			NotifyOption::DoPersist
 		});
 
@@ -10225,36 +10217,21 @@ macro_rules! create_offer_builder { ($self: ident, $builder: ty) => {
 	///
 	/// # Limitations
 	///
-	/// Requires a direct connection to the introduction node in the responding [`InvoiceRequest`]'s
-	/// reply path.
+	/// See [`OffersMessageFlow::create_offer_builder`] for limitations on the offer builder.
 	///
 	/// # Errors
 	///
 	/// Errors if the parameterized [`Router`] is unable to create a blinded path for the offer.
 	///
+	/// [`BlindedMessagePath`]: crate::blinded_path::message::BlindedMessagePath
 	/// [`Offer`]: crate::offers::offer::Offer
 	/// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest
 	pub fn create_offer_builder(
 		&$self, absolute_expiry: Option<Duration>
 	) -> Result<$builder, Bolt12SemanticError> {
-		let node_id = $self.get_our_node_id();
-		let expanded_key = &$self.inbound_payment_key;
 		let entropy = &*$self.entropy_source;
-		let secp_ctx = &$self.secp_ctx;
 
-		let nonce = Nonce::from_entropy_source(entropy);
-		let context = OffersContext::InvoiceRequest { nonce };
-		let path = $self.create_blinded_paths_using_absolute_expiry(context, absolute_expiry)
-			.and_then(|paths| paths.into_iter().next().ok_or(()))
-			.map_err(|_| Bolt12SemanticError::MissingPaths)?;
-		let builder = OfferBuilder::deriving_signing_pubkey(node_id, expanded_key, nonce, secp_ctx)
-			.chain_hash($self.chain_hash)
-			.path(path);
-
-		let builder = match absolute_expiry {
-			None => builder,
-			Some(absolute_expiry) => builder.absolute_expiry(absolute_expiry),
-		};
+		let builder = $self.flow.create_offer_builder(entropy, absolute_expiry, $self.get_peers_for_blinded_path())?;
 
 		Ok(builder.into())
 	}
@@ -10290,11 +10267,6 @@ macro_rules! create_refund_builder { ($self: ident, $builder: ty) => {
 	///
 	/// Also, uses a derived payer id in the refund for payer privacy.
 	///
-	/// # Limitations
-	///
-	/// Requires a direct connection to an introduction node in the responding
-	/// [`Bolt12Invoice::payment_paths`].
-	///
 	/// # Errors
 	///
 	/// Errors if:
@@ -10305,28 +10277,18 @@ macro_rules! create_refund_builder { ($self: ident, $builder: ty) => {
 	/// [`Refund`]: crate::offers::refund::Refund
 	/// [`Bolt12Invoice`]: crate::offers::invoice::Bolt12Invoice
 	/// [`Bolt12Invoice::payment_paths`]: crate::offers::invoice::Bolt12Invoice::payment_paths
+	/// [`BlindedMessagePath`]: crate::blinded_path::message::BlindedMessagePath
 	/// [Avoiding Duplicate Payments]: #avoiding-duplicate-payments
 	pub fn create_refund_builder(
 		&$self, amount_msats: u64, absolute_expiry: Duration, payment_id: PaymentId,
 		retry_strategy: Retry, route_params_config: RouteParametersConfig
 	) -> Result<$builder, Bolt12SemanticError> {
-		let node_id = $self.get_our_node_id();
-		let expanded_key = &$self.inbound_payment_key;
 		let entropy = &*$self.entropy_source;
-		let secp_ctx = &$self.secp_ctx;
 
-		let nonce = Nonce::from_entropy_source(entropy);
-		let context = OffersContext::OutboundPayment { payment_id, nonce, hmac: None };
-		let path = $self.create_blinded_paths_using_absolute_expiry(context, Some(absolute_expiry))
-			.and_then(|paths| paths.into_iter().next().ok_or(()))
-			.map_err(|_| Bolt12SemanticError::MissingPaths)?;
-
-		let builder = RefundBuilder::deriving_signing_pubkey(
-			node_id, expanded_key, nonce, secp_ctx, amount_msats, payment_id
-		)?
-			.chain_hash($self.chain_hash)
-			.absolute_expiry(absolute_expiry)
-			.path(path);
+		let builder = $self.flow.create_refund_builder(
+			entropy, amount_msats, absolute_expiry,
+			payment_id, $self.get_peers_for_blinded_path()
+		)?;
 
 		let _persistence_guard = PersistenceNotifierGuard::notify_on_drop($self);
 
@@ -10341,13 +10303,6 @@ macro_rules! create_refund_builder { ($self: ident, $builder: ty) => {
 	}
 } }
 
-/// Defines the maximum number of [`OffersMessage`] including different reply paths to be sent
-/// along different paths.
-/// Sending multiple requests increases the chances of successful delivery in case some
-/// paths are unavailable. However, only one invoice for a given [`PaymentId`] will be paid,
-/// even if multiple invoices are received.
-const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;
-
 impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, MR: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, MR, L>
 where
 	M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
@@ -10382,25 +10337,7 @@ where
 	pub fn create_async_receive_offer_builder(
 		&self, message_paths_to_always_online_node: Vec<BlindedMessagePath>
 	) -> Result<(OfferBuilder<DerivedMetadata, secp256k1::All>, Nonce), Bolt12SemanticError> {
-		if message_paths_to_always_online_node.is_empty() {
-			return Err(Bolt12SemanticError::MissingPaths)
-		}
-
-		let node_id = self.get_our_node_id();
-		let expanded_key = &self.inbound_payment_key;
-		let entropy = &*self.entropy_source;
-		let secp_ctx = &self.secp_ctx;
-
-		let nonce = Nonce::from_entropy_source(entropy);
-		let mut builder = OfferBuilder::deriving_signing_pubkey(
-			node_id, expanded_key, nonce, secp_ctx
-		).chain_hash(self.chain_hash);
-
-		for path in message_paths_to_always_online_node {
-			builder = builder.path(path);
-		}
-
-		Ok((builder.into(), nonce))
+		self.flow.create_async_receive_offer_builder(&*self.entropy_source, message_paths_to_always_online_node)
 	}
 
 	/// Creates a [`StaticInvoiceBuilder`] from the corresponding [`Offer`] and [`Nonce`] that were
@@ -10410,18 +10347,10 @@ where
 	pub fn create_static_invoice_builder<'a>(
 		&self, offer: &'a Offer, offer_nonce: Nonce, relative_expiry: Option<Duration>
 	) -> Result<StaticInvoiceBuilder<'a>, Bolt12SemanticError> {
-		let expanded_key = &self.inbound_payment_key;
 		let entropy = &*self.entropy_source;
-		let secp_ctx = &self.secp_ctx;
-
-		let payment_context = PaymentContext::AsyncBolt12Offer(
-			AsyncBolt12OfferContext { offer_nonce }
-		);
-		let amount_msat = offer.amount().and_then(|amount| {
-			match amount {
-				Amount::Bitcoin { amount_msats } => Some(amount_msats),
-				Amount::Currency { .. } => None
-			}
+		let amount_msat = offer.amount().and_then(|amount| match amount {
+			Amount::Bitcoin { amount_msats } => Some(amount_msats),
+			Amount::Currency { .. } => None,
 		});
 
 		let relative_expiry = relative_expiry.unwrap_or(STATIC_INVOICE_DEFAULT_RELATIVE_EXPIRY);
@@ -10432,25 +10361,11 @@ where
 			&self.inbound_payment_key, amount_msat, relative_expiry_secs, created_at.as_secs(), None
 		).map_err(|()| Bolt12SemanticError::InvalidAmount)?;
 
-		let payment_paths = self.create_blinded_payment_paths(
-			amount_msat, payment_secret, payment_context, relative_expiry_secs
-		).map_err(|()| Bolt12SemanticError::MissingPaths)?;
-
-		let nonce = Nonce::from_entropy_source(entropy);
-		let hmac = signer::hmac_for_held_htlc_available_context(nonce, expanded_key);
-		let path_absolute_expiry = Duration::from_secs(
-			inbound_payment::calculate_absolute_expiry(created_at.as_secs(), relative_expiry_secs)
-		);
-		let context = MessageContext::AsyncPayments(
-			AsyncPaymentsContext::InboundPayment { nonce, hmac, path_absolute_expiry }
-		);
-		let async_receive_message_paths = self.create_blinded_paths(context)
-			.map_err(|()| Bolt12SemanticError::MissingPaths)?;
-
-		StaticInvoiceBuilder::for_offer_using_derived_keys(
-			offer, payment_paths, async_receive_message_paths, created_at, expanded_key,
-			offer_nonce, secp_ctx
-		).map(|inv| inv.allow_mpp().relative_expiry(relative_expiry_secs))
+		self.flow.create_static_invoice_builder(
+			&self.router, entropy, offer, offer_nonce, payment_secret,
+			relative_expiry_secs, self.list_usable_channels(),
+			self.get_peers_for_blinded_path()
+		)
 	}
 
 	/// Pays for an [`Offer`] using the given parameters by creating an [`InvoiceRequest`] and
@@ -10504,6 +10419,7 @@ where
 	/// [`InvoiceRequest::payer_note`]: crate::offers::invoice_request::InvoiceRequest::payer_note
 	/// [`InvoiceRequestBuilder`]: crate::offers::invoice_request::InvoiceRequestBuilder
 	/// [`Bolt12Invoice`]: crate::offers::invoice::Bolt12Invoice
+	/// [`BlindedMessagePath`]: crate::blinded_path::message::BlindedMessagePath
 	/// [`Bolt12Invoice::payment_paths`]: crate::offers::invoice::Bolt12Invoice::payment_paths
 	/// [Avoiding Duplicate Payments]: #avoiding-duplicate-payments
 	pub fn pay_for_offer(
@@ -10532,15 +10448,12 @@ where
 		payer_note: Option<String>, payment_id: PaymentId,
 		human_readable_name: Option<HumanReadableName>, create_pending_payment: CPP,
 	) -> Result<(), Bolt12SemanticError> {
-		let expanded_key = &self.inbound_payment_key;
 		let entropy = &*self.entropy_source;
-		let secp_ctx = &self.secp_ctx;
-
 		let nonce = Nonce::from_entropy_source(entropy);
-		let builder: InvoiceRequestBuilder<secp256k1::All> = offer
-			.request_invoice(expanded_key, nonce, secp_ctx, payment_id)?
-			.into();
-		let builder = builder.chain_hash(self.chain_hash)?;
+
+		let builder = self.flow.create_invoice_request_builder(
+			offer, nonce, payment_id,
+		)?;
 
 		let builder = match quantity {
 			None => builder,
@@ -10558,48 +10471,16 @@ where
 			None => builder,
 			Some(hrn) => builder.sourced_from_human_readable_name(hrn),
 		};
-		let invoice_request = builder.build_and_sign()?;
-
-		let hmac = payment_id.hmac_for_offer_payment(nonce, expanded_key);
-		let context = MessageContext::Offers(
-			OffersContext::OutboundPayment { payment_id, nonce, hmac: Some(hmac) }
-		);
-		let reply_paths = self.create_blinded_paths(context)
-			.map_err(|_| Bolt12SemanticError::MissingPaths)?;
 
+		let invoice_request = builder.build_and_sign()?;
 		let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 
-		create_pending_payment(&invoice_request, nonce)?;
+		self.flow.enqueue_invoice_request(
+			invoice_request.clone(), payment_id, nonce,
+			self.get_peers_for_blinded_path()
+		)?;
 
-		self.enqueue_invoice_request(invoice_request, reply_paths)
-	}
-
-	fn enqueue_invoice_request(
-		&self,
-		invoice_request: InvoiceRequest,
-		reply_paths: Vec<BlindedMessagePath>,
-	) -> Result<(), Bolt12SemanticError> {
-		let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
-		if !invoice_request.paths().is_empty() {
-			let message = OffersMessage::InvoiceRequest(invoice_request.clone());
-			enqueue_onion_message_with_reply_paths(
-				message, invoice_request.paths(), reply_paths, &mut pending_offers_messages
-			);
-		} else if let Some(node_id) = invoice_request.issuer_signing_pubkey() {
-			for reply_path in reply_paths {
-				let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
-					destination: Destination::Node(node_id),
-					reply_path,
-				};
-				let message = OffersMessage::InvoiceRequest(invoice_request.clone());
-				pending_offers_messages.push((message, instructions));
-			}
-		} else {
-			debug_assert!(false);
-			return Err(Bolt12SemanticError::MissingIssuerSigningPubkey);
-		}
-
-		Ok(())
+		create_pending_payment(&invoice_request, nonce)
 	}
 
 	/// Creates a [`Bolt12Invoice`] for a [`Refund`] and enqueues it to be sent via an onion
@@ -10623,70 +10504,29 @@ where
 	/// - the parameterized [`Router`] is unable to create a blinded payment path or reply path for
 	///   the invoice.
 	///
+	/// [`BlindedPaymentPath`]: crate::blinded_path::payment::BlindedPaymentPath
 	/// [`Bolt12Invoice`]: crate::offers::invoice::Bolt12Invoice
 	pub fn request_refund_payment(
 		&self, refund: &Refund
 	) -> Result<Bolt12Invoice, Bolt12SemanticError> {
-		let expanded_key = &self.inbound_payment_key;
-		let entropy = &*self.entropy_source;
 		let secp_ctx = &self.secp_ctx;
 
 		let amount_msats = refund.amount_msats();
 		let relative_expiry = DEFAULT_RELATIVE_EXPIRY.as_secs() as u32;
 
-		if refund.chain() != self.chain_hash {
-			return Err(Bolt12SemanticError::UnsupportedChain);
-		}
-
 		let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 
 		match self.create_inbound_payment(Some(amount_msats), relative_expiry, None) {
 			Ok((payment_hash, payment_secret)) => {
-				let payment_context = PaymentContext::Bolt12Refund(Bolt12RefundContext {});
-				let payment_paths = self.create_blinded_payment_paths(
-					Some(amount_msats), payment_secret, payment_context, relative_expiry,
-				)
-					.map_err(|_| Bolt12SemanticError::MissingPaths)?;
-
-				#[cfg(feature = "std")]
-				let builder = refund.respond_using_derived_keys(
-					payment_paths, payment_hash, expanded_key, entropy
+				let entropy = &*self.entropy_source;
+				let builder = self.flow.create_invoice_builder_from_refund(
+					&self.router, entropy, refund, payment_hash,
+					payment_secret, self.list_usable_channels()
 				)?;
-				#[cfg(not(feature = "std"))]
-				let created_at = Duration::from_secs(
-					self.highest_seen_timestamp.load(Ordering::Acquire) as u64
-				);
-				#[cfg(not(feature = "std"))]
-				let builder = refund.respond_using_derived_keys_no_std(
-					payment_paths, payment_hash, created_at, expanded_key, entropy
-				)?;
-				let builder: InvoiceBuilder<DerivedSigningPubkey> = builder.into();
+
 				let invoice = builder.allow_mpp().build_and_sign(secp_ctx)?;
 
-				let nonce = Nonce::from_entropy_source(entropy);
-				let hmac = payment_hash.hmac_for_offer_payment(nonce, expanded_key);
-				let context = MessageContext::Offers(OffersContext::InboundPayment {
-					payment_hash: invoice.payment_hash(), nonce, hmac
-				});
-				let reply_paths = self.create_blinded_paths(context)
-					.map_err(|_| Bolt12SemanticError::MissingPaths)?;
-
-				let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
-				if refund.paths().is_empty() {
-					for reply_path in reply_paths {
-						let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
-							destination: Destination::Node(refund.payer_signing_pubkey()),
-							reply_path,
-						};
-						let message = OffersMessage::Invoice(invoice.clone());
-						pending_offers_messages.push((message, instructions));
-					}
-				} else {
-					let message = OffersMessage::Invoice(invoice.clone());
-					enqueue_onion_message_with_reply_paths(
-						message, refund.paths(), reply_paths, &mut pending_offers_messages
-					);
-				}
+				self.flow.enqueue_invoice(entropy, invoice.clone(), refund, self.get_peers_for_blinded_path())?;
 
 				Ok(invoice)
 			},
@@ -10733,6 +10573,8 @@ where
 	/// - a duplicate `payment_id` is provided given the caveats in the aforementioned link,
 	///
 	/// [`Bolt12Invoice::payment_paths`]: crate::offers::invoice::Bolt12Invoice::payment_paths
+	/// [`OMNameResolver::resolve_name`]: crate::onion_message::dns_resolution::OMNameResolver::resolve_name
+	/// [`OMNameResolver::handle_dnssec_proof_for_uri`]: crate::onion_message::dns_resolution::OMNameResolver::handle_dnssec_proof_for_uri
 	/// [Avoiding Duplicate Payments]: #avoiding-duplicate-payments
 	#[cfg(feature = "dnssec")]
 	pub fn pay_for_offer_from_human_readable_name(
@@ -10741,24 +10583,16 @@ where
 		dns_resolvers: Vec<Destination>,
 	) -> Result<(), ()> {
 		let (onion_message, context) =
-			self.hrn_resolver.resolve_name(payment_id, name, &*self.entropy_source)?;
-		let reply_paths = self.create_blinded_paths(MessageContext::DNSResolver(context))?;
+			self.flow.hrn_resolver.resolve_name(payment_id, name, &*self.entropy_source)?;
+
 		let expiration = StaleExpiration::TimerTicks(1);
 		self.pending_outbound_payments.add_new_awaiting_offer(payment_id, expiration, retry_strategy, route_params_config, amount_msats)?;
-		let message_params = dns_resolvers
-			.iter()
-			.flat_map(|destination| reply_paths.iter().map(move |path| (path, destination)))
-			.take(OFFERS_MESSAGE_REQUEST_LIMIT);
-		for (reply_path, destination) in message_params {
-			self.pending_dns_onion_messages.lock().unwrap().push((
-				DNSResolverMessage::DNSSECQuery(onion_message.clone()),
-				MessageSendInstructions::WithSpecifiedReplyPath {
-					destination: destination.clone(),
-					reply_path: reply_path.clone(),
-				},
-			));
-		}
-		Ok(())
+
+		self.flow.enqueue_dns_onion_message(
+			onion_message, context, dns_resolvers,
+			self.get_peers_for_blinded_path()
+		).map_err(|_| ())
+
 	}
 
 	/// Gets a payment secret and payment hash for use in an invoice given to a third party wishing
@@ -10859,25 +10693,7 @@ where
 		inbound_payment::get_payment_preimage(payment_hash, payment_secret, &self.inbound_payment_key)
 	}
 
-	/// Creates a collection of blinded paths by delegating to [`MessageRouter`] based on
-	/// the path's intended lifetime.
-	///
-	/// Whether or not the path is compact depends on whether the path is short-lived or long-lived,
-	/// respectively, based on the given `absolute_expiry` as seconds since the Unix epoch. See
-	/// [`MAX_SHORT_LIVED_RELATIVE_EXPIRY`].
-	fn create_blinded_paths_using_absolute_expiry(
-		&self, context: OffersContext, absolute_expiry: Option<Duration>,
-	) -> Result<Vec<BlindedMessagePath>, ()> {
-		let now = self.duration_since_epoch();
-		let max_short_lived_absolute_expiry = now.saturating_add(MAX_SHORT_LIVED_RELATIVE_EXPIRY);
-
-		if absolute_expiry.unwrap_or(Duration::MAX) <= max_short_lived_absolute_expiry {
-			self.create_compact_blinded_paths(context)
-		} else {
-			self.create_blinded_paths(MessageContext::Offers(context))
-		}
-	}
-
+	#[cfg(any(test, async_payments))]
 	pub(super) fn duration_since_epoch(&self) -> Duration {
 		#[cfg(not(feature = "std"))]
 		let now = Duration::from_secs(
@@ -10891,36 +10707,8 @@ where
 		now
 	}
 
-	/// Creates a collection of blinded paths by delegating to
-	/// [`MessageRouter::create_blinded_paths`].
-	///
-	/// Errors if the `MessageRouter` errors.
-	fn create_blinded_paths(&self, context: MessageContext) -> Result<Vec<BlindedMessagePath>, ()> {
-		let recipient = self.get_our_node_id();
-		let secp_ctx = &self.secp_ctx;
-
-		let peers = self.per_peer_state.read().unwrap()
-			.iter()
-			.map(|(node_id, peer_state)| (node_id, peer_state.lock().unwrap()))
-			.filter(|(_, peer)| peer.is_connected)
-			.filter(|(_, peer)| peer.latest_features.supports_onion_messages())
-			.map(|(node_id, _)| *node_id)
-			.collect::<Vec<_>>();
-
-		self.message_router
-			.create_blinded_paths(recipient, context, peers, secp_ctx)
-			.and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
-	}
-
-	/// Creates a collection of blinded paths by delegating to
-	/// [`MessageRouter::create_compact_blinded_paths`].
-	///
-	/// Errors if the `MessageRouter` errors.
-	fn create_compact_blinded_paths(&self, context: OffersContext) -> Result<Vec<BlindedMessagePath>, ()> {
-		let recipient = self.get_our_node_id();
-		let secp_ctx = &self.secp_ctx;
-
-		let peers = self.per_peer_state.read().unwrap()
+	fn get_peers_for_blinded_path(&self) -> Vec<MessageForwardNode> {
+		self.per_peer_state.read().unwrap()
 			.iter()
 			.map(|(node_id, peer_state)| (node_id, peer_state.lock().unwrap()))
 			.filter(|(_, peer)| peer.is_connected)
@@ -10933,55 +10721,20 @@ where
 					.min_by_key(|(_, channel)| channel.context().channel_creation_height)
 					.and_then(|(_, channel)| channel.context().get_short_channel_id()),
 			})
-			.collect::<Vec<_>>();
-
-		self.message_router
-			.create_compact_blinded_paths(recipient, MessageContext::Offers(context), peers, secp_ctx)
-			.and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
+			.collect::<Vec<_>>()
 	}
 
+	#[cfg(all(test, async_payments))]
 	/// Creates multi-hop blinded payment paths for the given `amount_msats` by delegating to
 	/// [`Router::create_blinded_payment_paths`].
-	fn create_blinded_payment_paths(
+	pub(super) fn test_create_blinded_payment_paths(
 		&self, amount_msats: Option<u64>, payment_secret: PaymentSecret, payment_context: PaymentContext,
 		relative_expiry_seconds: u32
 	) -> Result<Vec<BlindedPaymentPath>, ()> {
-		let expanded_key = &self.inbound_payment_key;
 		let entropy = &*self.entropy_source;
-		let secp_ctx = &self.secp_ctx;
-
-		let first_hops = self.list_usable_channels();
-		let payee_node_id = self.get_our_node_id();
-
-		// Assume shorter than usual block times to avoid spuriously failing payments too early.
-		const SECONDS_PER_BLOCK: u32 = 9 * 60;
-		let relative_expiry_blocks = relative_expiry_seconds / SECONDS_PER_BLOCK;
-		let max_cltv_expiry = core::cmp::max(relative_expiry_blocks, CLTV_FAR_FAR_AWAY)
-			.saturating_add(LATENCY_GRACE_PERIOD_BLOCKS)
-			.saturating_add(self.best_block.read().unwrap().height);
-
-		let payee_tlvs = UnauthenticatedReceiveTlvs {
-			payment_secret,
-			payment_constraints: PaymentConstraints {
-				max_cltv_expiry,
-				htlc_minimum_msat: 1,
-			},
-			payment_context,
-		};
-		let nonce = Nonce::from_entropy_source(entropy);
-		let payee_tlvs = payee_tlvs.authenticate(nonce, expanded_key);
 
-		self.router.create_blinded_payment_paths(
-			payee_node_id, first_hops, payee_tlvs, amount_msats, secp_ctx
-		)
-	}
-
-	#[cfg(all(test, async_payments))]
-	pub(super) fn test_create_blinded_payment_paths(
-		&self, amount_msats: Option<u64>, payment_secret: PaymentSecret, payment_context: PaymentContext,
-		relative_expiry_seconds: u32
-	) -> Result<Vec<BlindedPaymentPath>, ()> {
-		self.create_blinded_payment_paths(
+		self.flow.test_create_blinded_payment_paths(
+			&self.router, entropy, self.list_usable_channels(),
 			amount_msats, payment_secret, payment_context, relative_expiry_seconds
 		)
 	}
@@ -11654,10 +11407,8 @@ where
 			}
 		}
 		max_time!(self.highest_seen_timestamp);
-		#[cfg(feature = "dnssec")] {
-			let timestamp = self.highest_seen_timestamp.load(Ordering::Relaxed) as u32;
-			self.hrn_resolver.new_best_block(height, timestamp);
-		}
+
+		self.flow.best_block_updated(header, height);
 	}
 
 	fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
@@ -12444,29 +12195,11 @@ where
 			.release_invoice_requests_awaiting_invoice()
 		{
 			let RetryableInvoiceRequest { invoice_request, nonce, .. } = retryable_invoice_request;
-			let hmac = payment_id.hmac_for_offer_payment(nonce, &self.inbound_payment_key);
-			let context = MessageContext::Offers(OffersContext::OutboundPayment {
-				payment_id,
-				nonce,
-				hmac: Some(hmac)
-			});
-			match self.create_blinded_paths(context) {
-				Ok(reply_paths) => match self.enqueue_invoice_request(invoice_request, reply_paths) {
-					Ok(_) => {}
-					Err(_) => {
-						log_warn!(self.logger,
-							"Retry failed for an invoice request with payment_id: {}",
-							payment_id
-						);
-					}
-				},
-				Err(_) => {
-					log_warn!(self.logger,
-						"Retry failed for an invoice request with payment_id: {}. \
-							Reason: router could not find a blinded path to include as the reply path",
-						payment_id
-					);
-				}
+
+			if self.flow.enqueue_invoice_request(
+				invoice_request, payment_id, nonce, self.get_peers_for_blinded_path()
+			).is_err() {
+				log_warn!(self.logger, "Retry failed for invoice request with payment_id {}", payment_id);
 			}
 		}
 	}
@@ -12488,7 +12221,6 @@ where
 	fn handle_message(
 		&self, message: OffersMessage, context: Option<OffersContext>, responder: Option<Responder>,
 	) -> Option<(OffersMessage, ResponseInstruction)> {
-		let secp_ctx = &self.secp_ctx;
 		let expanded_key = &self.inbound_payment_key;
 
 		macro_rules! handle_pay_invoice_res {
@@ -12533,23 +12265,9 @@ where
 					None => return None,
 				};
 
-				let nonce = match context {
-					None if invoice_request.metadata().is_some() => None,
-					Some(OffersContext::InvoiceRequest { nonce }) => Some(nonce),
-					_ => return None,
-				};
-
-				let invoice_request = match nonce {
-					Some(nonce) => match invoice_request.verify_using_recipient_data(
-						nonce, expanded_key, secp_ctx,
-					) {
-						Ok(invoice_request) => invoice_request,
-						Err(()) => return None,
-					},
-					None => match invoice_request.verify_using_metadata(expanded_key, secp_ctx) {
-						Ok(invoice_request) => invoice_request,
-						Err(()) => return None,
-					},
+				let invoice_request = match self.flow.verify_invoice_request(invoice_request, context) {
+					Ok(invoice_request) => invoice_request,
+					Err(_) => return None,
 				};
 
 				let amount_msats = match InvoiceBuilder::<DerivedSigningPubkey>::amount_msats(
@@ -12570,72 +12288,19 @@ where
 					},
 				};
 
-				let payment_context = PaymentContext::Bolt12Offer(Bolt12OfferContext {
-					offer_id: invoice_request.offer_id,
-					invoice_request: invoice_request.fields(),
-				});
-				let payment_paths = match self.create_blinded_payment_paths(
-					Some(amount_msats), payment_secret, payment_context, relative_expiry
-				) {
-					Ok(payment_paths) => payment_paths,
-					Err(()) => {
-						let error = Bolt12SemanticError::MissingPaths;
-						return Some((OffersMessage::InvoiceError(error.into()), responder.respond()));
-					},
-				};
-
-				#[cfg(not(feature = "std"))]
-				let created_at = Duration::from_secs(
-					self.highest_seen_timestamp.load(Ordering::Acquire) as u64
+				let entropy = &*self.entropy_source;
+				let (response, context) = self.flow.create_response_for_invoice_request(
+					&self.node_signer, &self.router, entropy, invoice_request, amount_msats,
+					payment_hash, payment_secret, self.list_usable_channels()
 				);
 
-				let response = if invoice_request.keys.is_some() {
-					#[cfg(feature = "std")]
-					let builder = invoice_request.respond_using_derived_keys(
-						payment_paths, payment_hash
-					);
-					#[cfg(not(feature = "std"))]
-					let builder = invoice_request.respond_using_derived_keys_no_std(
-						payment_paths, payment_hash, created_at
-					);
-					builder
-						.map(InvoiceBuilder::<DerivedSigningPubkey>::from)
-						.and_then(|builder| builder.allow_mpp().build_and_sign(secp_ctx))
-						.map_err(InvoiceError::from)
-				} else {
-					#[cfg(feature = "std")]
-					let builder = invoice_request.respond_with(payment_paths, payment_hash);
-					#[cfg(not(feature = "std"))]
-					let builder = invoice_request.respond_with_no_std(
-						payment_paths, payment_hash, created_at
-					);
-					builder
-						.map(InvoiceBuilder::<ExplicitSigningPubkey>::from)
-						.and_then(|builder| builder.allow_mpp().build())
-						.map_err(InvoiceError::from)
-						.and_then(|invoice| {
-							#[cfg(c_bindings)]
-							let mut invoice = invoice;
-							invoice
-								.sign(|invoice: &UnsignedBolt12Invoice|
-									self.node_signer.sign_bolt12_invoice(invoice)
-								)
-								.map_err(InvoiceError::from)
-						})
-				};
-
-				match response {
-					Ok(invoice) => {
-						let nonce = Nonce::from_entropy_source(&*self.entropy_source);
-						let hmac = payment_hash.hmac_for_offer_payment(nonce, expanded_key);
-						let context = MessageContext::Offers(OffersContext::InboundPayment { payment_hash, nonce, hmac });
-						Some((OffersMessage::Invoice(invoice), responder.respond_with_reply_path(context)))
-					},
-					Err(error) => Some((OffersMessage::InvoiceError(error.into()), responder.respond())),
+				match context {
+					Some(context) => Some((response, responder.respond_with_reply_path(context))),
+					None => Some((response, responder.respond()))
 				}
 			},
 			OffersMessage::Invoice(invoice) => {
-				let payment_id = match self.verify_bolt12_invoice(&invoice, context.as_ref()) {
+				let payment_id = match self.flow.verify_bolt12_invoice(&invoice, context.as_ref()) {
 					Ok(payment_id) => payment_id,
 					Err(()) => return None,
 				};
@@ -12705,7 +12370,7 @@ where
 	}
 
 	fn release_pending_messages(&self) -> Vec<(OffersMessage, MessageSendInstructions)> {
-		core::mem::take(&mut self.pending_offers_messages.lock().unwrap())
+		self.flow.release_pending_offers_messages()
 	}
 }
 
@@ -12727,15 +12392,7 @@ where
 		_responder: Option<Responder>
 	) -> Option<(ReleaseHeldHtlc, ResponseInstruction)> {
 		#[cfg(async_payments)] {
-			match _context {
-				AsyncPaymentsContext::InboundPayment { nonce, hmac, path_absolute_expiry } => {
-					if let Err(()) = signer::verify_held_htlc_available_context(
-						nonce, hmac, &self.inbound_payment_key
-					) { return None }
-					if self.duration_since_epoch() > path_absolute_expiry { return None }
-				},
-				_ => return None
-			}
+			self.flow.verify_inbound_async_payment_context(_context).ok()?;
 			return _responder.map(|responder| (ReleaseHeldHtlc {}, responder.respond()))
 		}
 		#[cfg(not(async_payments))]
@@ -12744,23 +12401,18 @@ where
 
 	fn handle_release_held_htlc(&self, _message: ReleaseHeldHtlc, _context: AsyncPaymentsContext) {
 		#[cfg(async_payments)] {
-			let (payment_id, nonce, hmac) = match _context {
-				AsyncPaymentsContext::OutboundPayment { payment_id, hmac, nonce } => {
-					(payment_id, nonce, hmac)
-				},
-				_ => return
-			};
-			if payment_id.verify_for_async_payment(hmac, nonce, &self.inbound_payment_key).is_err() { return }
-			if let Err(e) = self.send_payment_for_static_invoice(payment_id) {
-				log_trace!(
-					self.logger, "Failed to release held HTLC with payment id {}: {:?}", payment_id, e
-				);
+			if let Ok(payment_id) = self.flow.verify_outbound_async_payment_context(_context) {
+				if let Err(e) = self.send_payment_for_static_invoice(payment_id) {
+					log_trace!(
+						self.logger, "Failed to release held HTLC with payment id {}: {:?}", payment_id, e
+					);
+				}
 			}
 		}
 	}
 
 	fn release_pending_messages(&self) -> Vec<(AsyncPaymentsMessage, MessageSendInstructions)> {
-		core::mem::take(&mut self.pending_async_payments_messages.lock().unwrap())
+		self.flow.release_pending_async_messages()
 	}
 }
 
@@ -12785,7 +12437,7 @@ where
 	}
 
 	fn handle_dnssec_proof(&self, message: DNSSECProof, context: DNSResolverContext) {
-		let offer_opt = self.hrn_resolver.handle_dnssec_proof_for_offer(message, context);
+		let offer_opt = self.flow.hrn_resolver.handle_dnssec_proof_for_offer(message, context);
 		#[cfg_attr(not(feature = "_test_utils"), allow(unused_mut))]
 		if let Some((completed_requests, mut offer)) = offer_opt {
 			for (name, payment_id) in completed_requests {
@@ -12824,7 +12476,7 @@ where
 	}
 
 	fn release_pending_messages(&self) -> Vec<(DNSResolverMessage, MessageSendInstructions)> {
-		core::mem::take(&mut self.pending_dns_onion_messages.lock().unwrap())
+		self.flow.release_pending_dns_messages()
 	}
 }
 
@@ -12846,27 +12498,6 @@ where
 	}
 }
 
-fn enqueue_onion_message_with_reply_paths<T: OnionMessageContents + Clone>(
-	message: T, message_paths: &[BlindedMessagePath], reply_paths: Vec<BlindedMessagePath>,
-	queue: &mut Vec<(T, MessageSendInstructions)>
-) {
-	reply_paths
-		.iter()
-		.flat_map(|reply_path|
-			message_paths
-				.iter()
-				.map(move |path| (path, reply_path))
-		)
-		.take(OFFERS_MESSAGE_REQUEST_LIMIT)
-		.for_each(|(path, reply_path)| {
-			let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
-				destination: Destination::BlindedPath(path.clone()),
-				reply_path: reply_path.clone(),
-			};
-			queue.push((message.clone(), instructions));
-		});
-}
-
 /// Fetches the set of [`NodeFeatures`] flags that are provided by or required by
 /// [`ChannelManager`].
 pub(crate) fn provided_node_features(config: &UserConfig) -> NodeFeatures {
@@ -13695,6 +13326,8 @@ where
 	pub router: R,
 	/// The [`MessageRouter`] used for constructing [`BlindedMessagePath`]s for [`Offer`]s,
 	/// [`Refund`]s, and any reply paths.
+	///
+	/// [`BlindedMessagePath`]: crate::blinded_path::message::BlindedMessagePath
 	pub message_router: MR,
 	/// The Logger for use in the ChannelManager and which may be used to log information during
 	/// deserialization.
@@ -14752,15 +14385,22 @@ where
 			}
 		}
 
+		let best_block = BestBlock::new(best_block_hash, best_block_height);
+		let flow = OffersMessageFlow::new(
+			chain_hash, best_block, our_network_pubkey,
+			highest_seen_timestamp, expanded_inbound_key,
+			secp_ctx.clone(), args.message_router
+		);
+
 		let channel_manager = ChannelManager {
 			chain_hash,
 			fee_estimator: bounded_fee_estimator,
 			chain_monitor: args.chain_monitor,
 			tx_broadcaster: args.tx_broadcaster,
 			router: args.router,
-			message_router: args.message_router,
+			flow,
 
-			best_block: RwLock::new(BestBlock::new(best_block_hash, best_block_height)),
+			best_block: RwLock::new(best_block),
 
 			inbound_payment_key: expanded_inbound_key,
 			pending_outbound_payments: pending_outbounds,
@@ -14797,8 +14437,6 @@ where
 
 			funding_batch_states: Mutex::new(BTreeMap::new()),
 
-			pending_offers_messages: Mutex::new(Vec::new()),
-			pending_async_payments_messages: Mutex::new(Vec::new()),
 
 			pending_broadcast_messages: Mutex::new(Vec::new()),
 
@@ -14811,11 +14449,6 @@ where
 			logger: args.logger,
 			default_configuration: args.default_config,
 
-			#[cfg(feature = "dnssec")]
-			hrn_resolver: OMNameResolver::new(highest_seen_timestamp, best_block_height),
-			#[cfg(feature = "dnssec")]
-			pending_dns_onion_messages: Mutex::new(Vec::new()),
-
 			#[cfg(feature = "_test_utils")]
 			testing_dnssec_proof_offer_resolution_override: Mutex::new(new_hash_map()),
 		};
diff --git a/lightning/src/ln/inbound_payment.rs b/lightning/src/ln/inbound_payment.rs
index 53b212428ca..51146c1b6f1 100644
--- a/lightning/src/ln/inbound_payment.rs
+++ b/lightning/src/ln/inbound_payment.rs
@@ -236,7 +236,7 @@ pub(super) fn create_for_spontaneous_payment(
 	Ok(construct_payment_secret(&iv_bytes, &metadata_bytes, &keys.metadata_key))
 }
 
-pub(super) fn calculate_absolute_expiry(
+pub(crate) fn calculate_absolute_expiry(
 	highest_seen_timestamp: u64, invoice_expiry_delta_secs: u32,
 ) -> u64 {
 	// We assume that highest_seen_timestamp is pretty close to the current time - it's updated when
diff --git a/lightning/src/ln/offers_tests.rs b/lightning/src/ln/offers_tests.rs
index f8649111a0c..b0b8a8c7d35 100644
--- a/lightning/src/ln/offers_tests.rs
+++ b/lightning/src/ln/offers_tests.rs
@@ -1407,7 +1407,7 @@ fn fails_authentication_when_handling_invoice_request() {
 	expect_recent_payment!(david, RecentPaymentDetails::AwaitingInvoice, payment_id);
 
 	connect_peers(david, alice);
-	match &mut david.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
+	match &mut david.node.flow.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
 		MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
 			*destination = Destination::Node(alice_id),
 		_ => panic!(),
@@ -1432,7 +1432,7 @@ fn fails_authentication_when_handling_invoice_request() {
 		.unwrap();
 	expect_recent_payment!(david, RecentPaymentDetails::AwaitingInvoice, payment_id);
 
-	match &mut david.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
+	match &mut david.node.flow.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
 		MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
 			*destination = Destination::BlindedPath(invalid_path),
 		_ => panic!(),
@@ -1512,7 +1512,7 @@ fn fails_authentication_when_handling_invoice_for_offer() {
 
 	// Don't send the invoice request, but grab its reply path to use with a different request.
 	let invalid_reply_path = {
-		let mut pending_offers_messages = david.node.pending_offers_messages.lock().unwrap();
+		let mut pending_offers_messages = david.node.flow.pending_offers_messages.lock().unwrap();
 		let pending_invoice_request = pending_offers_messages.pop().unwrap();
 		pending_offers_messages.clear();
 		match pending_invoice_request.1 {
@@ -1529,7 +1529,7 @@ fn fails_authentication_when_handling_invoice_for_offer() {
 	// Swap out the reply path to force authentication to fail when handling the invoice since it
 	// will be sent over the wrong blinded path.
 	{
-		let mut pending_offers_messages = david.node.pending_offers_messages.lock().unwrap();
+		let mut pending_offers_messages = david.node.flow.pending_offers_messages.lock().unwrap();
 		let mut pending_invoice_request = pending_offers_messages.first_mut().unwrap();
 		match &mut pending_invoice_request.1 {
 			MessageSendInstructions::WithSpecifiedReplyPath { reply_path, .. } =>
@@ -1616,7 +1616,7 @@ fn fails_authentication_when_handling_invoice_for_refund() {
 	let expected_invoice = alice.node.request_refund_payment(&refund).unwrap();
 
 	connect_peers(david, alice);
-	match &mut alice.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
+	match &mut alice.node.flow.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
 		MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
 			*destination = Destination::Node(david_id),
 		_ => panic!(),
@@ -1647,7 +1647,7 @@ fn fails_authentication_when_handling_invoice_for_refund() {
 
 	let expected_invoice = alice.node.request_refund_payment(&refund).unwrap();
 
-	match &mut alice.node.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
+	match &mut alice.node.flow.pending_offers_messages.lock().unwrap().first_mut().unwrap().1 {
 		MessageSendInstructions::WithSpecifiedReplyPath { destination, .. } =>
 			*destination = Destination::BlindedPath(invalid_path),
 		_ => panic!(),
@@ -2238,7 +2238,7 @@ fn fails_paying_invoice_with_unknown_required_features() {
 		destination: Destination::BlindedPath(reply_path),
 	};
 	let message = OffersMessage::Invoice(invoice);
-	alice.node.pending_offers_messages.lock().unwrap().push((message, instructions));
+	alice.node.flow.pending_offers_messages.lock().unwrap().push((message, instructions));
 
 	let onion_message = alice.onion_messenger.next_onion_message_for_peer(charlie_id).unwrap();
 	charlie.onion_messenger.handle_onion_message(alice_id, &onion_message);
diff --git a/lightning/src/offers/flow.rs b/lightning/src/offers/flow.rs
new file mode 100644
index 00000000000..38f674141b1
--- /dev/null
+++ b/lightning/src/offers/flow.rs
@@ -0,0 +1,1085 @@
+// This file is Copyright its original authors, visible in version control
+// history.
+//
+// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
+// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
+// You may not use this file except in accordance with one or both of these
+// licenses.
+
+//! Provides data structures and functions for creating and managing Offers messages,
+//! facilitating communication, and handling BOLT12 messages and payments.
+
+use core::ops::Deref;
+use core::sync::atomic::{AtomicUsize, Ordering};
+use core::time::Duration;
+
+use bitcoin::block::Header;
+use bitcoin::constants::ChainHash;
+use bitcoin::secp256k1::{self, PublicKey, Secp256k1};
+
+use crate::blinded_path::message::{
+	BlindedMessagePath, MessageContext, MessageForwardNode, OffersContext,
+};
+use crate::blinded_path::payment::{
+	BlindedPaymentPath, Bolt12OfferContext, Bolt12RefundContext, PaymentConstraints,
+	PaymentContext, UnauthenticatedReceiveTlvs,
+};
+use crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
+
+#[allow(unused_imports)]
+use crate::prelude::*;
+
+use crate::chain::BestBlock;
+use crate::ln::channel_state::ChannelDetails;
+use crate::ln::channelmanager::{
+	Verification, {PaymentId, CLTV_FAR_FAR_AWAY, MAX_SHORT_LIVED_RELATIVE_EXPIRY},
+};
+use crate::ln::inbound_payment;
+use crate::offers::invoice::{
+	Bolt12Invoice, DerivedSigningPubkey, ExplicitSigningPubkey, InvoiceBuilder,
+	UnsignedBolt12Invoice, DEFAULT_RELATIVE_EXPIRY,
+};
+use crate::offers::invoice_error::InvoiceError;
+use crate::offers::invoice_request::{
+	InvoiceRequest, InvoiceRequestBuilder, VerifiedInvoiceRequest,
+};
+use crate::offers::nonce::Nonce;
+use crate::offers::offer::{DerivedMetadata, Offer, OfferBuilder};
+use crate::offers::parse::Bolt12SemanticError;
+use crate::offers::refund::{Refund, RefundBuilder};
+use crate::onion_message::async_payments::AsyncPaymentsMessage;
+use crate::onion_message::messenger::{Destination, MessageRouter, MessageSendInstructions};
+use crate::onion_message::offers::OffersMessage;
+use crate::onion_message::packet::OnionMessageContents;
+use crate::routing::router::Router;
+use crate::sign::{EntropySource, NodeSigner};
+use crate::sync::{Mutex, RwLock};
+use crate::types::payment::{PaymentHash, PaymentSecret};
+
+#[cfg(async_payments)]
+use {
+	crate::blinded_path::message::AsyncPaymentsContext,
+	crate::blinded_path::payment::AsyncBolt12OfferContext,
+	crate::offers::offer::Amount,
+	crate::offers::signer,
+	crate::offers::static_invoice::{StaticInvoice, StaticInvoiceBuilder},
+	crate::onion_message::async_payments::HeldHtlcAvailable,
+};
+
+#[cfg(feature = "dnssec")]
+use {
+	crate::blinded_path::message::DNSResolverContext,
+	crate::onion_message::dns_resolution::{DNSResolverMessage, DNSSECQuery, OMNameResolver},
+};
+
+/// A BOLT12 offers code and flow utility provider, which facilitates
+/// BOLT12 builder generation and onion message handling.
+///
+/// [`OffersMessageFlow`] is parameterized by a [`MessageRouter`], which is responsible
+/// for finding message paths when initiating and retrying onion messages.
+pub struct OffersMessageFlow<MR: Deref>
+where
+	MR::Target: MessageRouter,
+{
+	chain_hash: ChainHash,
+	best_block: RwLock<BestBlock>,
+
+	our_network_pubkey: PublicKey,
+	highest_seen_timestamp: AtomicUsize,
+	inbound_payment_key: inbound_payment::ExpandedKey,
+
+	secp_ctx: Secp256k1<secp256k1::All>,
+	message_router: MR,
+
+	#[cfg(not(any(test, feature = "_test_utils")))]
+	pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,
+	#[cfg(any(test, feature = "_test_utils"))]
+	pub(crate) pending_offers_messages: Mutex<Vec<(OffersMessage, MessageSendInstructions)>>,
+
+	pending_async_payments_messages: Mutex<Vec<(AsyncPaymentsMessage, MessageSendInstructions)>>,
+
+	#[cfg(feature = "dnssec")]
+	pub(crate) hrn_resolver: OMNameResolver,
+	#[cfg(feature = "dnssec")]
+	pending_dns_onion_messages: Mutex<Vec<(DNSResolverMessage, MessageSendInstructions)>>,
+}
+
+impl<MR: Deref> OffersMessageFlow<MR>
+where
+	MR::Target: MessageRouter,
+{
+	/// Creates a new [`OffersMessageFlow`]
+	pub fn new(
+		chain_hash: ChainHash, best_block: BestBlock, our_network_pubkey: PublicKey,
+		current_timestamp: u32, inbound_payment_key: inbound_payment::ExpandedKey,
+		secp_ctx: Secp256k1<secp256k1::All>, message_router: MR,
+	) -> Self {
+		Self {
+			chain_hash,
+			best_block: RwLock::new(best_block),
+
+			our_network_pubkey,
+			highest_seen_timestamp: AtomicUsize::new(current_timestamp as usize),
+			inbound_payment_key,
+
+			secp_ctx,
+			message_router,
+
+			pending_offers_messages: Mutex::new(Vec::new()),
+			pending_async_payments_messages: Mutex::new(Vec::new()),
+
+			#[cfg(feature = "dnssec")]
+			hrn_resolver: OMNameResolver::new(current_timestamp, best_block.height),
+			#[cfg(feature = "dnssec")]
+			pending_dns_onion_messages: Mutex::new(Vec::new()),
+		}
+	}
+
+	/// Gets the node_id held by this [`OffersMessageFlow`]`
+	fn get_our_node_id(&self) -> PublicKey {
+		self.our_network_pubkey
+	}
+
+	fn duration_since_epoch(&self) -> Duration {
+		#[cfg(not(feature = "std"))]
+		let now = Duration::from_secs(self.highest_seen_timestamp.load(Ordering::Acquire) as u64);
+		#[cfg(feature = "std")]
+		let now = std::time::SystemTime::now()
+			.duration_since(std::time::SystemTime::UNIX_EPOCH)
+			.expect("SystemTime::now() should come after SystemTime::UNIX_EPOCH");
+		now
+	}
+
+	/// Notifies the [`OffersMessageFlow`] that a new block has been observed.
+	///
+	/// This allows the flow to keep in sync with the latest block timestamp,
+	/// which may be used for time-sensitive operations.
+	///
+	/// Must be called whenever a new chain tip becomes available. May be skipped
+	/// for intermediary blocks.
+	pub fn best_block_updated(&self, header: &Header, _height: u32) {
+		let timestamp = &self.highest_seen_timestamp;
+		let block_time = header.time as usize;
+
+		loop {
+			// Update timestamp to be the max of its current value and the block
+			// timestamp. This should keep us close to the current time without relying on
+			// having an explicit local time source.
+			// Just in case we end up in a race, we loop until we either successfully
+			// update timestamp or decide we don't need to.
+			let old_serial = timestamp.load(Ordering::Acquire);
+			if old_serial >= block_time {
+				break;
+			}
+			if timestamp
+				.compare_exchange(old_serial, block_time, Ordering::AcqRel, Ordering::Relaxed)
+				.is_ok()
+			{
+				break;
+			}
+		}
+
+		#[cfg(feature = "dnssec")]
+		{
+			let updated_time = timestamp.load(Ordering::Acquire) as u32;
+			self.hrn_resolver.new_best_block(_height, updated_time);
+		}
+	}
+}
+
+/// Defines the maximum number of [`OffersMessage`] including different reply paths to be sent
+/// along different paths.
+/// Sending multiple requests increases the chances of successful delivery in case some
+/// paths are unavailable. However, only one invoice for a given [`PaymentId`] will be paid,
+/// even if multiple invoices are received.
+const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;
+
+impl<MR: Deref> OffersMessageFlow<MR>
+where
+	MR::Target: MessageRouter,
+{
+	/// Creates a collection of blinded paths by delegating to [`MessageRouter`] based on
+	/// the path's intended lifetime.
+	///
+	/// Whether or not the path is compact depends on whether the path is short-lived or long-lived,
+	/// respectively, based on the given `absolute_expiry` as seconds since the Unix epoch. See
+	/// [`MAX_SHORT_LIVED_RELATIVE_EXPIRY`].
+	fn create_blinded_paths_using_absolute_expiry(
+		&self, context: OffersContext, absolute_expiry: Option<Duration>,
+		peers: Vec<MessageForwardNode>,
+	) -> Result<Vec<BlindedMessagePath>, ()> {
+		let now = self.duration_since_epoch();
+		let max_short_lived_absolute_expiry = now.saturating_add(MAX_SHORT_LIVED_RELATIVE_EXPIRY);
+
+		if absolute_expiry.unwrap_or(Duration::MAX) <= max_short_lived_absolute_expiry {
+			self.create_compact_blinded_paths(peers, context)
+		} else {
+			self.create_blinded_paths(peers, MessageContext::Offers(context))
+		}
+	}
+
+	/// Creates a collection of blinded paths by delegating to
+	/// [`MessageRouter::create_blinded_paths`].
+	///
+	/// Errors if the `MessageRouter` errors.
+	fn create_blinded_paths(
+		&self, peers: Vec<MessageForwardNode>, context: MessageContext,
+	) -> Result<Vec<BlindedMessagePath>, ()> {
+		let recipient = self.get_our_node_id();
+		let secp_ctx = &self.secp_ctx;
+
+		let peers = peers.into_iter().map(|node| node.node_id).collect();
+		self.message_router
+			.create_blinded_paths(recipient, context, peers, secp_ctx)
+			.and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
+	}
+
+	/// Creates a collection of blinded paths by delegating to
+	/// [`MessageRouter::create_compact_blinded_paths`].
+	///
+	/// Errors if the `MessageRouter` errors.
+	fn create_compact_blinded_paths(
+		&self, peers: Vec<MessageForwardNode>, context: OffersContext,
+	) -> Result<Vec<BlindedMessagePath>, ()> {
+		let recipient = self.get_our_node_id();
+		let secp_ctx = &self.secp_ctx;
+
+		self.message_router
+			.create_compact_blinded_paths(
+				recipient,
+				MessageContext::Offers(context),
+				peers,
+				secp_ctx,
+			)
+			.and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
+	}
+
+	/// Creates multi-hop blinded payment paths for the given `amount_msats` by delegating to
+	/// [`Router::create_blinded_payment_paths`].
+	fn create_blinded_payment_paths<ES: Deref, R: Deref>(
+		&self, router: &R, entropy_source: ES, usable_channels: Vec<ChannelDetails>,
+		amount_msats: Option<u64>, payment_secret: PaymentSecret, payment_context: PaymentContext,
+		relative_expiry_seconds: u32,
+	) -> Result<Vec<BlindedPaymentPath>, ()>
+	where
+		ES::Target: EntropySource,
+		R::Target: Router,
+	{
+		let expanded_key = &self.inbound_payment_key;
+		let entropy = &*entropy_source;
+		let secp_ctx = &self.secp_ctx;
+
+		let payee_node_id = self.get_our_node_id();
+
+		// Assume shorter than usual block times to avoid spuriously failing payments too early.
+		const SECONDS_PER_BLOCK: u32 = 9 * 60;
+		let relative_expiry_blocks = relative_expiry_seconds / SECONDS_PER_BLOCK;
+		let max_cltv_expiry = core::cmp::max(relative_expiry_blocks, CLTV_FAR_FAR_AWAY)
+			.saturating_add(LATENCY_GRACE_PERIOD_BLOCKS)
+			.saturating_add(self.best_block.read().unwrap().height);
+
+		let payee_tlvs = UnauthenticatedReceiveTlvs {
+			payment_secret,
+			payment_constraints: PaymentConstraints { max_cltv_expiry, htlc_minimum_msat: 1 },
+			payment_context,
+		};
+		let nonce = Nonce::from_entropy_source(entropy);
+		let payee_tlvs = payee_tlvs.authenticate(nonce, expanded_key);
+
+		router.create_blinded_payment_paths(
+			payee_node_id,
+			usable_channels,
+			payee_tlvs,
+			amount_msats,
+			secp_ctx,
+		)
+	}
+
+	#[cfg(all(test, async_payments))]
+	/// Creates multi-hop blinded payment paths for the given `amount_msats` by delegating to
+	/// [`Router::create_blinded_payment_paths`].
+	pub(crate) fn test_create_blinded_payment_paths<ES: Deref, R: Deref>(
+		&self, router: &R, entropy_source: ES, usable_channels: Vec<ChannelDetails>,
+		amount_msats: Option<u64>, payment_secret: PaymentSecret, payment_context: PaymentContext,
+		relative_expiry_seconds: u32,
+	) -> Result<Vec<BlindedPaymentPath>, ()>
+	where
+		ES::Target: EntropySource,
+		R::Target: Router,
+	{
+		self.create_blinded_payment_paths(
+			router,
+			entropy_source,
+			usable_channels,
+			amount_msats,
+			payment_secret,
+			payment_context,
+			relative_expiry_seconds,
+		)
+	}
+}
+
+fn enqueue_onion_message_with_reply_paths<T: OnionMessageContents + Clone>(
+	message: T, message_paths: &[BlindedMessagePath], reply_paths: Vec<BlindedMessagePath>,
+	queue: &mut Vec<(T, MessageSendInstructions)>,
+) {
+	reply_paths
+		.iter()
+		.flat_map(|reply_path| message_paths.iter().map(move |path| (path, reply_path)))
+		.take(OFFERS_MESSAGE_REQUEST_LIMIT)
+		.for_each(|(path, reply_path)| {
+			let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
+				destination: Destination::BlindedPath(path.clone()),
+				reply_path: reply_path.clone(),
+			};
+			queue.push((message.clone(), instructions));
+		});
+}
+
+impl<MR: Deref> OffersMessageFlow<MR>
+where
+	MR::Target: MessageRouter,
+{
+	/// Verifies an [`InvoiceRequest`] using the provided [`OffersContext`] or the [`InvoiceRequest::metadata`].
+	///
+	/// - If an [`OffersContext::InvoiceRequest`] with a `nonce` is provided, verification is performed using recipient context data.
+	/// - If no context is provided but the [`InvoiceRequest`] contains [`Offer`] metadata, verification is performed using that metadata.
+	/// - If neither is available, verification fails.
+	///
+	/// # Errors
+	///
+	/// Returns an error if:
+	/// - Both [`OffersContext`] and [`InvoiceRequest`] metadata are absent or invalid.
+	/// - The verification process (via recipient context data or metadata) fails.
+	pub fn verify_invoice_request(
+		&self, invoice_request: InvoiceRequest, context: Option<OffersContext>,
+	) -> Result<VerifiedInvoiceRequest, ()> {
+		let secp_ctx = &self.secp_ctx;
+		let expanded_key = &self.inbound_payment_key;
+
+		let nonce = match context {
+			None if invoice_request.metadata().is_some() => None,
+			Some(OffersContext::InvoiceRequest { nonce }) => Some(nonce),
+			_ => return Err(()),
+		};
+
+		let invoice_request = match nonce {
+			Some(nonce) => {
+				invoice_request.verify_using_recipient_data(nonce, expanded_key, secp_ctx)
+			},
+			None => invoice_request.verify_using_metadata(expanded_key, secp_ctx),
+		}?;
+
+		Ok(invoice_request)
+	}
+
+	/// Verifies a [`Bolt12Invoice`] using the provided [`OffersContext`] or the invoice's payer metadata,
+	/// returning the corresponding [`PaymentId`] if successful.
+	///
+	/// - If an [`OffersContext::OutboundPayment`] with a `nonce` is provided, verification is performed
+	///   using this to form the payer metadata.
+	/// - If no context is provided and the invoice corresponds to a [`Refund`] without blinded paths,
+	///   verification is performed using the [`Bolt12Invoice::payer_metadata`].
+	/// - If neither condition is met, verification fails.
+	pub fn verify_bolt12_invoice(
+		&self, invoice: &Bolt12Invoice, context: Option<&OffersContext>,
+	) -> Result<PaymentId, ()> {
+		let secp_ctx = &self.secp_ctx;
+		let expanded_key = &self.inbound_payment_key;
+
+		match context {
+			None if invoice.is_for_refund_without_paths() => {
+				invoice.verify_using_metadata(expanded_key, secp_ctx)
+			},
+			Some(&OffersContext::OutboundPayment { payment_id, nonce, .. }) => {
+				invoice.verify_using_payer_data(payment_id, nonce, expanded_key, secp_ctx)
+			},
+			_ => Err(()),
+		}
+	}
+
+	/// Verifies the provided [`AsyncPaymentsContext`] for an inbound [`HeldHtlcAvailable`] message.
+	///
+	/// The context is verified using the `nonce` and `hmac` values, and ensures that the context
+	/// has not expired based on `path_absolute_expiry`.
+	///
+	/// # Errors
+	///
+	/// Returns `Err(())` if:
+	/// - The HMAC verification fails for inbound context.
+	/// - The inbound payment context has expired.
+	#[cfg(async_payments)]
+	pub fn verify_inbound_async_payment_context(
+		&self, context: AsyncPaymentsContext,
+	) -> Result<(), ()> {
+		match context {
+			AsyncPaymentsContext::InboundPayment { nonce, hmac, path_absolute_expiry } => {
+				signer::verify_held_htlc_available_context(nonce, hmac, &self.inbound_payment_key)?;
+
+				if self.duration_since_epoch() > path_absolute_expiry {
+					return Err(());
+				}
+				Ok(())
+			},
+			_ => Err(()),
+		}
+	}
+
+	/// Verifies the provided [`AsyncPaymentsContext`] for an inbound [`ReleaseHeldHtlc`] message.
+	///
+	/// The context is verified using the `nonce` and `hmac` values, and if valid,
+	/// returns the associated [`PaymentId`].
+	///
+	/// # Errors
+	///
+	/// Returns `Err(())` if:
+	/// - The HMAC verification fails for outbound context.
+	///
+	/// [`ReleaseHeldHtlc`]: crate::onion_message::async_payments::ReleaseHeldHtlc
+	#[cfg(async_payments)]
+	pub fn verify_outbound_async_payment_context(
+		&self, context: AsyncPaymentsContext,
+	) -> Result<PaymentId, ()> {
+		match context {
+			AsyncPaymentsContext::OutboundPayment { payment_id, hmac, nonce } => {
+				payment_id.verify_for_async_payment(hmac, nonce, &self.inbound_payment_key)?;
+				Ok(payment_id)
+			},
+			_ => Err(()),
+		}
+	}
+
+	/// Creates an [`OfferBuilder`] such that the [`Offer`] it builds is recognized by the
+	/// [`OffersMessageFlow`], and any corresponding [`InvoiceRequest`] can be verified using
+	/// [`Self::verify_invoice_request`]. The offer will expire at `absolute_expiry` if `Some`,
+	/// or will not expire if `None`.
+	///
+	/// # Privacy
+	///
+	/// Uses [`MessageRouter`] to construct a [`BlindedMessagePath`] for the offer based on the given
+	/// `absolute_expiry` according to [`MAX_SHORT_LIVED_RELATIVE_EXPIRY`]. See those docs for
+	/// privacy implications, as well as those of the parameterized [`Router`], which implements
+	/// [`MessageRouter`].
+	///
+	/// Also uses a derived signing pubkey in the offer for recipient privacy.
+	///
+	/// # Limitations
+	///
+	/// If [`DefaultMessageRouter`] is used to parameterize the [`OffersMessageFlow`], a direct
+	/// connection to the introduction node in the responding [`InvoiceRequest`]'s reply path is required.
+	/// See the [`DefaultMessageRouter`] documentation for more details.
+	///
+	/// # Errors
+	///
+	/// Returns an error if the parameterized [`Router`] is unable to create a blinded path for the offer.
+	///
+	/// [`DefaultMessageRouter`]: crate::onion_message::messenger::DefaultMessageRouter
+	pub fn create_offer_builder<ES: Deref>(
+		&self, entropy_source: ES, absolute_expiry: Option<Duration>,
+		peers: Vec<MessageForwardNode>,
+	) -> Result<OfferBuilder<DerivedMetadata, secp256k1::All>, Bolt12SemanticError>
+	where
+		ES::Target: EntropySource,
+	{
+		let node_id = self.get_our_node_id();
+		let expanded_key = &self.inbound_payment_key;
+		let entropy = &*entropy_source;
+		let secp_ctx = &self.secp_ctx;
+
+		let nonce = Nonce::from_entropy_source(entropy);
+		let context = OffersContext::InvoiceRequest { nonce };
+
+		let path = self
+			.create_blinded_paths_using_absolute_expiry(context, absolute_expiry, peers)
+			.and_then(|paths| paths.into_iter().next().ok_or(()))
+			.map_err(|_| Bolt12SemanticError::MissingPaths)?;
+
+		let builder = OfferBuilder::deriving_signing_pubkey(node_id, expanded_key, nonce, secp_ctx)
+			.chain_hash(self.chain_hash)
+			.path(path);
+
+		let builder = match absolute_expiry {
+			None => builder,
+			Some(absolute_expiry) => builder.absolute_expiry(absolute_expiry),
+		};
+
+		Ok(builder)
+	}
+
+	/// Create an offer for receiving async payments as an often-offline recipient.
+	///
+	/// Because we may be offline when the payer attempts to request an invoice, you MUST:
+	/// 1. Provide at least 1 [`BlindedMessagePath`] terminating at an always-online node that will
+	///    serve the [`StaticInvoice`] created from this offer on our behalf.
+	/// 2. Use [`Self::create_static_invoice_builder`] to create a [`StaticInvoice`] from this
+	///    [`Offer`] plus the returned [`Nonce`], and provide the static invoice to the
+	///    aforementioned always-online node.
+	#[cfg(async_payments)]
+	pub fn create_async_receive_offer_builder<ES: Deref>(
+		&self, entropy_source: ES, message_paths_to_always_online_node: Vec<BlindedMessagePath>,
+	) -> Result<(OfferBuilder<DerivedMetadata, secp256k1::All>, Nonce), Bolt12SemanticError>
+	where
+		ES::Target: EntropySource,
+	{
+		if message_paths_to_always_online_node.is_empty() {
+			return Err(Bolt12SemanticError::MissingPaths);
+		}
+
+		let node_id = self.get_our_node_id();
+		let expanded_key = &self.inbound_payment_key;
+		let entropy = &*entropy_source;
+		let secp_ctx = &self.secp_ctx;
+
+		let nonce = Nonce::from_entropy_source(entropy);
+		let mut builder =
+			OfferBuilder::deriving_signing_pubkey(node_id, expanded_key, nonce, secp_ctx)
+				.chain_hash(self.chain_hash);
+
+		for path in message_paths_to_always_online_node {
+			builder = builder.path(path);
+		}
+
+		Ok((builder.into(), nonce))
+	}
+
+	/// Creates a [`RefundBuilder`] such that the [`Refund`] it builds is recognized by the
+	/// [`OffersMessageFlow`], and any corresponding [`Bolt12Invoice`] received for the refund
+	/// can be verified using [`Self::verify_bolt12_invoice`].
+	///
+	/// The builder will have the provided expiration set. Any changes to the expiration on the
+	/// returned builder will not be honored by [`OffersMessageFlow`]. For non-`std`, the highest seen
+	/// block time minus two hours is used for the current time when determining if the refund has
+	/// expired.
+	///
+	/// To refund can be revoked by the user prior to receiving the invoice.
+	/// If abandoned, or if an invoice is not received before expiration, the payment will fail
+	/// with an [`Event::PaymentFailed`].
+	///
+	/// If `max_total_routing_fee_msat` is not specified, the default from
+	/// [`RouteParameters::from_payment_params_and_value`] is applied.
+	///
+	/// # Privacy
+	///
+	/// Uses [`MessageRouter`] to construct a [`BlindedMessagePath`] for the refund based on the given
+	/// `absolute_expiry` according to [`MAX_SHORT_LIVED_RELATIVE_EXPIRY`]. See those docs for
+	/// privacy implications.
+	///
+	/// Also uses a derived payer id in the refund for payer privacy.
+	///
+	/// # Errors
+	///
+	/// Returns an error if:
+	/// - A duplicate `payment_id` is provided, given the caveats in the aforementioned link.
+	/// - `amount_msats` is invalid, or
+	/// - The parameterized [`Router`] is unable to create a blinded path for the refund.
+	///
+	/// [`Event::PaymentFailed`]: crate::events::Event::PaymentFailed
+	/// [`RouteParameters::from_payment_params_and_value`]: crate::routing::router::RouteParameters::from_payment_params_and_value
+	pub fn create_refund_builder<ES: Deref>(
+		&self, entropy_source: ES, amount_msats: u64, absolute_expiry: Duration,
+		payment_id: PaymentId, peers: Vec<MessageForwardNode>,
+	) -> Result<RefundBuilder<secp256k1::All>, Bolt12SemanticError>
+	where
+		ES::Target: EntropySource,
+	{
+		let node_id = self.get_our_node_id();
+		let expanded_key = &self.inbound_payment_key;
+		let entropy = &*entropy_source;
+		let secp_ctx = &self.secp_ctx;
+
+		let nonce = Nonce::from_entropy_source(entropy);
+		let context = OffersContext::OutboundPayment { payment_id, nonce, hmac: None };
+
+		let path = self
+			.create_blinded_paths_using_absolute_expiry(context, Some(absolute_expiry), peers)
+			.and_then(|paths| paths.into_iter().next().ok_or(()))
+			.map_err(|_| Bolt12SemanticError::MissingPaths)?;
+
+		let builder = RefundBuilder::deriving_signing_pubkey(
+			node_id,
+			expanded_key,
+			nonce,
+			secp_ctx,
+			amount_msats,
+			payment_id,
+		)?
+		.chain_hash(self.chain_hash)
+		.absolute_expiry(absolute_expiry)
+		.path(path);
+
+		Ok(builder)
+	}
+
+	/// Creates an [`InvoiceRequestBuilder`] such that the [`InvoiceRequest`] it builds is recognized
+	/// by the [`OffersMessageFlow`], and any corresponding [`Bolt12Invoice`] received in response
+	/// can be verified using [`Self::verify_bolt12_invoice`].
+	///
+	/// # Nonce
+	/// The nonce is used to create a unique [`InvoiceRequest::payer_metadata`] for the invoice request.
+	/// These will be used to verify the corresponding [`Bolt12Invoice`] when it is received.
+	pub fn create_invoice_request_builder<'a>(
+		&'a self, offer: &'a Offer, nonce: Nonce, payment_id: PaymentId,
+	) -> Result<InvoiceRequestBuilder<'a, 'a, secp256k1::All>, Bolt12SemanticError> {
+		let expanded_key = &self.inbound_payment_key;
+		let secp_ctx = &self.secp_ctx;
+
+		let builder: InvoiceRequestBuilder<secp256k1::All> =
+			offer.request_invoice(expanded_key, nonce, secp_ctx, payment_id)?.into();
+		let builder = builder.chain_hash(self.chain_hash)?;
+
+		Ok(builder)
+	}
+
+	/// Creates a [`StaticInvoiceBuilder`] from the corresponding [`Offer`] and [`Nonce`] that were
+	/// created via [`Self::create_async_receive_offer_builder`].
+	#[cfg(async_payments)]
+	pub fn create_static_invoice_builder<'a, ES: Deref, R: Deref>(
+		&self, router: &R, entropy_source: ES, offer: &'a Offer, offer_nonce: Nonce,
+		payment_secret: PaymentSecret, relative_expiry_secs: u32,
+		usable_channels: Vec<ChannelDetails>, peers: Vec<MessageForwardNode>,
+	) -> Result<StaticInvoiceBuilder<'a>, Bolt12SemanticError>
+	where
+		ES::Target: EntropySource,
+		R::Target: Router,
+	{
+		let expanded_key = &self.inbound_payment_key;
+		let entropy = &*entropy_source;
+		let secp_ctx = &self.secp_ctx;
+
+		let payment_context =
+			PaymentContext::AsyncBolt12Offer(AsyncBolt12OfferContext { offer_nonce });
+
+		let amount_msat = offer.amount().and_then(|amount| match amount {
+			Amount::Bitcoin { amount_msats } => Some(amount_msats),
+			Amount::Currency { .. } => None,
+		});
+
+		let created_at = self.duration_since_epoch();
+
+		let payment_paths = self
+			.create_blinded_payment_paths(
+				router,
+				entropy,
+				usable_channels,
+				amount_msat,
+				payment_secret,
+				payment_context,
+				relative_expiry_secs,
+			)
+			.map_err(|()| Bolt12SemanticError::MissingPaths)?;
+
+		let nonce = Nonce::from_entropy_source(entropy);
+		let hmac = signer::hmac_for_held_htlc_available_context(nonce, expanded_key);
+		let path_absolute_expiry = Duration::from_secs(inbound_payment::calculate_absolute_expiry(
+			created_at.as_secs(),
+			relative_expiry_secs,
+		));
+
+		let context = MessageContext::AsyncPayments(AsyncPaymentsContext::InboundPayment {
+			nonce,
+			hmac,
+			path_absolute_expiry,
+		});
+
+		let async_receive_message_paths = self
+			.create_blinded_paths(peers, context)
+			.map_err(|()| Bolt12SemanticError::MissingPaths)?;
+
+		StaticInvoiceBuilder::for_offer_using_derived_keys(
+			offer,
+			payment_paths,
+			async_receive_message_paths,
+			created_at,
+			expanded_key,
+			offer_nonce,
+			secp_ctx,
+		)
+		.map(|inv| inv.allow_mpp().relative_expiry(relative_expiry_secs))
+	}
+
+	/// Creates an [`InvoiceBuilder`] using the provided [`Refund`].
+	///
+	/// This method is used when a node wishes to construct an [`InvoiceBuilder`]
+	/// in response to a [`Refund`] request as part of a BOLT 12 flow.
+	///
+	/// Returns an `InvoiceBuilder` configured with:
+	/// - Blinded payment paths created using the parameterized [`Router`], with the provided
+	///   `payment_secret` included in the path payloads.
+	/// - The given `payment_hash` and `payment_secret`, enabling secure claim verification.
+	///
+	/// Returns an error if the refund targets a different chain or if no valid
+	/// blinded path can be constructed.
+	pub fn create_invoice_builder_from_refund<'a, ES: Deref, R: Deref>(
+		&'a self, router: &R, entropy_source: ES, refund: &'a Refund, payment_hash: PaymentHash,
+		payment_secret: PaymentSecret, usable_channels: Vec<ChannelDetails>,
+	) -> Result<InvoiceBuilder<'a, DerivedSigningPubkey>, Bolt12SemanticError>
+	where
+		ES::Target: EntropySource,
+		R::Target: Router,
+	{
+		if refund.chain() != self.chain_hash {
+			return Err(Bolt12SemanticError::UnsupportedChain);
+		}
+
+		let expanded_key = &self.inbound_payment_key;
+		let entropy = &*entropy_source;
+
+		let amount_msats = refund.amount_msats();
+		let relative_expiry = DEFAULT_RELATIVE_EXPIRY.as_secs() as u32;
+
+		let payment_context = PaymentContext::Bolt12Refund(Bolt12RefundContext {});
+		let payment_paths = self
+			.create_blinded_payment_paths(
+				router,
+				entropy,
+				usable_channels,
+				Some(amount_msats),
+				payment_secret,
+				payment_context,
+				relative_expiry,
+			)
+			.map_err(|_| Bolt12SemanticError::MissingPaths)?;
+
+		#[cfg(feature = "std")]
+		let builder = refund.respond_using_derived_keys(
+			payment_paths,
+			payment_hash,
+			expanded_key,
+			entropy,
+		)?;
+
+		#[cfg(not(feature = "std"))]
+		let created_at = Duration::from_secs(self.highest_seen_timestamp.load(Ordering::Acquire) as u64);
+		#[cfg(not(feature = "std"))]
+		let builder = refund.respond_using_derived_keys_no_std(
+			payment_paths,
+			payment_hash,
+			created_at,
+			expanded_key,
+			entropy,
+		)?;
+
+		Ok(builder.into())
+	}
+
+	/// Creates a response for the provided [`VerifiedInvoiceRequest`].
+	///
+	/// A response can be either an [`OffersMessage::Invoice`] with additional [`MessageContext`],
+	/// or an [`OffersMessage::InvoiceError`], depending on the [`InvoiceRequest`].
+	///
+	/// An [`OffersMessage::InvoiceError`] will be generated if:
+	/// - We fail to generate valid payment paths to include in the [`Bolt12Invoice`].
+	/// - We fail to generate a valid signed [`Bolt12Invoice`] for the [`InvoiceRequest`].
+	pub fn create_response_for_invoice_request<ES: Deref, NS: Deref, R: Deref>(
+		&self, signer: &NS, router: &R, entropy_source: ES,
+		invoice_request: VerifiedInvoiceRequest, amount_msats: u64, payment_hash: PaymentHash,
+		payment_secret: PaymentSecret, usable_channels: Vec<ChannelDetails>,
+	) -> (OffersMessage, Option<MessageContext>)
+	where
+		ES::Target: EntropySource,
+		NS::Target: NodeSigner,
+		R::Target: Router,
+	{
+		let entropy = &*entropy_source;
+		let expanded_key = &self.inbound_payment_key;
+		let secp_ctx = &self.secp_ctx;
+
+		let relative_expiry = DEFAULT_RELATIVE_EXPIRY.as_secs() as u32;
+
+		let context = PaymentContext::Bolt12Offer(Bolt12OfferContext {
+			offer_id: invoice_request.offer_id,
+			invoice_request: invoice_request.fields(),
+		});
+
+		let payment_paths = match self.create_blinded_payment_paths(
+			router,
+			entropy,
+			usable_channels,
+			Some(amount_msats),
+			payment_secret,
+			context,
+			relative_expiry,
+		) {
+			Ok(paths) => paths,
+			Err(_) => {
+				let error = InvoiceError::from(Bolt12SemanticError::MissingPaths);
+				return (OffersMessage::InvoiceError(error.into()), None);
+			},
+		};
+
+		#[cfg(not(feature = "std"))]
+		let created_at = Duration::from_secs(self.highest_seen_timestamp.load(Ordering::Acquire) as u64);
+
+		let response = if invoice_request.keys.is_some() {
+			#[cfg(feature = "std")]
+			let builder = invoice_request.respond_using_derived_keys(payment_paths, payment_hash);
+			#[cfg(not(feature = "std"))]
+			let builder = invoice_request.respond_using_derived_keys_no_std(
+				payment_paths,
+				payment_hash,
+				created_at,
+			);
+			builder
+				.map(InvoiceBuilder::<DerivedSigningPubkey>::from)
+				.and_then(|builder| builder.allow_mpp().build_and_sign(secp_ctx))
+				.map_err(InvoiceError::from)
+		} else {
+			#[cfg(feature = "std")]
+			let builder = invoice_request.respond_with(payment_paths, payment_hash);
+			#[cfg(not(feature = "std"))]
+			let builder = invoice_request.respond_with_no_std(payment_paths, payment_hash, created_at);
+			builder
+				.map(InvoiceBuilder::<ExplicitSigningPubkey>::from)
+				.and_then(|builder| builder.allow_mpp().build())
+				.map_err(InvoiceError::from)
+				.and_then(|invoice| {
+					#[cfg(c_bindings)]
+					let mut invoice = invoice;
+					invoice
+						.sign(|invoice: &UnsignedBolt12Invoice| signer.sign_bolt12_invoice(invoice))
+						.map_err(InvoiceError::from)
+				})
+		};
+
+		match response {
+			Ok(invoice) => {
+				let nonce = Nonce::from_entropy_source(entropy);
+				let hmac = payment_hash.hmac_for_offer_payment(nonce, expanded_key);
+				let context = MessageContext::Offers(OffersContext::InboundPayment {
+					payment_hash,
+					nonce,
+					hmac,
+				});
+
+				(OffersMessage::Invoice(invoice), Some(context))
+			},
+			Err(error) => (OffersMessage::InvoiceError(error.into()), None),
+		}
+	}
+
+	/// Enqueues the created [`InvoiceRequest`] to be sent to the counterparty.
+	///
+	/// # Payment
+	///
+	/// The provided `payment_id` is used to create a unique [`MessageContext`] for the
+	/// blinded paths sent to the counterparty. This allows them to respond with an invoice,
+	/// over those blinded paths, which can be verified against the intended outbound payment,
+	/// ensuring the invoice corresponds to a payment we actually want to make.
+	///
+	/// # Nonce
+	/// The nonce is used to create a unique [`MessageContext`] for the reply paths.
+	/// These will be used to verify the corresponding [`Bolt12Invoice`] when it is received.
+	///
+	/// Note: The provided [`Nonce`] MUST be the same as the [`Nonce`] used for creating the
+	/// [`InvoiceRequest`] to ensure correct verification of the corresponding [`Bolt12Invoice`].
+	///
+	/// See [`OffersMessageFlow::create_invoice_request_builder`] for more details.
+	///
+	/// # Peers
+	///
+	/// The user must provide a list of [`MessageForwardNode`] that will be used to generate
+	/// valid reply paths for the counterparty to send back the corresponding [`Bolt12Invoice`]
+	/// or [`InvoiceError`].
+	///
+	/// [`supports_onion_messages`]: crate::types::features::Features::supports_onion_messages
+	pub fn enqueue_invoice_request(
+		&self, invoice_request: InvoiceRequest, payment_id: PaymentId, nonce: Nonce,
+		peers: Vec<MessageForwardNode>,
+	) -> Result<(), Bolt12SemanticError> {
+		let expanded_key = &self.inbound_payment_key;
+
+		let hmac = payment_id.hmac_for_offer_payment(nonce, expanded_key);
+		let context = MessageContext::Offers(OffersContext::OutboundPayment {
+			payment_id,
+			nonce,
+			hmac: Some(hmac),
+		});
+		let reply_paths = self
+			.create_blinded_paths(peers, context)
+			.map_err(|_| Bolt12SemanticError::MissingPaths)?;
+
+		let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
+		if !invoice_request.paths().is_empty() {
+			let message = OffersMessage::InvoiceRequest(invoice_request.clone());
+			enqueue_onion_message_with_reply_paths(
+				message,
+				invoice_request.paths(),
+				reply_paths,
+				&mut pending_offers_messages,
+			);
+		} else if let Some(node_id) = invoice_request.issuer_signing_pubkey() {
+			for reply_path in reply_paths {
+				let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
+					destination: Destination::Node(node_id),
+					reply_path,
+				};
+				let message = OffersMessage::InvoiceRequest(invoice_request.clone());
+				pending_offers_messages.push((message, instructions));
+			}
+		} else {
+			debug_assert!(false);
+			return Err(Bolt12SemanticError::MissingIssuerSigningPubkey);
+		}
+
+		Ok(())
+	}
+
+	/// Enqueues the created [`Bolt12Invoice`] corresponding to a [`Refund`] to be sent
+	/// to the counterparty.
+	///
+	/// # Peers
+	///
+	/// The user must provide a list of [`MessageForwardNode`] that will be used to generate valid
+	/// reply paths for the counterparty to send back the corresponding [`InvoiceError`] if we fail
+	/// to create blinded reply paths
+	///
+	/// [`supports_onion_messages`]: crate::types::features::Features::supports_onion_messages
+	pub fn enqueue_invoice<ES: Deref>(
+		&self, entropy_source: ES, invoice: Bolt12Invoice, refund: &Refund,
+		peers: Vec<MessageForwardNode>,
+	) -> Result<(), Bolt12SemanticError>
+	where
+		ES::Target: EntropySource,
+	{
+		let expanded_key = &self.inbound_payment_key;
+		let entropy = &*entropy_source;
+
+		let payment_hash = invoice.payment_hash();
+
+		let nonce = Nonce::from_entropy_source(entropy);
+		let hmac = payment_hash.hmac_for_offer_payment(nonce, expanded_key);
+
+		let context =
+			MessageContext::Offers(OffersContext::InboundPayment { payment_hash, nonce, hmac });
+
+		let reply_paths = self
+			.create_blinded_paths(peers, context)
+			.map_err(|_| Bolt12SemanticError::MissingPaths)?;
+
+		let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
+
+		if refund.paths().is_empty() {
+			for reply_path in reply_paths {
+				let instructions = MessageSendInstructions::WithSpecifiedReplyPath {
+					destination: Destination::Node(refund.payer_signing_pubkey()),
+					reply_path,
+				};
+				let message = OffersMessage::Invoice(invoice.clone());
+				pending_offers_messages.push((message, instructions));
+			}
+		} else {
+			let message = OffersMessage::Invoice(invoice);
+			enqueue_onion_message_with_reply_paths(
+				message,
+				refund.paths(),
+				reply_paths,
+				&mut pending_offers_messages,
+			);
+		}
+
+		Ok(())
+	}
+
+	/// Enqueues `held_htlc_available` onion messages to be sent to the payee via the reply paths
+	/// contained within the provided [`StaticInvoice`].
+	///
+	/// # Peers
+	///
+	/// The user must provide a list of [`MessageForwardNode`] that will be used to generate valid
+	/// reply paths for the recipient to send back the corresponding [`ReleaseHeldHtlc`] onion message.
+	///
+	/// [`ReleaseHeldHtlc`]: crate::onion_message::async_payments::ReleaseHeldHtlc
+	/// [`supports_onion_messages`]: crate::types::features::Features::supports_onion_messages
+	#[cfg(async_payments)]
+	pub fn enqueue_held_htlc_available<ES: Deref>(
+		&self, entropy_source: ES, invoice: &StaticInvoice, payment_id: PaymentId,
+		peers: Vec<MessageForwardNode>,
+	) -> Result<(), Bolt12SemanticError>
+	where
+		ES::Target: EntropySource,
+	{
+		let expanded_key = &self.inbound_payment_key;
+		let entropy = &*entropy_source;
+
+		let nonce = Nonce::from_entropy_source(entropy);
+		let hmac = payment_id.hmac_for_async_payment(nonce, expanded_key);
+		let context = MessageContext::AsyncPayments(AsyncPaymentsContext::OutboundPayment {
+			payment_id,
+			nonce,
+			hmac,
+		});
+
+		let reply_paths = self
+			.create_blinded_paths(peers, context)
+			.map_err(|_| Bolt12SemanticError::MissingPaths)?;
+
+		let mut pending_async_payments_messages =
+			self.pending_async_payments_messages.lock().unwrap();
+
+		let message = AsyncPaymentsMessage::HeldHtlcAvailable(HeldHtlcAvailable {});
+		enqueue_onion_message_with_reply_paths(
+			message,
+			invoice.message_paths(),
+			reply_paths,
+			&mut pending_async_payments_messages,
+		);
+
+		Ok(())
+	}
+
+	/// Enqueues the created [`DNSSECQuery`] to be sent to the counterparty.
+	///
+	/// # Peers
+	///
+	/// The user must provide a list of [`MessageForwardNode`] that will be used to generate
+	/// valid reply paths for the counterparty to send back the corresponding response for
+	/// the [`DNSSECQuery`] message.
+	///
+	/// [`supports_onion_messages`]: crate::types::features::Features::supports_onion_messages
+	#[cfg(feature = "dnssec")]
+	pub fn enqueue_dns_onion_message(
+		&self, message: DNSSECQuery, context: DNSResolverContext, dns_resolvers: Vec<Destination>,
+		peers: Vec<MessageForwardNode>,
+	) -> Result<(), Bolt12SemanticError> {
+		let reply_paths = self
+			.create_blinded_paths(peers, MessageContext::DNSResolver(context))
+			.map_err(|_| Bolt12SemanticError::MissingPaths)?;
+
+		let message_params = dns_resolvers
+			.iter()
+			.flat_map(|destination| reply_paths.iter().map(move |path| (path, destination)))
+			.take(OFFERS_MESSAGE_REQUEST_LIMIT);
+		for (reply_path, destination) in message_params {
+			self.pending_dns_onion_messages.lock().unwrap().push((
+				DNSResolverMessage::DNSSECQuery(message.clone()),
+				MessageSendInstructions::WithSpecifiedReplyPath {
+					destination: destination.clone(),
+					reply_path: reply_path.clone(),
+				},
+			));
+		}
+
+		Ok(())
+	}
+
+	/// Gets the enqueued [`OffersMessage`] with their corresponding [`MessageSendInstructions`].
+	pub fn release_pending_offers_messages(&self) -> Vec<(OffersMessage, MessageSendInstructions)> {
+		core::mem::take(&mut self.pending_offers_messages.lock().unwrap())
+	}
+
+	/// Gets the enqueued [`AsyncPaymentsMessage`] with their corresponding [`MessageSendInstructions`].
+	pub fn release_pending_async_messages(
+		&self,
+	) -> Vec<(AsyncPaymentsMessage, MessageSendInstructions)> {
+		core::mem::take(&mut self.pending_async_payments_messages.lock().unwrap())
+	}
+
+	/// Gets the enqueued [`DNSResolverMessage`] with their corresponding [`MessageSendInstructions`].
+	#[cfg(feature = "dnssec")]
+	pub fn release_pending_dns_messages(
+		&self,
+	) -> Vec<(DNSResolverMessage, MessageSendInstructions)> {
+		core::mem::take(&mut self.pending_dns_onion_messages.lock().unwrap())
+	}
+}
diff --git a/lightning/src/offers/mod.rs b/lightning/src/offers/mod.rs
index 49a95b96f86..cf078ed0e67 100644
--- a/lightning/src/offers/mod.rs
+++ b/lightning/src/offers/mod.rs
@@ -14,6 +14,7 @@
 
 #[macro_use]
 pub mod offer;
+pub mod flow;
 
 pub mod invoice;
 pub mod invoice_error;