From aa0d15ee6791a2e4e2b48326567ee0789c36b585 Mon Sep 17 00:00:00 2001 From: Tommy Verrall Date: Thu, 23 Oct 2025 19:06:27 +0200 Subject: [PATCH 1/7] Better message to come in the PR description --- .../client-core/src/client/mix_traffic/mod.rs | 5 ++++- .../real_traffic_stream.rs | 21 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/common/client-core/src/client/mix_traffic/mod.rs b/common/client-core/src/client/mix_traffic/mod.rs index bf83c624620..c6a158e2e46 100644 --- a/common/client-core/src/client/mix_traffic/mod.rs +++ b/common/client-core/src/client/mix_traffic/mod.rs @@ -20,7 +20,10 @@ pub mod transceiver; // We remind ourselves that 32 x 32kb = 1024kb, a reasonable size for a network buffer. pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 32; -const MAX_FAILURE_COUNT: usize = 100; + +/// Reduced from 100 to 20 to fail fast (~1-2 seconds instead of ~6 seconds). +/// If we can't send 20 packets in a row, the gateway is unreachable. +const MAX_FAILURE_COUNT: usize = 20; // that's also disgusting. pub struct Empty; diff --git a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs index 07b3506c457..0efa75805e1 100644 --- a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs +++ b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs @@ -120,6 +120,9 @@ where stats_tx: ClientStatsSender, shutdown_token: ShutdownToken, + + /// Flag to indicate that the mix_tx channel is closed and we should stop processing + mix_tx_closed: bool, } #[derive(Debug)] @@ -195,6 +198,7 @@ where lane_queue_lengths, stats_tx, shutdown_token, + mix_tx_closed: false, } } @@ -297,7 +301,12 @@ where tracing::error!( "failed to send mixnet packet due to closed channel (outside of shutdown!)" ); + // This prevents an infinite loop where we keep trying to send + // packets through a closed channel + self.mix_tx_closed = true; } + // Early return to avoid further processing when channel is closed + return; } Ok(_) => { let event = if fragment_id.is_some() { @@ -601,6 +610,12 @@ where } next_message = self.next() => if let Some(next_message) = next_message { self.on_message(next_message).await; + // Check if mix_tx channel was closed during on_message + // and break immediately to prevent infinite loop + if self.mix_tx_closed { + tracing::error!("OutQueueControl: mix_tx channel closed, stopping traffic stream"); + break; + } } else { tracing::trace!("OutQueueControl: Stopping since channel closed"); break; @@ -620,6 +635,12 @@ where } next_message = self.next() => if let Some(next_message) = next_message { self.on_message(next_message).await; + // Check if mix_tx channel was closed during on_message + // and break immediately to prevent infinite loop + if self.mix_tx_closed { + tracing::error!("OutQueueControl: mix_tx channel closed, stopping traffic stream"); + break; + } } else { tracing::trace!("OutQueueControl: Stopping since channel closed"); break; From 67c32faa113c822bb6d2d729381d09cae5847d3a Mon Sep 17 00:00:00 2001 From: Tommy Verrall Date: Thu, 23 Oct 2025 19:22:26 +0200 Subject: [PATCH 2/7] Fix comments --- .../client/real_messages_control/real_traffic_stream.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs index 0efa75805e1..de3fcbb6056 100644 --- a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs +++ b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs @@ -301,8 +301,7 @@ where tracing::error!( "failed to send mixnet packet due to closed channel (outside of shutdown!)" ); - // This prevents an infinite loop where we keep trying to send - // packets through a closed channel + // This prevents a loop where we keep trying to send packets through a closed channel self.mix_tx_closed = true; } // Early return to avoid further processing when channel is closed @@ -611,7 +610,7 @@ where next_message = self.next() => if let Some(next_message) = next_message { self.on_message(next_message).await; // Check if mix_tx channel was closed during on_message - // and break immediately to prevent infinite loop + // and break immediately to prevent loop if self.mix_tx_closed { tracing::error!("OutQueueControl: mix_tx channel closed, stopping traffic stream"); break; @@ -636,7 +635,7 @@ where next_message = self.next() => if let Some(next_message) = next_message { self.on_message(next_message).await; // Check if mix_tx channel was closed during on_message - // and break immediately to prevent infinite loop + // and break immediately to prevent loop if self.mix_tx_closed { tracing::error!("OutQueueControl: mix_tx channel closed, stopping traffic stream"); break; From bc0b89b31c755751ac0ef055c63cca4d90a90c97 Mon Sep 17 00:00:00 2001 From: Tommy Verrall Date: Fri, 24 Oct 2025 12:44:10 +0200 Subject: [PATCH 3/7] Internal comments --- .../real_traffic_stream.rs | 40 ++++++++----------- 1 file changed, 16 insertions(+), 24 deletions(-) diff --git a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs index de3fcbb6056..50fca6343ab 100644 --- a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs +++ b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs @@ -120,9 +120,6 @@ where stats_tx: ClientStatsSender, shutdown_token: ShutdownToken, - - /// Flag to indicate that the mix_tx channel is closed and we should stop processing - mix_tx_closed: bool, } #[derive(Debug)] @@ -198,7 +195,6 @@ where lane_queue_lengths, stats_tx, shutdown_token, - mix_tx_closed: false, } } @@ -301,8 +297,6 @@ where tracing::error!( "failed to send mixnet packet due to closed channel (outside of shutdown!)" ); - // This prevents a loop where we keep trying to send packets through a closed channel - self.mix_tx_closed = true; } // Early return to avoid further processing when channel is closed return; @@ -604,21 +598,20 @@ where tracing::trace!("OutQueueControl: Received shutdown"); break; } - _ = status_timer.tick() => { - self.log_status(); - } - next_message = self.next() => if let Some(next_message) = next_message { - self.on_message(next_message).await; - // Check if mix_tx channel was closed during on_message - // and break immediately to prevent loop - if self.mix_tx_closed { - tracing::error!("OutQueueControl: mix_tx channel closed, stopping traffic stream"); - break; - } - } else { - tracing::trace!("OutQueueControl: Stopping since channel closed"); + _ = status_timer.tick() => { + self.log_status(); + } + next_message = self.next() => if let Some(next_message) = next_message { + // Check if mix_tx channel is closed BEFORE processing message + if self.mix_tx.is_closed() { + tracing::error!("OutQueueControl: mix_tx channel closed, stopping traffic stream"); break; } + self.on_message(next_message).await; + } else { + tracing::trace!("OutQueueControl: Stopping since channel closed"); + break; + } } } } @@ -627,19 +620,18 @@ where { loop { tokio::select! { - biased; + biased; _ = shutdown_token.cancelled() => { tracing::trace!("OutQueueControl: Received shutdown"); break; } next_message = self.next() => if let Some(next_message) = next_message { - self.on_message(next_message).await; - // Check if mix_tx channel was closed during on_message - // and break immediately to prevent loop - if self.mix_tx_closed { + // Check if mix_tx channel is closed BEFORE processing message + if self.mix_tx.is_closed() { tracing::error!("OutQueueControl: mix_tx channel closed, stopping traffic stream"); break; } + self.on_message(next_message).await; } else { tracing::trace!("OutQueueControl: Stopping since channel closed"); break; From 6dce55a99b93d894944f576e9c68cf68c46087dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C4=99drzej=20Stuczy=C5=84ski?= Date: Fri, 24 Oct 2025 14:03:18 +0200 Subject: [PATCH 4/7] using same hierarchy of trackers for client shutdown control --- .../client-core/src/client/base_client/mod.rs | 18 +++++++++--------- sdk/rust/nym-sdk/src/mixnet/client.rs | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/common/client-core/src/client/base_client/mod.rs b/common/client-core/src/client/base_client/mod.rs index a663aadd516..b045ed99551 100644 --- a/common/client-core/src/client/base_client/mod.rs +++ b/common/client-core/src/client/base_client/mod.rs @@ -1004,7 +1004,7 @@ where // Create a shutdown tracker for this client - either as a child of provided tracker // or get one from the registry let shutdown_tracker = match self.shutdown { - Some(parent_tracker) => parent_tracker.child_tracker(), + Some(parent_tracker) => parent_tracker.clone(), None => nym_task::get_sdk_shutdown_tracker()?, }; @@ -1044,7 +1044,7 @@ where self.user_agent.clone(), generate_client_stats_id(*self_address.identity()), input_sender.clone(), - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ); // needs to be started as the first thing to block if required waiting for the gateway @@ -1054,7 +1054,7 @@ where shared_topology_accessor.clone(), self_address.gateway(), self.wait_for_gateway, - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ) .await?; @@ -1074,7 +1074,7 @@ where stats_reporter.clone(), #[cfg(unix)] self.connection_fd_callback, - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ) .await?; let gateway_ws_fd = gateway_transceiver.ws_fd(); @@ -1082,7 +1082,7 @@ where let reply_storage = Self::setup_persistent_reply_storage( reply_storage_backend, key_rotation_config, - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ) .await?; @@ -1093,7 +1093,7 @@ where reply_storage.key_storage(), reply_controller_sender.clone(), stats_reporter.clone(), - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ); // The message_sender is the transmitter for any component generating sphinx packets @@ -1103,7 +1103,7 @@ where let (message_sender, client_request_sender) = Self::start_mix_traffic_controller( gateway_transceiver, - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), EventSender(event_sender), ); @@ -1134,7 +1134,7 @@ where shared_lane_queue_lengths.clone(), client_connection_rx, stats_reporter.clone(), - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ); if !self @@ -1150,7 +1150,7 @@ where shared_topology_accessor.clone(), message_sender, stats_reporter.clone(), - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ); } diff --git a/sdk/rust/nym-sdk/src/mixnet/client.rs b/sdk/rust/nym-sdk/src/mixnet/client.rs index a9e12b0fe5f..8c59b29e90e 100644 --- a/sdk/rust/nym-sdk/src/mixnet/client.rs +++ b/sdk/rust/nym-sdk/src/mixnet/client.rs @@ -809,7 +809,7 @@ where client_output, client_state.clone(), nym_address, - started_client.shutdown_handle.child_tracker(), + started_client.shutdown_handle.clone(), packet_type, ); From 08559a76601c94e1f914af21171f148396e4d5b7 Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Fri, 24 Oct 2025 14:07:15 +0200 Subject: [PATCH 5/7] calling for shutdown from the MixTrafficController --- .../client-core/src/client/base_client/mod.rs | 2 +- .../client-core/src/client/mix_traffic/mod.rs | 7 +++++- .../real_traffic_stream.rs | 24 ++++++------------- 3 files changed, 14 insertions(+), 19 deletions(-) diff --git a/common/client-core/src/client/base_client/mod.rs b/common/client-core/src/client/base_client/mod.rs index b045ed99551..e907cc5c7c0 100644 --- a/common/client-core/src/client/base_client/mod.rs +++ b/common/client-core/src/client/base_client/mod.rs @@ -801,7 +801,7 @@ where event_tx, ); - let mix_tx = mix_traffic_controller.mix_rx(); + let mix_tx = mix_traffic_controller.mix_tx(); let client_tx = mix_traffic_controller.client_tx(); shutdown_tracker.try_spawn_named( diff --git a/common/client-core/src/client/mix_traffic/mod.rs b/common/client-core/src/client/mix_traffic/mod.rs index c6a158e2e46..3aced33db7b 100644 --- a/common/client-core/src/client/mix_traffic/mod.rs +++ b/common/client-core/src/client/mix_traffic/mod.rs @@ -87,7 +87,7 @@ impl MixTrafficController { self.client_tx.clone() } - pub fn mix_rx(&self) -> BatchMixMessageSender { + pub fn mix_tx(&self) -> BatchMixMessageSender { self.mix_tx.clone() } @@ -159,6 +159,11 @@ impl MixTrafficController { // Do we need to handle the embedded mixnet client case // separately? self.event_tx.send(MixnetClientEvent::Traffic(MixTrafficEvent::FailedSendingSphinx)); + // IMO it shouldn't be signalled from there but it is how it is + // TODO : report the failure upwards and shutdown from upwards + // Gateway is dead, we have to shut down currently + error!("Signalling shutdown from the MixTrafficController"); + self.shutdown_token.cancel(); break; } } diff --git a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs index 50fca6343ab..37b9dbac633 100644 --- a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs +++ b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs @@ -598,20 +598,15 @@ where tracing::trace!("OutQueueControl: Received shutdown"); break; } - _ = status_timer.tick() => { - self.log_status(); - } - next_message = self.next() => if let Some(next_message) = next_message { - // Check if mix_tx channel is closed BEFORE processing message - if self.mix_tx.is_closed() { - tracing::error!("OutQueueControl: mix_tx channel closed, stopping traffic stream"); + _ = status_timer.tick() => { + self.log_status(); + } + next_message = self.next() => if let Some(next_message) = next_message { + self.on_message(next_message).await; + } else { + tracing::trace!("OutQueueControl: Stopping since channel closed"); break; } - self.on_message(next_message).await; - } else { - tracing::trace!("OutQueueControl: Stopping since channel closed"); - break; - } } } } @@ -626,11 +621,6 @@ where break; } next_message = self.next() => if let Some(next_message) = next_message { - // Check if mix_tx channel is closed BEFORE processing message - if self.mix_tx.is_closed() { - tracing::error!("OutQueueControl: mix_tx channel closed, stopping traffic stream"); - break; - } self.on_message(next_message).await; } else { tracing::trace!("OutQueueControl: Stopping since channel closed"); From c61df791820dad6c0e3fbaf459d2cc23ad60425f Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Fri, 24 Oct 2025 14:11:56 +0200 Subject: [PATCH 6/7] typo --- .../src/client/real_messages_control/real_traffic_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs index 37b9dbac633..1b90208b6dd 100644 --- a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs +++ b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs @@ -615,7 +615,7 @@ where { loop { tokio::select! { - biased; + biased; _ = shutdown_token.cancelled() => { tracing::trace!("OutQueueControl: Received shutdown"); break; From fd5a95fa4d5ae991b18d86aa0e95b4040730582e Mon Sep 17 00:00:00 2001 From: Simon Wicky Date: Fri, 24 Oct 2025 16:17:29 +0200 Subject: [PATCH 7/7] allow overwriting existing sdk shutdown manager --- .../client-core/src/client/base_client/mod.rs | 2 +- .../src/client/cover_traffic_stream.rs | 2 +- common/task/src/lib.rs | 4 +- common/task/src/runtime_registry.rs | 50 +++++++++++++------ sdk/rust/nym-sdk/src/mixnet/client.rs | 14 ++---- 5 files changed, 43 insertions(+), 29 deletions(-) diff --git a/common/client-core/src/client/base_client/mod.rs b/common/client-core/src/client/base_client/mod.rs index e907cc5c7c0..e414e81ed5a 100644 --- a/common/client-core/src/client/base_client/mod.rs +++ b/common/client-core/src/client/base_client/mod.rs @@ -1005,7 +1005,7 @@ where // or get one from the registry let shutdown_tracker = match self.shutdown { Some(parent_tracker) => parent_tracker.clone(), - None => nym_task::get_sdk_shutdown_tracker()?, + None => nym_task::create_sdk_shutdown_tracker()?, }; Self::start_event_control(self.event_tx, event_receiver, &shutdown_tracker); diff --git a/common/client-core/src/client/cover_traffic_stream.rs b/common/client-core/src/client/cover_traffic_stream.rs index 77e8def15cc..8b7aa8494f8 100644 --- a/common/client-core/src/client/cover_traffic_stream.rs +++ b/common/client-core/src/client/cover_traffic_stream.rs @@ -205,7 +205,7 @@ impl LoopCoverTrafficStream { TrySendError::Full(_) => { // This isn't a problem, if the channel is full means we're already sending the // max amount of messages downstream can handle. - tracing::debug!("Failed to send cover message - channel full"); + tracing::trace!("Failed to send cover message - channel full"); } TrySendError::Closed(_) => { tracing::warn!("Failed to send cover message - channel closed"); diff --git a/common/task/src/lib.rs b/common/task/src/lib.rs index 3eaf27cbde7..2d095da7577 100644 --- a/common/task/src/lib.rs +++ b/common/task/src/lib.rs @@ -24,6 +24,6 @@ pub use crate::runtime_registry::RegistryAccessError; /// Get or create a ShutdownTracker for SDK use. /// This provides automatic task management without requiring manual setup. -pub fn get_sdk_shutdown_tracker() -> Result { - Ok(runtime_registry::RuntimeRegistry::get_or_create_sdk()?.shutdown_tracker_owned()) +pub fn create_sdk_shutdown_tracker() -> Result { + Ok(runtime_registry::RuntimeRegistry::create_sdk()?.shutdown_tracker_owned()) } diff --git a/common/task/src/runtime_registry.rs b/common/task/src/runtime_registry.rs index 11be65331df..08bf52613cf 100644 --- a/common/task/src/runtime_registry.rs +++ b/common/task/src/runtime_registry.rs @@ -19,32 +19,47 @@ pub(crate) struct RuntimeRegistry { pub enum RegistryAccessError { #[error("the runtime registry is poisoned")] Poisoned, + + #[error("The SDK ShutdownManager already exists")] + ExistingShutdownManager, + + #[error("No existing SDK ShutdownManager")] + MissingShutdownManager, } impl RuntimeRegistry { - /// Get or create a ShutdownManager for SDK use. + /// Create a ShutdownManager for SDK use. /// This manager doesn't listen to OS signals, making it suitable for library use. - pub(crate) fn get_or_create_sdk() -> Result, RegistryAccessError> { - let guard = REGISTRY - .sdk_manager - .read() - .map_err(|_| RegistryAccessError::Poisoned)?; - if let Some(manager) = guard.as_ref() { - return Ok(manager.clone()); - } - drop(guard); - + /// This function overwrite any existing manager! + pub(crate) fn create_sdk() -> Result, RegistryAccessError> { let mut guard = REGISTRY .sdk_manager .write() .map_err(|_| RegistryAccessError::Poisoned)?; + Ok(guard - .get_or_insert_with(|| { - Arc::new(ShutdownManager::new_without_signals().with_cancel_on_panic()) - }) + .insert(Arc::new( + ShutdownManager::new_without_signals().with_cancel_on_panic(), + )) .clone()) } + /// Get the ShutdownManager for SDK use. + /// This manager doesn't listen to OS signals, making it suitable for library use. + /// Not yet used, but maybe in the future + #[allow(dead_code)] + pub(crate) fn get_sdk() -> Result, RegistryAccessError> { + let guard = REGISTRY + .sdk_manager + .read() + .map_err(|_| RegistryAccessError::Poisoned)?; + if let Some(manager) = guard.as_ref() { + Ok(manager.clone()) + } else { + Err(RegistryAccessError::MissingShutdownManager) + } + } + /// Check if an SDK manager has been created. /// Useful for testing and debugging. #[allow(dead_code)] @@ -85,10 +100,13 @@ mod tests { assert!(!RuntimeRegistry::has_sdk_manager().unwrap()); - let manager1 = RuntimeRegistry::get_or_create_sdk().unwrap(); + // Error if nothing was created + assert!(RuntimeRegistry::get_sdk().is_err()); + + let manager1 = RuntimeRegistry::create_sdk().unwrap(); assert!(RuntimeRegistry::has_sdk_manager().unwrap()); - let manager2 = RuntimeRegistry::get_or_create_sdk().unwrap(); + let manager2 = RuntimeRegistry::get_sdk().unwrap(); // Should return the same instance assert!(Arc::ptr_eq(&manager1, &manager2)); diff --git a/sdk/rust/nym-sdk/src/mixnet/client.rs b/sdk/rust/nym-sdk/src/mixnet/client.rs index 8c59b29e90e..27c9983f7fd 100644 --- a/sdk/rust/nym-sdk/src/mixnet/client.rs +++ b/sdk/rust/nym-sdk/src/mixnet/client.rs @@ -736,15 +736,11 @@ where base_builder = base_builder.with_topology_provider(topology_provider); } - // Use custom shutdown if provided, otherwise get from registry - let shutdown_tracker = match self.custom_shutdown { - Some(custom) => custom, - None => { - // Auto-create from registry for SDK use - nym_task::get_sdk_shutdown_tracker()? - } - }; - base_builder = base_builder.with_shutdown(shutdown_tracker); + // Use custom shutdown if provided, otherwise the sdk one will be used later down the line + if let Some(shutdown_tracker) = self.custom_shutdown { + base_builder = base_builder.with_shutdown(shutdown_tracker); + } + if let Some(event_tx) = self.event_tx { base_builder = base_builder.with_event_tx(event_tx); }