@@ -2720,6 +2720,10 @@ pub struct ChannelManager<
2720
2720
/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
2721
2721
pending_events_processor: AtomicBool,
2722
2722
2723
+ /// A simple atomic flag to ensure only one task at a time can be processing HTLC forwards via
2724
+ /// [`Self::process_pending_htlc_forwards`].
2725
+ pending_htlc_forwards_processor: AtomicBool,
2726
+
2723
2727
/// If we are running during init (either directly during the deserialization method or in
2724
2728
/// block connection methods which run after deserialization but before normal operation) we
2725
2729
/// cannot provide the user with [`ChannelMonitorUpdate`]s through the normal update flow -
@@ -3784,6 +3788,7 @@ where
3784
3788
3785
3789
pending_events: Mutex::new(VecDeque::new()),
3786
3790
pending_events_processor: AtomicBool::new(false),
3791
+ pending_htlc_forwards_processor: AtomicBool::new(false),
3787
3792
pending_background_events: Mutex::new(Vec::new()),
3788
3793
total_consistency_lock: RwLock::new(()),
3789
3794
background_events_processed_since_startup: AtomicBool::new(false),
@@ -6334,9 +6339,19 @@ where
6334
6339
///
6335
6340
/// Will regularly be called by the background processor.
6336
6341
pub fn process_pending_htlc_forwards(&self) {
6342
+ if self
6343
+ .pending_htlc_forwards_processor
6344
+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
6345
+ .is_err()
6346
+ {
6347
+ return;
6348
+ }
6349
+
6337
6350
let _persistence_guard = PersistenceNotifierGuard::optionally_notify(self, || {
6338
6351
self.internal_process_pending_htlc_forwards()
6339
6352
});
6353
+
6354
+ self.pending_htlc_forwards_processor.store(false, Ordering::Release);
6340
6355
}
6341
6356
6342
6357
// Returns whether or not we need to re-persist.
@@ -16348,6 +16363,7 @@ where
16348
16363
16349
16364
pending_events: Mutex::new(pending_events_read),
16350
16365
pending_events_processor: AtomicBool::new(false),
16366
+ pending_htlc_forwards_processor: AtomicBool::new(false),
16351
16367
pending_background_events: Mutex::new(pending_background_events),
16352
16368
total_consistency_lock: RwLock::new(()),
16353
16369
background_events_processed_since_startup: AtomicBool::new(false),
0 commit comments