lightning-liquidity: Introduce EventQueue notifier and wake BP for message processing #3509

Open · wants to merge 18 commits into base: main
3 changes: 2 additions & 1 deletion lightning-background-processor/Cargo.toml
@@ -15,7 +15,7 @@ rustdoc-args = ["--cfg", "docsrs"]

[features]
futures = [ ]
std = ["lightning/std", "bitcoin-io/std", "bitcoin_hashes/std"]
std = ["lightning/std", "lightning-liquidity/std", "bitcoin-io/std", "bitcoin_hashes/std"]

default = ["std"]

@@ -25,6 +25,7 @@ bitcoin_hashes = { version = "0.14.0", default-features = false }
bitcoin-io = { version = "0.1.2", default-features = false }
lightning = { version = "0.2.0", path = "../lightning", default-features = false }
lightning-rapid-gossip-sync = { version = "0.2.0", path = "../lightning-rapid-gossip-sync", default-features = false }
lightning-liquidity = { version = "0.2.0", path = "../lightning-liquidity", default-features = false }

[dev-dependencies]
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "time" ] }
222 changes: 143 additions & 79 deletions lightning-background-processor/src/lib.rs

Large diffs are not rendered by default.
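Since the lightning-background-processor/src/lib.rs diff is not rendered here, the following is only an illustrative sketch of the pattern the PR title describes: rather than waking purely on a timer, the background processor also selects on a future that resolves when lightning-liquidity has pending events or messages. Every name below (`LiquidityLike`, `wait_pending`, `process_pending`, the shutdown channel) is an assumption for illustration, not the crate's actual API.

// Hypothetical sketch only; the real wiring lives in the unrendered diff above.
use std::future::Future;
use std::sync::Arc;

trait LiquidityLike {
    // Resolves once the liquidity layer has pending events or messages.
    fn wait_pending(&self) -> impl Future<Output = ()> + Send;
    // Drains and handles whatever is pending.
    fn process_pending(&self);
}

async fn background_loop<L: LiquidityLike>(
    liquidity: Arc<L>, mut shutdown: tokio::sync::watch::Receiver<bool>,
) {
    loop {
        tokio::select! {
            // Wake as soon as the liquidity layer signals work, instead of
            // only on the next timer tick.
            _ = liquidity.wait_pending() => liquidity.process_pending(),
            _ = shutdown.changed() => break,
        }
    }
}

The Cargo.toml change above and the new ALiquidityManager export further down suggest the real wiring hands the background processor a LiquidityManager so that its event-queue notifier can trigger exactly this kind of wakeup.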

121 changes: 103 additions & 18 deletions lightning-liquidity/src/events.rs
@@ -22,6 +22,8 @@ use crate::prelude::{Vec, VecDeque};
use crate::sync::{Arc, Mutex};

use core::future::Future;
#[cfg(debug_assertions)]
use core::sync::atomic::{AtomicU8, Ordering};
use core::task::{Poll, Waker};

/// The maximum queue size we allow before starting to drop events.
@@ -31,37 +33,40 @@ pub(crate) struct EventQueue {
queue: Arc<Mutex<VecDeque<LiquidityEvent>>>,
waker: Arc<Mutex<Option<Waker>>>,
#[cfg(feature = "std")]
condvar: crate::sync::Condvar,
condvar: Arc<crate::sync::Condvar>,
#[cfg(debug_assertions)]
num_held_notifier_guards: Arc<AtomicU8>,
}

impl EventQueue {
pub fn new() -> Self {
let queue = Arc::new(Mutex::new(VecDeque::new()));
let waker = Arc::new(Mutex::new(None));
#[cfg(feature = "std")]
{
let condvar = crate::sync::Condvar::new();
Self { queue, waker, condvar }
Self {
queue,
waker,
#[cfg(feature = "std")]
condvar: Arc::new(crate::sync::Condvar::new()),
#[cfg(debug_assertions)]
num_held_notifier_guards: Arc::new(AtomicU8::new(0)),
}
#[cfg(not(feature = "std"))]
Self { queue, waker }
}

pub fn enqueue<E: Into<LiquidityEvent>>(&self, event: E) {
#[cfg(debug_assertions)]
{
let mut queue = self.queue.lock().unwrap();
if queue.len() < MAX_EVENT_QUEUE_SIZE {
queue.push_back(event.into());
} else {
return;
}
let num_held_notifier_guards = self.num_held_notifier_guards.load(Ordering::Relaxed);
debug_assert!(
num_held_notifier_guards > 0,
"We should be holding at least one notifier guard whenever enqueuing new events"
);
}

Contributor: While this should always work, it's still possible we forget to grab the notifier in a method and it goes unnoticed due to lack of test coverage. Could we make it so that the queue is not available for access without the notifier guard? It might require the use of a sealed module as I did in #3624 with MaybeTweakedSecretKey.
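For illustration, a self-contained sketch of the sealed-module idea mentioned in the comment above. The types here (`sealed`, `EventQueue<E>`, `Notifier`) are simplified stand-ins rather than the crate's actual API, and the PR ultimately went with runtime debug assertions instead:

// Hypothetical sketch: enforce at compile time that events can only be
// enqueued while a notifier guard is alive.
mod sealed {
    use std::collections::VecDeque;
    use std::sync::Mutex;

    pub struct EventQueue<E> {
        queue: Mutex<VecDeque<E>>,
    }

    // Guard handed out by `notifier()`; in the real implementation, dropping
    // it is where the waker/condvar notification would happen.
    pub struct Notifier<'a, E> {
        queue: &'a EventQueue<E>,
    }

    impl<E> EventQueue<E> {
        pub fn new() -> Self {
            Self { queue: Mutex::new(VecDeque::new()) }
        }

        pub fn notifier(&self) -> Notifier<'_, E> {
            Notifier { queue: self }
        }
    }

    impl<E> Notifier<'_, E> {
        // Enqueuing is only reachable through the guard, so forgetting to hold
        // it becomes a type error rather than a debug assertion.
        pub fn enqueue(&self, event: E) {
            self.queue.queue.lock().unwrap().push_back(event);
        }
    }
}

fn main() {
    let queue = sealed::EventQueue::new();
    let notifier = queue.notifier();
    notifier.enqueue("event");
    // `queue.enqueue(..)` does not exist, so there is no guard-free path.
}

The trade-off is extra lifetime and API plumbing in every handler in exchange for turning a forgotten guard into a compile error.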

if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
let mut queue = self.queue.lock().unwrap();
if queue.len() < MAX_EVENT_QUEUE_SIZE {
queue.push_back(event.into());
} else {
return;
}
#[cfg(feature = "std")]
self.condvar.notify_one();
}

pub fn next_event(&self) -> Option<LiquidityEvent> {
@@ -100,6 +105,81 @@ impl EventQueue {
pub fn get_and_clear_pending_events(&self) -> Vec<LiquidityEvent> {
self.queue.lock().unwrap().split_off(0).into()
}

// Returns an [`EventQueueNotifierGuard`] that will notify about new events when dropped.
pub fn notifier(&self) -> EventQueueNotifierGuard {
#[cfg(debug_assertions)]
{
self.num_held_notifier_guards.fetch_add(1, Ordering::Relaxed);
}
EventQueueNotifierGuard {
queue: Arc::clone(&self.queue),
waker: Arc::clone(&self.waker),
#[cfg(feature = "std")]
condvar: Arc::clone(&self.condvar),
#[cfg(debug_assertions)]
num_held_notifier_guards: Arc::clone(&self.num_held_notifier_guards),
}
}
}

impl Drop for EventQueue {
fn drop(&mut self) {
#[cfg(debug_assertions)]
{
let num_held_notifier_guards = self.num_held_notifier_guards.load(Ordering::Relaxed);
debug_assert!(
num_held_notifier_guards == 0,
"We should not be holding any notifier guards when the event queue is dropped"
);
}
}
}

// A guard type that will notify about new events when dropped.
#[must_use]
pub(crate) struct EventQueueNotifierGuard {
queue: Arc<Mutex<VecDeque<LiquidityEvent>>>,
waker: Arc<Mutex<Option<Waker>>>,
#[cfg(feature = "std")]
condvar: Arc<crate::sync::Condvar>,
#[cfg(debug_assertions)]
num_held_notifier_guards: Arc<AtomicU8>,
}

Contributor: Shouldn't we be removing the manual notifying now that the guard is in place?

Contributor: It'd also be nice if we could guarantee at compile-time that the guard is being held when queueing events.

Contributor Author (re: removing the manual notifying): Ugh, good catch. I removed it before in enqueue, but it seems to have snuck back in due to some rebase foo.

Contributor Author (re: the compile-time guarantee): Good idea, now added corresponding debug assertions.

impl Drop for EventQueueNotifierGuard {
fn drop(&mut self) {
let should_notify = !self.queue.lock().unwrap().is_empty();

if should_notify {
if let Some(waker) = self.waker.lock().unwrap().take() {
waker.wake();
}

#[cfg(feature = "std")]
self.condvar.notify_one();
}

#[cfg(debug_assertions)]
{
let res = self.num_held_notifier_guards.fetch_update(
Ordering::Relaxed,
Ordering::Relaxed,
|x| Some(x.saturating_sub(1)),
);
match res {
Ok(previous_value) if previous_value == 0 => debug_assert!(
false,
"num_held_notifier_guards counter out-of-sync! This should never happen!"
),
Err(_) => debug_assert!(
false,
"num_held_notifier_guards counter out-of-sync! This should never happen!"
),
_ => {},
}
}
}
}
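To make the notify-on-drop contract concrete, here is a small std-only sketch with simplified stand-in types (a bare `Queue`, `u32` events, no waker path), which are assumptions for brevity rather than the crate's real types. Several enqueues under one guard result in a single notification when the guard is dropped, mirroring the Drop impl above.

use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

struct Queue {
    events: Mutex<VecDeque<u32>>,
    condvar: Condvar,
}

struct NotifierGuard<'a>(&'a Queue);

impl Queue {
    fn notifier(&self) -> NotifierGuard<'_> {
        NotifierGuard(self)
    }

    fn enqueue(&self, e: u32) {
        self.events.lock().unwrap().push_back(e);
    }

    fn wait_next(&self) -> u32 {
        let mut events = self.events.lock().unwrap();
        loop {
            if let Some(e) = events.pop_front() {
                return e;
            }
            // Block until a dropped guard signals that the queue is non-empty.
            events = self.condvar.wait(events).unwrap();
        }
    }
}

impl Drop for NotifierGuard<'_> {
    fn drop(&mut self) {
        // As in the Drop impl above: only notify if there is work to pick up.
        if !self.0.events.lock().unwrap().is_empty() {
            self.0.condvar.notify_one();
        }
    }
}

fn main() {
    let queue = Arc::new(Queue { events: Mutex::new(VecDeque::new()), condvar: Condvar::new() });
    let waiter = Arc::clone(&queue);
    let handle = std::thread::spawn(move || waiter.wait_next());

    {
        let _guard = queue.notifier();
        queue.enqueue(1);
        queue.enqueue(2);
        // No notification yet; it happens once when `_guard` drops here.
    }

    assert_eq!(handle.join().unwrap(), 1);
}

In the real EventQueueNotifierGuard the same drop site also wakes any async waiter via the stored Waker.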

/// An event which you should probably take some action in response to.
@@ -193,6 +273,7 @@ mod tests {
});

for _ in 0..3 {
let _guard = event_queue.notifier();
event_queue.enqueue(expected_event.clone());
}

@@ -218,13 +299,15 @@
let mut delayed_enqueue = false;

for _ in 0..25 {
let _guard = event_queue.notifier();
event_queue.enqueue(expected_event.clone());
enqueued_events.fetch_add(1, Ordering::SeqCst);
}

loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(10)), if !delayed_enqueue => {
let _guard = event_queue.notifier();
event_queue.enqueue(expected_event.clone());
enqueued_events.fetch_add(1, Ordering::SeqCst);
delayed_enqueue = true;
@@ -233,6 +316,7 @@
assert_eq!(e, expected_event);
received_events.fetch_add(1, Ordering::SeqCst);

let _guard = event_queue.notifier();
event_queue.enqueue(expected_event.clone());
enqueued_events.fetch_add(1, Ordering::SeqCst);
}
@@ -265,6 +349,7 @@
std::thread::spawn(move || {
// Sleep a bit before we enqueue the events everybody is waiting for.
std::thread::sleep(Duration::from_millis(20));
let _guard = thread_queue.notifier();
thread_queue.enqueue(thread_event.clone());
thread_queue.enqueue(thread_event.clone());
});
4 changes: 3 additions & 1 deletion lightning-liquidity/src/lib.rs
@@ -74,4 +74,6 @@ mod sync;
mod tests;
mod utils;

pub use manager::{LiquidityClientConfig, LiquidityManager, LiquidityServiceConfig};
pub use manager::{
ALiquidityManager, LiquidityClientConfig, LiquidityManager, LiquidityServiceConfig,
};
2 changes: 2 additions & 0 deletions lightning-liquidity/src/lsps0/client.rs
@@ -61,6 +61,8 @@
fn handle_response(
&self, response: LSPS0Response, counterparty_node_id: &PublicKey,
) -> Result<(), LightningError> {
let _event_queue_notifier = self.pending_events.notifier();

match response {
LSPS0Response::ListProtocols(LSPS0ListProtocolsResponse { protocols }) => {
self.pending_events.enqueue(LSPS0ClientEvent::ListProtocolsResponse {
13 changes: 12 additions & 1 deletion lightning-liquidity/src/lsps1/client.rs
@@ -105,8 +105,9 @@
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
result: LSPS1GetInfoResponse,
) -> Result<(), LightningError> {
let outer_state_lock = self.per_peer_state.write().unwrap();
let _event_queue_notifier = self.pending_events.notifier();

let outer_state_lock = self.per_peer_state.write().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
let mut peer_state_lock = inner_state_lock.lock().unwrap();
@@ -142,6 +143,8 @@
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
error: LSPSResponseError,
) -> Result<(), LightningError> {
let _event_queue_notifier = self.pending_events.notifier();

let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
@@ -219,6 +222,8 @@
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
response: LSPS1CreateOrderResponse,
) -> Result<(), LightningError> {
let _event_queue_notifier = self.pending_events.notifier();

let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
@@ -261,6 +266,8 @@
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
error: LSPSResponseError,
) -> Result<(), LightningError> {
let _event_queue_notifier = self.pending_events.notifier();

let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
@@ -338,6 +345,8 @@
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
response: LSPS1CreateOrderResponse,
) -> Result<(), LightningError> {
let _event_queue_notifier = self.pending_events.notifier();

let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
@@ -380,6 +389,8 @@
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
error: LSPSResponseError,
) -> Result<(), LightningError> {
let _event_queue_notifier = self.pending_events.notifier();

let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
4 changes: 4 additions & 0 deletions lightning-liquidity/src/lsps2/client.rs
@@ -184,6 +184,8 @@
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
result: LSPS2GetInfoResponse,
) -> Result<(), LightningError> {
let _event_queue_notifier = self.pending_events.notifier();

let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
@@ -250,6 +252,8 @@
&self, request_id: LSPSRequestId, counterparty_node_id: &PublicKey,
result: LSPS2BuyResponse,
) -> Result<(), LightningError> {
let _event_queue_notifier = self.pending_events.notifier();

let outer_state_lock = self.per_peer_state.read().unwrap();
match outer_state_lock.get(counterparty_node_id) {
Some(inner_state_lock) => {
16 changes: 8 additions & 8 deletions lightning-liquidity/src/lsps2/payment_queue.rs
@@ -5,18 +5,11 @@ use lightning_types::payment::PaymentHash;
/// Holds payments with the corresponding HTLCs until it is possible to pay the fee.
/// When the fee is successfully paid with a forwarded payment, the queue should be consumed and the
/// remaining payments forwarded.
#[derive(Clone, PartialEq, Eq, Debug)]
#[derive(Clone, Default, PartialEq, Eq, Debug)]
pub(crate) struct PaymentQueue {
payments: Vec<(PaymentHash, Vec<InterceptedHTLC>)>,
}

#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub(crate) struct InterceptedHTLC {
pub(crate) intercept_id: InterceptId,
pub(crate) expected_outbound_amount_msat: u64,
pub(crate) payment_hash: PaymentHash,
}

impl PaymentQueue {
pub(crate) fn new() -> PaymentQueue {
PaymentQueue { payments: Vec::new() }
@@ -54,6 +47,13 @@ impl PaymentQueue {
}
}

#[derive(Copy, Clone, PartialEq, Eq, Debug)]
pub(crate) struct InterceptedHTLC {
pub(crate) intercept_id: InterceptId,
pub(crate) expected_outbound_amount_msat: u64,
pub(crate) payment_hash: PaymentHash,
}

#[cfg(test)]
mod tests {
use super::*;