Skip to content

Commit 981b32c

Browse files
tommyv1987 authored and Tommy Verrall committed
Merge pull request #6143 from nymtech/bugfix/mix-tx-closed-v2
Bugfix: Add circuit breaker
1 parent 9c336f5 commit 981b32c

File tree

7 files changed

+66
-42
lines changed

7 files changed

+66
-42
lines changed

common/client-core/src/client/base_client/mod.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -783,7 +783,7 @@ where
783783
event_tx,
784784
);
785785

786-
let mix_tx = mix_traffic_controller.mix_rx();
786+
let mix_tx = mix_traffic_controller.mix_tx();
787787
let client_tx = mix_traffic_controller.client_tx();
788788

789789
shutdown_tracker.try_spawn_named(
@@ -940,8 +940,8 @@ where
940940
// Create a shutdown tracker for this client - either as a child of provided tracker
941941
// or get one from the registry
942942
let shutdown_tracker = match self.shutdown {
943-
Some(parent_tracker) => parent_tracker.child_tracker(),
944-
None => nym_task::get_sdk_shutdown_tracker()?,
943+
Some(parent_tracker) => parent_tracker.clone(),
944+
None => nym_task::create_sdk_shutdown_tracker()?,
945945
};
946946

947947
Self::start_event_control(self.event_tx, event_receiver, &shutdown_tracker);
@@ -976,7 +976,7 @@ where
976976
self.user_agent.clone(),
977977
generate_client_stats_id(*self_address.identity()),
978978
input_sender.clone(),
979-
&shutdown_tracker.child_tracker(),
979+
&shutdown_tracker.clone(),
980980
);
981981

982982
// needs to be started as the first thing to block if required waiting for the gateway
@@ -986,7 +986,7 @@ where
986986
shared_topology_accessor.clone(),
987987
self_address.gateway(),
988988
self.wait_for_gateway,
989-
&shutdown_tracker.child_tracker(),
989+
&shutdown_tracker.clone(),
990990
)
991991
.await?;
992992

@@ -1006,15 +1006,15 @@ where
10061006
stats_reporter.clone(),
10071007
#[cfg(unix)]
10081008
self.connection_fd_callback,
1009-
&shutdown_tracker.child_tracker(),
1009+
&shutdown_tracker.clone(),
10101010
)
10111011
.await?;
10121012
let gateway_ws_fd = gateway_transceiver.ws_fd();
10131013

10141014
let reply_storage = Self::setup_persistent_reply_storage(
10151015
reply_storage_backend,
10161016
key_rotation_config,
1017-
&shutdown_tracker.child_tracker(),
1017+
&shutdown_tracker.clone(),
10181018
)
10191019
.await?;
10201020

@@ -1025,7 +1025,7 @@ where
10251025
reply_storage.key_storage(),
10261026
reply_controller_sender.clone(),
10271027
stats_reporter.clone(),
1028-
&shutdown_tracker.child_tracker(),
1028+
&shutdown_tracker.clone(),
10291029
);
10301030

10311031
// The message_sender is the transmitter for any component generating sphinx packets
@@ -1035,7 +1035,7 @@ where
10351035

10361036
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
10371037
gateway_transceiver,
1038-
&shutdown_tracker.child_tracker(),
1038+
&shutdown_tracker.clone(),
10391039
EventSender(event_sender),
10401040
);
10411041

@@ -1066,7 +1066,7 @@ where
10661066
shared_lane_queue_lengths.clone(),
10671067
client_connection_rx,
10681068
stats_reporter.clone(),
1069-
&shutdown_tracker.child_tracker(),
1069+
&shutdown_tracker.clone(),
10701070
);
10711071

10721072
if !self
@@ -1082,7 +1082,7 @@ where
10821082
shared_topology_accessor.clone(),
10831083
message_sender,
10841084
stats_reporter.clone(),
1085-
&shutdown_tracker.child_tracker(),
1085+
&shutdown_tracker.clone(),
10861086
);
10871087
}
10881088

common/client-core/src/client/cover_traffic_stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ impl LoopCoverTrafficStream<OsRng> {
205205
TrySendError::Full(_) => {
206206
// This isn't a problem, if the channel is full means we're already sending the
207207
// max amount of messages downstream can handle.
208-
tracing::debug!("Failed to send cover message - channel full");
208+
tracing::trace!("Failed to send cover message - channel full");
209209
}
210210
TrySendError::Closed(_) => {
211211
tracing::warn!("Failed to send cover message - channel closed");

common/client-core/src/client/mix_traffic/mod.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ pub mod transceiver;
2020

2121
// We remind ourselves that 32 x 32kb = 1024kb, a reasonable size for a network buffer.
2222
pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 32;
23-
const MAX_FAILURE_COUNT: usize = 100;
23+
24+
/// Reduced from 100 to 20 to fail fast (~1-2 seconds instead of ~6 seconds).
25+
/// If we can't send 20 packets in a row, the gateway is unreachable.
26+
const MAX_FAILURE_COUNT: usize = 20;
2427

2528
// that's also disgusting.
2629
pub struct Empty;
@@ -84,7 +87,7 @@ impl MixTrafficController {
8487
self.client_tx.clone()
8588
}
8689

87-
pub fn mix_rx(&self) -> BatchMixMessageSender {
90+
pub fn mix_tx(&self) -> BatchMixMessageSender {
8891
self.mix_tx.clone()
8992
}
9093

@@ -156,6 +159,11 @@ impl MixTrafficController {
156159
// Do we need to handle the embedded mixnet client case
157160
// separately?
158161
self.event_tx.send(MixnetClientEvent::Traffic(MixTrafficEvent::FailedSendingSphinx));
162+
// IMO it shouldn't be signalled from there but it is how it is
163+
// TODO : report the failure upwards and shutdown from upwards
164+
// Gateway is dead, we have to shut down currently
165+
error!("Signalling shutdown from the MixTrafficController");
166+
self.shutdown_token.cancel();
159167
break;
160168
}
161169
}

common/client-core/src/client/real_messages_control/real_traffic_stream.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,8 @@ where
298298
"failed to send mixnet packet due to closed channel (outside of shutdown!)"
299299
);
300300
}
301+
// Early return to avoid further processing when channel is closed
302+
return;
301303
}
302304
Ok(_) => {
303305
let event = if fragment_id.is_some() {

common/task/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,6 @@ pub use crate::runtime_registry::RegistryAccessError;
2424

2525
/// Get or create a ShutdownTracker for SDK use.
2626
/// This provides automatic task management without requiring manual setup.
27-
pub fn get_sdk_shutdown_tracker() -> Result<ShutdownTracker, RegistryAccessError> {
28-
Ok(runtime_registry::RuntimeRegistry::get_or_create_sdk()?.shutdown_tracker_owned())
27+
pub fn create_sdk_shutdown_tracker() -> Result<ShutdownTracker, RegistryAccessError> {
28+
Ok(runtime_registry::RuntimeRegistry::create_sdk()?.shutdown_tracker_owned())
2929
}

common/task/src/runtime_registry.rs

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,47 @@ pub(crate) struct RuntimeRegistry {
1919
pub enum RegistryAccessError {
2020
#[error("the runtime registry is poisoned")]
2121
Poisoned,
22+
23+
#[error("The SDK ShutdownManager already exists")]
24+
ExistingShutdownManager,
25+
26+
#[error("No existing SDK ShutdownManager")]
27+
MissingShutdownManager,
2228
}
2329

2430
impl RuntimeRegistry {
25-
/// Get or create a ShutdownManager for SDK use.
31+
/// Create a ShutdownManager for SDK use.
2632
/// This manager doesn't listen to OS signals, making it suitable for library use.
27-
pub(crate) fn get_or_create_sdk() -> Result<Arc<ShutdownManager>, RegistryAccessError> {
28-
let guard = REGISTRY
29-
.sdk_manager
30-
.read()
31-
.map_err(|_| RegistryAccessError::Poisoned)?;
32-
if let Some(manager) = guard.as_ref() {
33-
return Ok(manager.clone());
34-
}
35-
drop(guard);
36-
33+
/// This function overwrites any existing manager!
34+
pub(crate) fn create_sdk() -> Result<Arc<ShutdownManager>, RegistryAccessError> {
3735
let mut guard = REGISTRY
3836
.sdk_manager
3937
.write()
4038
.map_err(|_| RegistryAccessError::Poisoned)?;
39+
4140
Ok(guard
42-
.get_or_insert_with(|| {
43-
Arc::new(ShutdownManager::new_without_signals().with_cancel_on_panic())
44-
})
41+
.insert(Arc::new(
42+
ShutdownManager::new_without_signals().with_cancel_on_panic(),
43+
))
4544
.clone())
4645
}
4746

47+
/// Get the ShutdownManager for SDK use.
48+
/// This manager doesn't listen to OS signals, making it suitable for library use.
49+
/// Not yet used, but maybe in the future
50+
#[allow(dead_code)]
51+
pub(crate) fn get_sdk() -> Result<Arc<ShutdownManager>, RegistryAccessError> {
52+
let guard = REGISTRY
53+
.sdk_manager
54+
.read()
55+
.map_err(|_| RegistryAccessError::Poisoned)?;
56+
if let Some(manager) = guard.as_ref() {
57+
Ok(manager.clone())
58+
} else {
59+
Err(RegistryAccessError::MissingShutdownManager)
60+
}
61+
}
62+
4863
/// Check if an SDK manager has been created.
4964
/// Useful for testing and debugging.
5065
#[allow(dead_code)]
@@ -85,10 +100,13 @@ mod tests {
85100

86101
assert!(!RuntimeRegistry::has_sdk_manager().unwrap());
87102

88-
let manager1 = RuntimeRegistry::get_or_create_sdk().unwrap();
103+
// Error if nothing was created
104+
assert!(RuntimeRegistry::get_sdk().is_err());
105+
106+
let manager1 = RuntimeRegistry::create_sdk().unwrap();
89107
assert!(RuntimeRegistry::has_sdk_manager().unwrap());
90108

91-
let manager2 = RuntimeRegistry::get_or_create_sdk().unwrap();
109+
let manager2 = RuntimeRegistry::get_sdk().unwrap();
92110
// Should return the same instance
93111
assert!(Arc::ptr_eq(&manager1, &manager2));
94112

sdk/rust/nym-sdk/src/mixnet/client.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -720,15 +720,11 @@ where
720720
base_builder = base_builder.with_topology_provider(topology_provider);
721721
}
722722

723-
// Use custom shutdown if provided, otherwise get from registry
724-
let shutdown_tracker = match self.custom_shutdown {
725-
Some(custom) => custom,
726-
None => {
727-
// Auto-create from registry for SDK use
728-
nym_task::get_sdk_shutdown_tracker()?
729-
}
730-
};
731-
base_builder = base_builder.with_shutdown(shutdown_tracker);
723+
// Use custom shutdown if provided, otherwise the sdk one will be used later down the line
724+
if let Some(shutdown_tracker) = self.custom_shutdown {
725+
base_builder = base_builder.with_shutdown(shutdown_tracker);
726+
}
727+
732728
if let Some(event_tx) = self.event_tx {
733729
base_builder = base_builder.with_event_tx(event_tx);
734730
}
@@ -793,7 +789,7 @@ where
793789
client_output,
794790
client_state.clone(),
795791
nym_address,
796-
started_client.shutdown_handle.child_tracker(),
792+
started_client.shutdown_handle.clone(),
797793
packet_type,
798794
);
799795

0 commit comments

Comments
 (0)