diff --git a/common/client-core/src/client/base_client/mod.rs b/common/client-core/src/client/base_client/mod.rs index a663aadd51..e414e81ed5 100644 --- a/common/client-core/src/client/base_client/mod.rs +++ b/common/client-core/src/client/base_client/mod.rs @@ -801,7 +801,7 @@ where event_tx, ); - let mix_tx = mix_traffic_controller.mix_rx(); + let mix_tx = mix_traffic_controller.mix_tx(); let client_tx = mix_traffic_controller.client_tx(); shutdown_tracker.try_spawn_named( @@ -1004,8 +1004,8 @@ where // Create a shutdown tracker for this client - either as a child of provided tracker // or get one from the registry let shutdown_tracker = match self.shutdown { - Some(parent_tracker) => parent_tracker.child_tracker(), - None => nym_task::get_sdk_shutdown_tracker()?, + Some(parent_tracker) => parent_tracker.clone(), + None => nym_task::create_sdk_shutdown_tracker()?, }; Self::start_event_control(self.event_tx, event_receiver, &shutdown_tracker); @@ -1044,7 +1044,7 @@ where self.user_agent.clone(), generate_client_stats_id(*self_address.identity()), input_sender.clone(), - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ); // needs to be started as the first thing to block if required waiting for the gateway @@ -1054,7 +1054,7 @@ where shared_topology_accessor.clone(), self_address.gateway(), self.wait_for_gateway, - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ) .await?; @@ -1074,7 +1074,7 @@ where stats_reporter.clone(), #[cfg(unix)] self.connection_fd_callback, - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ) .await?; let gateway_ws_fd = gateway_transceiver.ws_fd(); @@ -1082,7 +1082,7 @@ where let reply_storage = Self::setup_persistent_reply_storage( reply_storage_backend, key_rotation_config, - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ) .await?; @@ -1093,7 +1093,7 @@ where reply_storage.key_storage(), reply_controller_sender.clone(), stats_reporter.clone(), - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ); // The message_sender is the transmitter for any component generating sphinx packets @@ -1103,7 +1103,7 @@ where let (message_sender, client_request_sender) = Self::start_mix_traffic_controller( gateway_transceiver, - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), EventSender(event_sender), ); @@ -1134,7 +1134,7 @@ where shared_lane_queue_lengths.clone(), client_connection_rx, stats_reporter.clone(), - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ); if !self @@ -1150,7 +1150,7 @@ where shared_topology_accessor.clone(), message_sender, stats_reporter.clone(), - &shutdown_tracker.child_tracker(), + &shutdown_tracker.clone(), ); } diff --git a/common/client-core/src/client/cover_traffic_stream.rs b/common/client-core/src/client/cover_traffic_stream.rs index 77e8def15c..8b7aa8494f 100644 --- a/common/client-core/src/client/cover_traffic_stream.rs +++ b/common/client-core/src/client/cover_traffic_stream.rs @@ -205,7 +205,7 @@ impl LoopCoverTrafficStream { TrySendError::Full(_) => { // This isn't a problem, if the channel is full means we're already sending the // max amount of messages downstream can handle. - tracing::debug!("Failed to send cover message - channel full"); + tracing::trace!("Failed to send cover message - channel full"); } TrySendError::Closed(_) => { tracing::warn!("Failed to send cover message - channel closed"); diff --git a/common/client-core/src/client/mix_traffic/mod.rs b/common/client-core/src/client/mix_traffic/mod.rs index bf83c62462..3aced33db7 100644 --- a/common/client-core/src/client/mix_traffic/mod.rs +++ b/common/client-core/src/client/mix_traffic/mod.rs @@ -20,7 +20,10 @@ pub mod transceiver; // We remind ourselves that 32 x 32kb = 1024kb, a reasonable size for a network buffer. pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 32; -const MAX_FAILURE_COUNT: usize = 100; + +/// Reduced from 100 to 20 to fail fast (~1-2 seconds instead of ~6 seconds). +/// If we can't send 20 packets in a row, the gateway is unreachable. +const MAX_FAILURE_COUNT: usize = 20; // that's also disgusting. pub struct Empty; @@ -84,7 +87,7 @@ impl MixTrafficController { self.client_tx.clone() } - pub fn mix_rx(&self) -> BatchMixMessageSender { + pub fn mix_tx(&self) -> BatchMixMessageSender { self.mix_tx.clone() } @@ -156,6 +159,11 @@ impl MixTrafficController { // Do we need to handle the embedded mixnet client case // separately? self.event_tx.send(MixnetClientEvent::Traffic(MixTrafficEvent::FailedSendingSphinx)); + // IMO it shouldn't be signalled from there but it is how it is + // TODO : report the failure upwards and shutdown from upwards + // Gateway is dead, we have to shut down currently + error!("Signalling shutdown from the MixTrafficController"); + self.shutdown_token.cancel(); break; } } diff --git a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs index 07b3506c45..1b90208b6d 100644 --- a/common/client-core/src/client/real_messages_control/real_traffic_stream.rs +++ b/common/client-core/src/client/real_messages_control/real_traffic_stream.rs @@ -298,6 +298,8 @@ where "failed to send mixnet packet due to closed channel (outside of shutdown!)" ); } + // Early return to avoid further processing when channel is closed + return; } Ok(_) => { let event = if fragment_id.is_some() { diff --git a/common/task/src/lib.rs b/common/task/src/lib.rs index 3eaf27cbde..2d095da757 100644 --- a/common/task/src/lib.rs +++ b/common/task/src/lib.rs @@ -24,6 +24,6 @@ pub use crate::runtime_registry::RegistryAccessError; /// Get or create a ShutdownTracker for SDK use. /// This provides automatic task management without requiring manual setup. -pub fn get_sdk_shutdown_tracker() -> Result { - Ok(runtime_registry::RuntimeRegistry::get_or_create_sdk()?.shutdown_tracker_owned()) +pub fn create_sdk_shutdown_tracker() -> Result { + Ok(runtime_registry::RuntimeRegistry::create_sdk()?.shutdown_tracker_owned()) } diff --git a/common/task/src/runtime_registry.rs b/common/task/src/runtime_registry.rs index 11be65331d..08bf52613c 100644 --- a/common/task/src/runtime_registry.rs +++ b/common/task/src/runtime_registry.rs @@ -19,32 +19,47 @@ pub(crate) struct RuntimeRegistry { pub enum RegistryAccessError { #[error("the runtime registry is poisoned")] Poisoned, + + #[error("The SDK ShutdownManager already exists")] + ExistingShutdownManager, + + #[error("No existing SDK ShutdownManager")] + MissingShutdownManager, } impl RuntimeRegistry { - /// Get or create a ShutdownManager for SDK use. + /// Create a ShutdownManager for SDK use. /// This manager doesn't listen to OS signals, making it suitable for library use. - pub(crate) fn get_or_create_sdk() -> Result, RegistryAccessError> { - let guard = REGISTRY - .sdk_manager - .read() - .map_err(|_| RegistryAccessError::Poisoned)?; - if let Some(manager) = guard.as_ref() { - return Ok(manager.clone()); - } - drop(guard); - + /// This function overwrite any existing manager! + pub(crate) fn create_sdk() -> Result, RegistryAccessError> { let mut guard = REGISTRY .sdk_manager .write() .map_err(|_| RegistryAccessError::Poisoned)?; + Ok(guard - .get_or_insert_with(|| { - Arc::new(ShutdownManager::new_without_signals().with_cancel_on_panic()) - }) + .insert(Arc::new( + ShutdownManager::new_without_signals().with_cancel_on_panic(), + )) .clone()) } + /// Get the ShutdownManager for SDK use. + /// This manager doesn't listen to OS signals, making it suitable for library use. + /// Not yet used, but maybe in the future + #[allow(dead_code)] + pub(crate) fn get_sdk() -> Result, RegistryAccessError> { + let guard = REGISTRY + .sdk_manager + .read() + .map_err(|_| RegistryAccessError::Poisoned)?; + if let Some(manager) = guard.as_ref() { + Ok(manager.clone()) + } else { + Err(RegistryAccessError::MissingShutdownManager) + } + } + /// Check if an SDK manager has been created. /// Useful for testing and debugging. #[allow(dead_code)] @@ -85,10 +100,13 @@ mod tests { assert!(!RuntimeRegistry::has_sdk_manager().unwrap()); - let manager1 = RuntimeRegistry::get_or_create_sdk().unwrap(); + // Error if nothing was created + assert!(RuntimeRegistry::get_sdk().is_err()); + + let manager1 = RuntimeRegistry::create_sdk().unwrap(); assert!(RuntimeRegistry::has_sdk_manager().unwrap()); - let manager2 = RuntimeRegistry::get_or_create_sdk().unwrap(); + let manager2 = RuntimeRegistry::get_sdk().unwrap(); // Should return the same instance assert!(Arc::ptr_eq(&manager1, &manager2)); diff --git a/sdk/rust/nym-sdk/src/mixnet/client.rs b/sdk/rust/nym-sdk/src/mixnet/client.rs index a9e12b0fe5..27c9983f7f 100644 --- a/sdk/rust/nym-sdk/src/mixnet/client.rs +++ b/sdk/rust/nym-sdk/src/mixnet/client.rs @@ -736,15 +736,11 @@ where base_builder = base_builder.with_topology_provider(topology_provider); } - // Use custom shutdown if provided, otherwise get from registry - let shutdown_tracker = match self.custom_shutdown { - Some(custom) => custom, - None => { - // Auto-create from registry for SDK use - nym_task::get_sdk_shutdown_tracker()? - } - }; - base_builder = base_builder.with_shutdown(shutdown_tracker); + // Use custom shutdown if provided, otherwise the sdk one will be used later down the line + if let Some(shutdown_tracker) = self.custom_shutdown { + base_builder = base_builder.with_shutdown(shutdown_tracker); + } + if let Some(event_tx) = self.event_tx { base_builder = base_builder.with_event_tx(event_tx); } @@ -809,7 +805,7 @@ where client_output, client_state.clone(), nym_address, - started_client.shutdown_handle.child_tracker(), + started_client.shutdown_handle.clone(), packet_type, );