Skip to content

Commit 495398b

Browse files
neacsubenedettadavico
authored andcommitted
Introduce event backchannel (#6119)
* Introduce even backchannel * Rust fmt * Rename Event to MixnetClientEvent * Use unbounded_send for events * Remove unused file * Remove mut borrow * Event hierarchy and mixnet client intermediary * Export MixTrafficEvent in sdk
1 parent d87b315 commit 495398b

File tree

12 files changed

+177
-37
lines changed

12 files changed

+177
-37
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/client-core/src/client/base_client/mod.rs

Lines changed: 66 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ use super::statistics_control::StatisticsControl;
77
use crate::client::base_client::storage::helpers::store_client_keys;
88
use crate::client::base_client::storage::MixnetClientStorage;
99
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
10+
use crate::client::event_control::EventControl;
1011
use crate::client::inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender};
1112
use crate::client::key_manager::persistence::KeyStore;
1213
use crate::client::key_manager::ClientKeys;
1314
use crate::client::mix_traffic::transceiver::{GatewayReceiver, GatewayTransceiver, RemoteGateway};
14-
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
15+
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController, MixTrafficEvent};
1516
use crate::client::real_messages_control;
1617
use crate::client::real_messages_control::RealMessagesController;
1718
use crate::client::received_buffer::{
@@ -66,7 +67,6 @@ use std::path::Path;
6667
use std::sync::Arc;
6768
use time::OffsetDateTime;
6869
use tokio::sync::mpsc::Sender;
69-
use tracing::*;
7070
use url::Url;
7171

7272
#[cfg(target_arch = "wasm32")]
@@ -83,6 +83,23 @@ pub mod non_wasm_helpers;
8383
pub mod helpers;
8484
pub mod storage;
8585

86+
#[derive(Clone, Copy, Debug)]
87+
pub enum MixnetClientEvent {
88+
Traffic(MixTrafficEvent),
89+
}
90+
91+
pub type EventReceiver = mpsc::UnboundedReceiver<MixnetClientEvent>;
92+
#[derive(Clone)]
93+
pub struct EventSender(pub mpsc::UnboundedSender<MixnetClientEvent>);
94+
95+
impl EventSender {
96+
pub fn send(&self, event: MixnetClientEvent) {
97+
if let Err(err) = self.0.unbounded_send(event) {
98+
tracing::warn!("Failed to send error event. The caller event reader was closed: {err}");
99+
}
100+
}
101+
}
102+
86103
#[derive(Clone)]
87104
pub struct ClientInput {
88105
pub connection_command_sender: ConnectionCommandSender,
@@ -199,6 +216,7 @@ pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
199216
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
200217
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
201218
shutdown: Option<ShutdownTracker>,
219+
event_tx: Option<EventSender>,
202220
user_agent: Option<UserAgent>,
203221

204222
setup_method: GatewaySetup,
@@ -227,6 +245,7 @@ where
227245
custom_topology_provider: None,
228246
custom_gateway_transceiver: None,
229247
shutdown: None,
248+
event_tx: None,
230249
user_agent: None,
231250
setup_method: GatewaySetup::MustLoad { gateway_id: None },
232251
#[cfg(unix)]
@@ -289,6 +308,12 @@ where
289308
self
290309
}
291310

311+
#[must_use]
312+
pub fn with_event_tx(mut self, event_tx: EventSender) -> Self {
313+
self.event_tx = Some(event_tx);
314+
self
315+
}
316+
292317
#[must_use]
293318
pub fn with_user_agent(mut self, user_agent: UserAgent) -> Self {
294319
self.user_agent = Some(user_agent);
@@ -319,6 +344,18 @@ where
319344
details.client_address()
320345
}
321346

347+
fn start_event_control(
348+
parent_event_tx: Option<EventSender>,
349+
children_event_rx: EventReceiver,
350+
shutdown_tracker: &ShutdownTracker,
351+
) {
352+
let event_control = EventControl::new(parent_event_tx, children_event_rx);
353+
shutdown_tracker.try_spawn_named_with_shutdown(
354+
async move { event_control.run().await },
355+
"EventControl",
356+
);
357+
}
358+
322359
// future constantly pumping loop cover traffic at some specified average rate
323360
// the pumped traffic goes to the MixTrafficController
324361
fn start_cover_traffic_stream(
@@ -330,7 +367,7 @@ where
330367
stats_tx: ClientStatsSender,
331368
shutdown_tracker: &ShutdownTracker,
332369
) {
333-
info!("Starting loop cover traffic stream...");
370+
tracing::info!("Starting loop cover traffic stream...");
334371

335372
let mut stream = LoopCoverTrafficStream::new(
336373
ack_key,
@@ -362,7 +399,7 @@ where
362399
stats_tx: ClientStatsSender,
363400
shutdown_tracker: &ShutdownTracker,
364401
) {
365-
info!("Starting real traffic stream...");
402+
tracing::info!("Starting real traffic stream...");
366403

367404
let real_messages_controller = RealMessagesController::new(
368405
controller_config,
@@ -447,7 +484,7 @@ where
447484
metrics_reporter: ClientStatsSender,
448485
shutdown_tracker: &ShutdownTracker,
449486
) {
450-
info!("Starting received messages buffer controller...");
487+
tracing::info!("Starting received messages buffer controller...");
451488
let controller = ReceivedMessagesBufferController::<SphinxMessageReceiver>::new(
452489
local_encryption_keypair,
453490
query_receiver,
@@ -558,7 +595,7 @@ where
558595
details_store
559596
.upgrade_stored_remote_gateway_key(gateway_client.gateway_identity(), &updated_key)
560597
.await.map_err(|err| {
561-
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
598+
tracing::error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
562599
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
563600
})?
564601
}
@@ -655,7 +692,7 @@ where
655692

656693
if topology_config.disable_refreshing {
657694
// if we're not spawning the refresher, don't cause shutdown immediately
658-
info!("The background topology refresher is not going to be started");
695+
tracing::info!("The background topology refresher is not going to be started");
659696
}
660697

661698
let mut topology_refresher = TopologyRefresher::new(
@@ -665,7 +702,7 @@ where
665702
);
666703
// before returning, block entire runtime to refresh the current network view so that any
667704
// components depending on topology would see a non-empty view
668-
info!("Obtaining initial network topology");
705+
tracing::info!("Obtaining initial network topology");
669706
topology_refresher.try_refresh().await;
670707

671708
if let Err(err) = topology_refresher.ensure_topology_is_routable().await {
@@ -691,21 +728,21 @@ where
691728
.wait_for_gateway(local_gateway, waiting_timeout)
692729
.await
693730
{
694-
error!(
731+
tracing::error!(
695732
"the gateway did not come back online within the specified timeout: {err}"
696733
);
697734
return Err(err.into());
698735
}
699736
} else {
700-
error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
737+
tracing::error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
701738
return Err(err.into());
702739
}
703740
}
704741

705742
if !topology_config.disable_refreshing {
706743
// don't spawn the refresher if we don't want to be refreshing the topology.
707744
// only use the initial values obtained
708-
info!("Starting topology refresher...");
745+
tracing::info!("Starting topology refresher...");
709746
shutdown_tracker.try_spawn_named_with_shutdown(
710747
async move { topology_refresher.run().await },
711748
"TopologyRefresher",
@@ -722,7 +759,7 @@ where
722759
input_sender: Sender<InputMessage>,
723760
shutdown_tracker: &ShutdownTracker,
724761
) -> ClientStatsSender {
725-
info!("Starting statistics control...");
762+
tracing::info!("Starting statistics control...");
726763
StatisticsControl::create_and_start(
727764
config.debug.stats_reporting,
728765
user_agent
@@ -737,10 +774,14 @@ where
737774
fn start_mix_traffic_controller(
738775
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
739776
shutdown_tracker: &ShutdownTracker,
777+
event_tx: EventSender,
740778
) -> (BatchMixMessageSender, ClientRequestSender) {
741-
info!("Starting mix traffic controller...");
742-
let mut mix_traffic_controller =
743-
MixTrafficController::new(gateway_transceiver, shutdown_tracker.clone_shutdown_token());
779+
tracing::info!("Starting mix traffic controller...");
780+
let mut mix_traffic_controller = MixTrafficController::new(
781+
gateway_transceiver,
782+
shutdown_tracker.clone_shutdown_token(),
783+
event_tx,
784+
);
744785

745786
let mix_tx = mix_traffic_controller.mix_rx();
746787
let client_tx = mix_traffic_controller.client_tx();
@@ -807,7 +848,7 @@ where
807848
{
808849
// if client keys do not exist already, create and persist them
809850
if key_store.load_keys().await.is_err() {
810-
info!("could not find valid client keys - a new set will be generated");
851+
tracing::info!("could not find valid client keys - a new set will be generated");
811852
let mut rng = OsRng;
812853
let keys = if let Some(derivation_material) = derivation_material {
813854
ClientKeys::from_master_key(&mut rng, &derivation_material)
@@ -854,7 +895,7 @@ where
854895
<S::CredentialStore as CredentialStorage>::StorageError: Send + Sync + 'static,
855896
<S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Sync + Send,
856897
{
857-
info!("Starting nym client");
898+
tracing::info!("Starting nym client");
858899
#[cfg(debug_assertions)]
859900
#[cfg(target_arch = "wasm32")]
860901
{
@@ -888,6 +929,9 @@ where
888929
// channels responsible for controlling real messages
889930
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(1);
890931

932+
// channels responsible for event management
933+
let (event_sender, event_receiver) = mpsc::unbounded();
934+
891935
// channels responsible for controlling ack messages
892936
let (ack_sender, ack_receiver) = mpsc::unbounded();
893937
let shared_topology_accessor =
@@ -900,6 +944,8 @@ where
900944
None => nym_task::get_sdk_shutdown_tracker()?,
901945
};
902946

947+
Self::start_event_control(self.event_tx, event_receiver, &shutdown_tracker);
948+
903949
// channels responsible for dealing with reply-related fun
904950
let (reply_controller_sender, reply_controller_receiver) =
905951
reply_controller::requests::new_control_channels();
@@ -990,6 +1036,7 @@ where
9901036
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
9911037
gateway_transceiver,
9921038
&shutdown_tracker.child_tracker(),
1039+
EventSender(event_sender),
9931040
);
9941041

9951042
// Channels that the websocket listener can use to signal downstream to the real traffic
@@ -1039,8 +1086,8 @@ where
10391086
);
10401087
}
10411088

1042-
debug!("Core client startup finished!");
1043-
debug!("The address of this client is: {self_address}");
1089+
tracing::debug!("Core client startup finished!");
1090+
tracing::debug!("The address of this client is: {self_address}");
10441091

10451092
#[cfg(debug_assertions)]
10461093
#[cfg(target_arch = "wasm32")]
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2025 - Nym Technologies SA <[email protected]>
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use futures::StreamExt;
5+
6+
use crate::client::base_client::{EventReceiver, EventSender, MixnetClientEvent};
7+
8+
/// Launches and manages task events, propagating upwards what is not strictly internal.
9+
pub(crate) struct EventControl {
10+
parent_event_tx: Option<EventSender>,
11+
children_event_rx: EventReceiver,
12+
}
13+
14+
impl EventControl {
15+
pub(crate) fn new(
16+
parent_event_tx: Option<EventSender>,
17+
children_event_rx: EventReceiver,
18+
) -> Self {
19+
EventControl {
20+
parent_event_tx,
21+
children_event_rx,
22+
}
23+
}
24+
25+
fn is_internal(event: MixnetClientEvent) -> bool {
26+
match event {
27+
MixnetClientEvent::Traffic(_) => false,
28+
}
29+
}
30+
31+
pub(crate) async fn run(mut self) {
32+
while let Some(event) = self.children_event_rx.next().await {
33+
if let Some(parent_event_tx) = &self.parent_event_tx {
34+
if !Self::is_internal(event) {
35+
parent_event_tx.send(event);
36+
}
37+
}
38+
}
39+
}
40+
}

common/client-core/src/client/mix_traffic/mod.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
// Copyright 2021 - Nym Technologies SA <[email protected]>
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
4+
use crate::client::{
5+
base_client::{EventSender, MixnetClientEvent},
6+
mix_traffic::transceiver::GatewayTransceiver,
7+
};
58
use nym_gateway_requests::ClientRequest;
69
use nym_sphinx::forwarding::packet::MixPacket;
710
use nym_task::ShutdownToken;
@@ -22,6 +25,11 @@ const MAX_FAILURE_COUNT: usize = 100;
2225
// that's also disgusting.
2326
pub struct Empty;
2427

28+
#[derive(Clone, Copy, Debug)]
29+
pub enum MixTrafficEvent {
30+
FailedSendingSphinx,
31+
}
32+
2533
pub struct MixTrafficController {
2634
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
2735

@@ -35,10 +43,15 @@ pub struct MixTrafficController {
3543
consecutive_gateway_failure_count: usize,
3644

3745
shutdown_token: ShutdownToken,
46+
event_tx: EventSender,
3847
}
3948

4049
impl MixTrafficController {
41-
pub fn new<T>(gateway_transceiver: T, shutdown_token: ShutdownToken) -> MixTrafficController
50+
pub fn new<T>(
51+
gateway_transceiver: T,
52+
shutdown_token: ShutdownToken,
53+
event_tx: EventSender,
54+
) -> MixTrafficController
4255
where
4356
T: GatewayTransceiver + Send + 'static,
4457
{
@@ -55,14 +68,16 @@ impl MixTrafficController {
5568
client_tx: client_sender,
5669
consecutive_gateway_failure_count: 0,
5770
shutdown_token,
71+
event_tx,
5872
}
5973
}
6074

6175
pub fn new_dynamic(
6276
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
6377
shutdown_token: ShutdownToken,
78+
event_tx: EventSender,
6479
) -> MixTrafficController {
65-
Self::new(gateway_transceiver, shutdown_token)
80+
Self::new(gateway_transceiver, shutdown_token, event_tx)
6681
}
6782

6883
pub fn client_tx(&self) -> ClientRequestSender {
@@ -140,7 +155,7 @@ impl MixTrafficController {
140155
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
141156
// Do we need to handle the embedded mixnet client case
142157
// separately?
143-
self.shutdown_token.cancel();
158+
self.event_tx.send(MixnetClientEvent::Traffic(MixTrafficEvent::FailedSendingSphinx));
144159
break;
145160
}
146161
}

common/client-core/src/client/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
pub mod base_client;
55
pub mod cover_traffic_stream;
6+
pub(crate) mod event_control;
67
pub(crate) mod helpers;
78
pub mod inbound_messages;
89
pub mod key_manager;

0 commit comments

Comments
 (0)