Skip to content

Commit 8b482fb

Browse files
tommyv1987 authored and jstuczyn committed
Merge pull request #6143 from nymtech/bugfix/mix-tx-closed-v2
Bugfix: Add circuit breaker
1 parent ae388d7 commit 8b482fb

File tree

7 files changed

+66
-42
lines changed

7 files changed

+66
-42
lines changed

common/client-core/src/client/base_client/mod.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -791,7 +791,7 @@ where
791791
event_tx,
792792
);
793793

794-
let mix_tx = mix_traffic_controller.mix_rx();
794+
let mix_tx = mix_traffic_controller.mix_tx();
795795
let client_tx = mix_traffic_controller.client_tx();
796796

797797
shutdown_tracker.try_spawn_named(
@@ -958,8 +958,8 @@ where
958958
// Create a shutdown tracker for this client - either as a child of provided tracker
959959
// or get one from the registry
960960
let shutdown_tracker = match self.shutdown {
961-
Some(parent_tracker) => parent_tracker.child_tracker(),
962-
None => nym_task::get_sdk_shutdown_tracker()?,
961+
Some(parent_tracker) => parent_tracker.clone(),
962+
None => nym_task::create_sdk_shutdown_tracker()?,
963963
};
964964

965965
Self::start_event_control(self.event_tx, event_receiver, &shutdown_tracker);
@@ -998,7 +998,7 @@ where
998998
self.user_agent.clone(),
999999
generate_client_stats_id(*self_address.identity()),
10001000
input_sender.clone(),
1001-
&shutdown_tracker.child_tracker(),
1001+
&shutdown_tracker.clone(),
10021002
);
10031003

10041004
// needs to be started as the first thing to block if required waiting for the gateway
@@ -1008,7 +1008,7 @@ where
10081008
shared_topology_accessor.clone(),
10091009
self_address.gateway(),
10101010
self.wait_for_gateway,
1011-
&shutdown_tracker.child_tracker(),
1011+
&shutdown_tracker.clone(),
10121012
)
10131013
.await?;
10141014

@@ -1028,15 +1028,15 @@ where
10281028
stats_reporter.clone(),
10291029
#[cfg(unix)]
10301030
self.connection_fd_callback,
1031-
&shutdown_tracker.child_tracker(),
1031+
&shutdown_tracker.clone(),
10321032
)
10331033
.await?;
10341034
let gateway_ws_fd = gateway_transceiver.ws_fd();
10351035

10361036
let reply_storage = Self::setup_persistent_reply_storage(
10371037
reply_storage_backend,
10381038
key_rotation_config,
1039-
&shutdown_tracker.child_tracker(),
1039+
&shutdown_tracker.clone(),
10401040
)
10411041
.await?;
10421042

@@ -1047,7 +1047,7 @@ where
10471047
reply_storage.key_storage(),
10481048
reply_controller_sender.clone(),
10491049
stats_reporter.clone(),
1050-
&shutdown_tracker.child_tracker(),
1050+
&shutdown_tracker.clone(),
10511051
);
10521052

10531053
// The message_sender is the transmitter for any component generating sphinx packets
@@ -1057,7 +1057,7 @@ where
10571057

10581058
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
10591059
gateway_transceiver,
1060-
&shutdown_tracker.child_tracker(),
1060+
&shutdown_tracker.clone(),
10611061
EventSender(event_sender),
10621062
);
10631063

@@ -1088,7 +1088,7 @@ where
10881088
shared_lane_queue_lengths.clone(),
10891089
client_connection_rx,
10901090
stats_reporter.clone(),
1091-
&shutdown_tracker.child_tracker(),
1091+
&shutdown_tracker.clone(),
10921092
);
10931093

10941094
if !self
@@ -1104,7 +1104,7 @@ where
11041104
shared_topology_accessor.clone(),
11051105
message_sender,
11061106
stats_reporter.clone(),
1107-
&shutdown_tracker.child_tracker(),
1107+
&shutdown_tracker.clone(),
11081108
);
11091109
}
11101110

common/client-core/src/client/cover_traffic_stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,7 @@ impl LoopCoverTrafficStream<OsRng> {
205205
TrySendError::Full(_) => {
206206
// This isn't a problem: if the channel is full, it means we're already sending the
207207
// max amount of messages downstream can handle.
208-
tracing::debug!("Failed to send cover message - channel full");
208+
tracing::trace!("Failed to send cover message - channel full");
209209
}
210210
TrySendError::Closed(_) => {
211211
tracing::warn!("Failed to send cover message - channel closed");

common/client-core/src/client/mix_traffic/mod.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@ pub mod transceiver;
2020

2121
// We remind ourselves that 32 x 32kb = 1024kb, a reasonable size for a network buffer.
2222
pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 32;
23-
const MAX_FAILURE_COUNT: usize = 100;
23+
24+
/// Reduced from 100 to 20 to fail fast (~1-2 seconds instead of ~6 seconds).
25+
/// If we can't send 20 packets in a row, the gateway is unreachable.
26+
const MAX_FAILURE_COUNT: usize = 20;
2427

2528
// that's also disgusting.
2629
pub struct Empty;
@@ -84,7 +87,7 @@ impl MixTrafficController {
8487
self.client_tx.clone()
8588
}
8689

87-
pub fn mix_rx(&self) -> BatchMixMessageSender {
90+
pub fn mix_tx(&self) -> BatchMixMessageSender {
8891
self.mix_tx.clone()
8992
}
9093

@@ -156,6 +159,11 @@ impl MixTrafficController {
156159
// Do we need to handle the embedded mixnet client case
157160
// separately?
158161
self.event_tx.send(MixnetClientEvent::Traffic(MixTrafficEvent::FailedSendingSphinx));
162+
// IMO it shouldn't be signalled from there but it is how it is
163+
// TODO : report the failure upwards and shutdown from upwards
164+
// Gateway is dead, we have to shut down currently
165+
error!("Signalling shutdown from the MixTrafficController");
166+
self.shutdown_token.cancel();
159167
break;
160168
}
161169
}

common/client-core/src/client/real_messages_control/real_traffic_stream.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,8 @@ where
298298
"failed to send mixnet packet due to closed channel (outside of shutdown!)"
299299
);
300300
}
301+
// Early return to avoid further processing when channel is closed
302+
return;
301303
}
302304
Ok(_) => {
303305
let event = if fragment_id.is_some() {

common/task/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,6 @@ pub use crate::runtime_registry::RegistryAccessError;
2424

2525
/// Get or create a ShutdownTracker for SDK use.
2626
/// This provides automatic task management without requiring manual setup.
27-
pub fn get_sdk_shutdown_tracker() -> Result<ShutdownTracker, RegistryAccessError> {
28-
Ok(runtime_registry::RuntimeRegistry::get_or_create_sdk()?.shutdown_tracker_owned())
27+
pub fn create_sdk_shutdown_tracker() -> Result<ShutdownTracker, RegistryAccessError> {
28+
Ok(runtime_registry::RuntimeRegistry::create_sdk()?.shutdown_tracker_owned())
2929
}

common/task/src/runtime_registry.rs

Lines changed: 34 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,47 @@ pub(crate) struct RuntimeRegistry {
1919
pub enum RegistryAccessError {
2020
#[error("the runtime registry is poisoned")]
2121
Poisoned,
22+
23+
#[error("The SDK ShutdownManager already exists")]
24+
ExistingShutdownManager,
25+
26+
#[error("No existing SDK ShutdownManager")]
27+
MissingShutdownManager,
2228
}
2329

2430
impl RuntimeRegistry {
25-
/// Get or create a ShutdownManager for SDK use.
31+
/// Create a ShutdownManager for SDK use.
2632
/// This manager doesn't listen to OS signals, making it suitable for library use.
27-
pub(crate) fn get_or_create_sdk() -> Result<Arc<ShutdownManager>, RegistryAccessError> {
28-
let guard = REGISTRY
29-
.sdk_manager
30-
.read()
31-
.map_err(|_| RegistryAccessError::Poisoned)?;
32-
if let Some(manager) = guard.as_ref() {
33-
return Ok(manager.clone());
34-
}
35-
drop(guard);
36-
33+
/// This function overwrites any existing manager!
34+
pub(crate) fn create_sdk() -> Result<Arc<ShutdownManager>, RegistryAccessError> {
3735
let mut guard = REGISTRY
3836
.sdk_manager
3937
.write()
4038
.map_err(|_| RegistryAccessError::Poisoned)?;
39+
4140
Ok(guard
42-
.get_or_insert_with(|| {
43-
Arc::new(ShutdownManager::new_without_signals().with_cancel_on_panic())
44-
})
41+
.insert(Arc::new(
42+
ShutdownManager::new_without_signals().with_cancel_on_panic(),
43+
))
4544
.clone())
4645
}
4746

47+
/// Get the ShutdownManager for SDK use.
48+
/// This manager doesn't listen to OS signals, making it suitable for library use.
49+
/// Not yet used, but maybe in the future
50+
#[allow(dead_code)]
51+
pub(crate) fn get_sdk() -> Result<Arc<ShutdownManager>, RegistryAccessError> {
52+
let guard = REGISTRY
53+
.sdk_manager
54+
.read()
55+
.map_err(|_| RegistryAccessError::Poisoned)?;
56+
if let Some(manager) = guard.as_ref() {
57+
Ok(manager.clone())
58+
} else {
59+
Err(RegistryAccessError::MissingShutdownManager)
60+
}
61+
}
62+
4863
/// Check if an SDK manager has been created.
4964
/// Useful for testing and debugging.
5065
#[allow(dead_code)]
@@ -85,10 +100,13 @@ mod tests {
85100

86101
assert!(!RuntimeRegistry::has_sdk_manager().unwrap());
87102

88-
let manager1 = RuntimeRegistry::get_or_create_sdk().unwrap();
103+
// Error if nothing was created
104+
assert!(RuntimeRegistry::get_sdk().is_err());
105+
106+
let manager1 = RuntimeRegistry::create_sdk().unwrap();
89107
assert!(RuntimeRegistry::has_sdk_manager().unwrap());
90108

91-
let manager2 = RuntimeRegistry::get_or_create_sdk().unwrap();
109+
let manager2 = RuntimeRegistry::get_sdk().unwrap();
92110
// Should return the same instance
93111
assert!(Arc::ptr_eq(&manager1, &manager2));
94112

sdk/rust/nym-sdk/src/mixnet/client.rs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -744,15 +744,11 @@ where
744744
base_builder = base_builder.with_topology_provider(topology_provider);
745745
}
746746

747-
// Use custom shutdown if provided, otherwise get from registry
748-
let shutdown_tracker = match self.custom_shutdown {
749-
Some(custom) => custom,
750-
None => {
751-
// Auto-create from registry for SDK use
752-
nym_task::get_sdk_shutdown_tracker()?
753-
}
754-
};
755-
base_builder = base_builder.with_shutdown(shutdown_tracker);
747+
// Use custom shutdown if provided, otherwise the sdk one will be used later down the line
748+
if let Some(shutdown_tracker) = self.custom_shutdown {
749+
base_builder = base_builder.with_shutdown(shutdown_tracker);
750+
}
751+
756752
if let Some(event_tx) = self.event_tx {
757753
base_builder = base_builder.with_event_tx(event_tx);
758754
}
@@ -817,7 +813,7 @@ where
817813
client_output,
818814
client_state.clone(),
819815
nym_address,
820-
started_client.shutdown_handle.child_tracker(),
816+
started_client.shutdown_handle.clone(),
821817
packet_type,
822818
);
823819

0 commit comments

Comments
 (0)