Skip to content

Commit c8d1893

Browse files
committed
Convert process_events_async to take an asynchronous Persister
Also provide a wrapper to allow a sync kvstore to be used.
1 parent 5abcf12 commit c8d1893

File tree

2 files changed

+188
-50
lines changed

2 files changed

+188
-50
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use lightning::sign::ChangeDestinationSourceSync;
4040
use lightning::sign::EntropySource;
4141
use lightning::sign::OutputSpender;
4242
use lightning::util::logger::Logger;
43-
use lightning::util::persist::{KVStoreSync, PersisterSync};
43+
use lightning::util::persist::{KVStoreSync, Persister, PersisterSync};
4444
use lightning::util::sweep::OutputSweeper;
4545
#[cfg(feature = "std")]
4646
use lightning::util::sweep::OutputSweeperSync;
@@ -311,6 +311,15 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
311311
true
312312
}
313313

314+
macro_rules! maybe_await {
315+
(true, $e:expr) => {
316+
$e.await
317+
};
318+
(false, $e:expr) => {
319+
$e
320+
};
321+
}
322+
314323
macro_rules! define_run_body {
315324
(
316325
$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
@@ -319,7 +328,7 @@ macro_rules! define_run_body {
319328
$peer_manager: ident, $gossip_sync: ident,
320329
$process_sweeper: expr,
321330
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
322-
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
331+
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $async: tt,
323332
) => { {
324333
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
325334
$channel_manager.get_cm().timer_tick_occurred();
@@ -375,7 +384,7 @@ macro_rules! define_run_body {
375384

376385
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
377386
log_trace!($logger, "Persisting ChannelManager...");
378-
$persister.persist_manager(&$channel_manager)?;
387+
maybe_await!($async, $persister.persist_manager(&$channel_manager))?;
379388
log_trace!($logger, "Done persisting ChannelManager.");
380389
}
381390
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
@@ -436,7 +445,7 @@ macro_rules! define_run_body {
436445
log_trace!($logger, "Persisting network graph.");
437446
}
438447

439-
if let Err(e) = $persister.persist_graph(network_graph) {
448+
if let Err(e) = maybe_await!($async, $persister.persist_graph(network_graph)) {
440449
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
441450
}
442451

@@ -464,8 +473,8 @@ macro_rules! define_run_body {
464473
} else {
465474
log_trace!($logger, "Persisting scorer");
466475
}
467-
if let Err(e) = $persister.persist_scorer(&scorer) {
468-
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
476+
if let Err(e) = maybe_await!($async, $persister.persist_scorer(&scorer)) {
477+
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
469478
}
470479
}
471480
last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
@@ -487,16 +496,16 @@ macro_rules! define_run_body {
487496
// After we exit, ensure we persist the ChannelManager one final time - this avoids
488497
// some races where users quit while channel updates were in-flight, with
489498
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
490-
$persister.persist_manager(&$channel_manager)?;
499+
maybe_await!($async, $persister.persist_manager(&$channel_manager))?;
491500

492501
// Persist Scorer on exit
493502
if let Some(ref scorer) = $scorer {
494-
$persister.persist_scorer(&scorer)?;
503+
maybe_await!($async, $persister.persist_scorer(&scorer))?;
495504
}
496505

497506
// Persist NetworkGraph on exit
498507
if let Some(network_graph) = $gossip_sync.network_graph() {
499-
$persister.persist_graph(network_graph)?;
508+
maybe_await!($async, $persister.persist_graph(network_graph))?;
500509
}
501510

502511
Ok(())
@@ -653,7 +662,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
653662
/// # fn log(&self, _record: lightning::util::logger::Record) {}
654663
/// # }
655664
/// # struct Store {}
656-
/// # impl lightning::util::persist::KVStore for Store {
665+
/// # impl lightning::util::persist::KVStoreSync for Store {
657666
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
658667
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
659668
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
@@ -684,7 +693,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
684693
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
685694
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
686695
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
687-
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
696+
/// # K: lightning::util::persist::KVStoreSync + Send + Sync + 'static,
688697
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
689698
/// # > {
690699
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
@@ -706,10 +715,11 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
706715
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
707716
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
708717
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
709-
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
718+
/// # K: lightning::util::persist::KVStoreSync + Send + Sync + 'static,
710719
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
711720
/// # >(node: Node<B, F, FE, UL, D, K, O>) {
712-
/// let background_persister = Arc::clone(&node.persister);
721+
/// let background_persister_sync = Arc::clone(&node.persister);
722+
/// let background_persister = Arc::new(Arc::new(lightning::util::persist::KVStoreSyncWrapper(background_persister_sync)));
713723
/// let background_event_handler = Arc::clone(&node.event_handler);
714724
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
715725
/// let background_chan_man = Arc::clone(&node.channel_manager);
@@ -814,7 +824,7 @@ where
814824
F::Target: 'static + FeeEstimator,
815825
L::Target: 'static + Logger,
816826
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
817-
PS::Target: 'static + PersisterSync<'a, CM, L, S>,
827+
PS::Target: 'static + Persister<'a, CM, L, S>,
818828
ES::Target: 'static + EntropySource,
819829
CM::Target: AChannelManager,
820830
OM::Target: AOnionMessenger,
@@ -841,7 +851,7 @@ where
841851
if let Some(duration_since_epoch) = fetch_time() {
842852
if update_scorer(scorer, &event, duration_since_epoch) {
843853
log_trace!(logger, "Persisting scorer after update");
844-
if let Err(e) = persister.persist_scorer(&*scorer) {
854+
if let Err(e) = persister.persist_scorer(&*scorer).await {
845855
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
846856
// We opt not to abort early on persistence failure here as persisting
847857
// the scorer is non-critical and we still hope that it will have
@@ -919,6 +929,7 @@ where
919929
},
920930
mobile_interruptable_platform,
921931
fetch_time,
932+
true,
922933
)
923934
}
924935

@@ -1098,6 +1109,7 @@ impl BackgroundProcessor {
10981109
.expect("Time should be sometime after 1970"),
10991110
)
11001111
},
1112+
false,
11011113
)
11021114
});
11031115
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
@@ -1186,7 +1198,7 @@ mod tests {
11861198
use lightning::types::payment::PaymentHash;
11871199
use lightning::util::config::UserConfig;
11881200
use lightning::util::persist::{
1189-
KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY,
1201+
KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY,
11901202
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
11911203
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
11921204
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
@@ -2107,9 +2119,10 @@ mod tests {
21072119
open_channel!(nodes[0], nodes[1], 100000);
21082120

21092121
let data_dir = nodes[0].kv_store.get_data_dir();
2110-
let persister = Arc::new(
2122+
let persister_sync = Arc::new(
21112123
PersisterSync::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
21122124
);
2125+
let persister = Arc::new(Arc::new(KVStoreSyncWrapper(persister_sync)));
21132126

21142127
let bp_future = super::process_events_async(
21152128
persister,
@@ -2618,8 +2631,9 @@ mod tests {
26182631
let (_, nodes) =
26192632
create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
26202633
let data_dir = nodes[0].kv_store.get_data_dir();
2621-
let persister =
2634+
let persister_sync =
26222635
Arc::new(PersisterSync::new(data_dir).with_graph_persistence_notifier(sender));
2636+
let persister = Arc::new(Arc::new(KVStoreSyncWrapper(persister_sync)));
26232637

26242638
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
26252639
let bp_future = super::process_events_async(
@@ -2835,7 +2849,8 @@ mod tests {
28352849

28362850
let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
28372851
let data_dir = nodes[0].kv_store.get_data_dir();
2838-
let persister = Arc::new(PersisterSync::new(data_dir));
2852+
let persister_sync = Arc::new(PersisterSync::new(data_dir));
2853+
let persister = Arc::new(Arc::new(KVStoreSyncWrapper(persister_sync)));
28392854

28402855
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
28412856

0 commit comments

Comments
 (0)