Skip to content

Commit 30ccf6b

Browse files
neacsujstuczyn
authored andcommitted
Introduce event backchannel (#6119)
* Introduce even backchannel * Rust fmt * Rename Event to MixnetClientEvent * Use unbounded_send for events * Remove unused file * Remove mut borrow * Event hierarchy and mixnet client intermediary * Export MixTrafficEvent in sdk
1 parent 591a3e2 commit 30ccf6b

File tree

11 files changed

+177
-36
lines changed

11 files changed

+177
-36
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/client-core/src/client/base_client/mod.rs

Lines changed: 66 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ use super::statistics_control::StatisticsControl;
77
use crate::client::base_client::storage::helpers::store_client_keys;
88
use crate::client::base_client::storage::MixnetClientStorage;
99
use crate::client::cover_traffic_stream::LoopCoverTrafficStream;
10+
use crate::client::event_control::EventControl;
1011
use crate::client::inbound_messages::{InputMessage, InputMessageReceiver, InputMessageSender};
1112
use crate::client::key_manager::persistence::KeyStore;
1213
use crate::client::key_manager::ClientKeys;
1314
use crate::client::mix_traffic::transceiver::{GatewayReceiver, GatewayTransceiver, RemoteGateway};
14-
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController};
15+
use crate::client::mix_traffic::{BatchMixMessageSender, MixTrafficController, MixTrafficEvent};
1516
use crate::client::real_messages_control;
1617
use crate::client::real_messages_control::RealMessagesController;
1718
use crate::client::received_buffer::{
@@ -66,7 +67,6 @@ use std::path::Path;
6667
use std::sync::Arc;
6768
use time::OffsetDateTime;
6869
use tokio::sync::mpsc::Sender;
69-
use tracing::*;
7070
use url::Url;
7171

7272
#[cfg(target_arch = "wasm32")]
@@ -83,6 +83,23 @@ pub mod non_wasm_helpers;
8383
pub mod helpers;
8484
pub mod storage;
8585

86+
#[derive(Clone, Copy, Debug)]
87+
pub enum MixnetClientEvent {
88+
Traffic(MixTrafficEvent),
89+
}
90+
91+
pub type EventReceiver = mpsc::UnboundedReceiver<MixnetClientEvent>;
92+
#[derive(Clone)]
93+
pub struct EventSender(pub mpsc::UnboundedSender<MixnetClientEvent>);
94+
95+
impl EventSender {
96+
pub fn send(&self, event: MixnetClientEvent) {
97+
if let Err(err) = self.0.unbounded_send(event) {
98+
tracing::warn!("Failed to send error event. The caller event reader was closed: {err}");
99+
}
100+
}
101+
}
102+
86103
#[derive(Clone)]
87104
pub struct ClientInput {
88105
pub connection_command_sender: ConnectionCommandSender,
@@ -199,6 +216,7 @@ pub struct BaseClientBuilder<C, S: MixnetClientStorage> {
199216
custom_topology_provider: Option<Box<dyn TopologyProvider + Send + Sync>>,
200217
custom_gateway_transceiver: Option<Box<dyn GatewayTransceiver + Send>>,
201218
shutdown: Option<ShutdownTracker>,
219+
event_tx: Option<EventSender>,
202220
user_agent: Option<UserAgent>,
203221
custom_nym_api_client: Option<nym_http_api_client::Client>,
204222

@@ -228,6 +246,7 @@ where
228246
custom_topology_provider: None,
229247
custom_gateway_transceiver: None,
230248
shutdown: None,
249+
event_tx: None,
231250
user_agent: None,
232251
custom_nym_api_client: None,
233252
setup_method: GatewaySetup::MustLoad { gateway_id: None },
@@ -297,6 +316,12 @@ where
297316
self
298317
}
299318

319+
#[must_use]
320+
pub fn with_event_tx(mut self, event_tx: EventSender) -> Self {
321+
self.event_tx = Some(event_tx);
322+
self
323+
}
324+
300325
#[must_use]
301326
pub fn with_user_agent(mut self, user_agent: UserAgent) -> Self {
302327
self.user_agent = Some(user_agent);
@@ -327,6 +352,18 @@ where
327352
details.client_address()
328353
}
329354

355+
fn start_event_control(
356+
parent_event_tx: Option<EventSender>,
357+
children_event_rx: EventReceiver,
358+
shutdown_tracker: &ShutdownTracker,
359+
) {
360+
let event_control = EventControl::new(parent_event_tx, children_event_rx);
361+
shutdown_tracker.try_spawn_named_with_shutdown(
362+
async move { event_control.run().await },
363+
"EventControl",
364+
);
365+
}
366+
330367
// future constantly pumping loop cover traffic at some specified average rate
331368
// the pumped traffic goes to the MixTrafficController
332369
fn start_cover_traffic_stream(
@@ -338,7 +375,7 @@ where
338375
stats_tx: ClientStatsSender,
339376
shutdown_tracker: &ShutdownTracker,
340377
) {
341-
info!("Starting loop cover traffic stream...");
378+
tracing::info!("Starting loop cover traffic stream...");
342379

343380
let mut stream = LoopCoverTrafficStream::new(
344381
ack_key,
@@ -370,7 +407,7 @@ where
370407
stats_tx: ClientStatsSender,
371408
shutdown_tracker: &ShutdownTracker,
372409
) {
373-
info!("Starting real traffic stream...");
410+
tracing::info!("Starting real traffic stream...");
374411

375412
let real_messages_controller = RealMessagesController::new(
376413
controller_config,
@@ -455,7 +492,7 @@ where
455492
metrics_reporter: ClientStatsSender,
456493
shutdown_tracker: &ShutdownTracker,
457494
) {
458-
info!("Starting received messages buffer controller...");
495+
tracing::info!("Starting received messages buffer controller...");
459496
let controller = ReceivedMessagesBufferController::<SphinxMessageReceiver>::new(
460497
local_encryption_keypair,
461498
query_receiver,
@@ -566,7 +603,7 @@ where
566603
details_store
567604
.upgrade_stored_remote_gateway_key(gateway_client.gateway_identity(), &updated_key)
568605
.await.map_err(|err| {
569-
error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
606+
tracing::error!("failed to store upgraded gateway key! this connection might be forever broken now: {err}");
570607
ClientCoreError::GatewaysDetailsStoreError { source: Box::new(err) }
571608
})?
572609
}
@@ -663,7 +700,7 @@ where
663700

664701
if topology_config.disable_refreshing {
665702
// if we're not spawning the refresher, don't cause shutdown immediately
666-
info!("The background topology refresher is not going to be started");
703+
tracing::info!("The background topology refresher is not going to be started");
667704
}
668705

669706
let mut topology_refresher = TopologyRefresher::new(
@@ -673,7 +710,7 @@ where
673710
);
674711
// before returning, block entire runtime to refresh the current network view so that any
675712
// components depending on topology would see a non-empty view
676-
info!("Obtaining initial network topology");
713+
tracing::info!("Obtaining initial network topology");
677714
topology_refresher.try_refresh().await;
678715

679716
if let Err(err) = topology_refresher.ensure_topology_is_routable().await {
@@ -699,21 +736,21 @@ where
699736
.wait_for_gateway(local_gateway, waiting_timeout)
700737
.await
701738
{
702-
error!(
739+
tracing::error!(
703740
"the gateway did not come back online within the specified timeout: {err}"
704741
);
705742
return Err(err.into());
706743
}
707744
} else {
708-
error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
745+
tracing::error!("the gateway we're supposedly connected to does not exist. We'll not be able to send any packets to ourselves: {err}");
709746
return Err(err.into());
710747
}
711748
}
712749

713750
if !topology_config.disable_refreshing {
714751
// don't spawn the refresher if we don't want to be refreshing the topology.
715752
// only use the initial values obtained
716-
info!("Starting topology refresher...");
753+
tracing::info!("Starting topology refresher...");
717754
shutdown_tracker.try_spawn_named_with_shutdown(
718755
async move { topology_refresher.run().await },
719756
"TopologyRefresher",
@@ -730,7 +767,7 @@ where
730767
input_sender: Sender<InputMessage>,
731768
shutdown_tracker: &ShutdownTracker,
732769
) -> ClientStatsSender {
733-
info!("Starting statistics control...");
770+
tracing::info!("Starting statistics control...");
734771
StatisticsControl::create_and_start(
735772
config.debug.stats_reporting,
736773
user_agent
@@ -745,10 +782,14 @@ where
745782
fn start_mix_traffic_controller(
746783
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
747784
shutdown_tracker: &ShutdownTracker,
785+
event_tx: EventSender,
748786
) -> (BatchMixMessageSender, ClientRequestSender) {
749-
info!("Starting mix traffic controller...");
750-
let mut mix_traffic_controller =
751-
MixTrafficController::new(gateway_transceiver, shutdown_tracker.clone_shutdown_token());
787+
tracing::info!("Starting mix traffic controller...");
788+
let mut mix_traffic_controller = MixTrafficController::new(
789+
gateway_transceiver,
790+
shutdown_tracker.clone_shutdown_token(),
791+
event_tx,
792+
);
752793

753794
let mix_tx = mix_traffic_controller.mix_rx();
754795
let client_tx = mix_traffic_controller.client_tx();
@@ -815,7 +856,7 @@ where
815856
{
816857
// if client keys do not exist already, create and persist them
817858
if key_store.load_keys().await.is_err() {
818-
info!("could not find valid client keys - a new set will be generated");
859+
tracing::info!("could not find valid client keys - a new set will be generated");
819860
let mut rng = OsRng;
820861
let keys = if let Some(derivation_material) = derivation_material {
821862
ClientKeys::from_master_key(&mut rng, &derivation_material)
@@ -872,7 +913,7 @@ where
872913
<S::CredentialStore as CredentialStorage>::StorageError: Send + Sync + 'static,
873914
<S::GatewaysDetailsStore as GatewaysDetailsStore>::StorageError: Sync + Send,
874915
{
875-
info!("Starting nym client");
916+
tracing::info!("Starting nym client");
876917
#[cfg(debug_assertions)]
877918
#[cfg(target_arch = "wasm32")]
878919
{
@@ -906,6 +947,9 @@ where
906947
// channels responsible for controlling real messages
907948
let (input_sender, input_receiver) = tokio::sync::mpsc::channel::<InputMessage>(1);
908949

950+
// channels responsible for event management
951+
let (event_sender, event_receiver) = mpsc::unbounded();
952+
909953
// channels responsible for controlling ack messages
910954
let (ack_sender, ack_receiver) = mpsc::unbounded();
911955
let shared_topology_accessor =
@@ -918,6 +962,8 @@ where
918962
None => nym_task::get_sdk_shutdown_tracker()?,
919963
};
920964

965+
Self::start_event_control(self.event_tx, event_receiver, &shutdown_tracker);
966+
921967
// channels responsible for dealing with reply-related fun
922968
let (reply_controller_sender, reply_controller_receiver) =
923969
reply_controller::requests::new_control_channels();
@@ -1012,6 +1058,7 @@ where
10121058
let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
10131059
gateway_transceiver,
10141060
&shutdown_tracker.child_tracker(),
1061+
EventSender(event_sender),
10151062
);
10161063

10171064
// Channels that the websocket listener can use to signal downstream to the real traffic
@@ -1061,8 +1108,8 @@ where
10611108
);
10621109
}
10631110

1064-
debug!("Core client startup finished!");
1065-
debug!("The address of this client is: {self_address}");
1111+
tracing::debug!("Core client startup finished!");
1112+
tracing::debug!("The address of this client is: {self_address}");
10661113

10671114
#[cfg(debug_assertions)]
10681115
#[cfg(target_arch = "wasm32")]
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2025 - Nym Technologies SA <[email protected]>
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use futures::StreamExt;
5+
6+
use crate::client::base_client::{EventReceiver, EventSender, MixnetClientEvent};
7+
8+
/// Launches and manages task events, propagating upwards what is not strictly internal.
9+
pub(crate) struct EventControl {
10+
parent_event_tx: Option<EventSender>,
11+
children_event_rx: EventReceiver,
12+
}
13+
14+
impl EventControl {
15+
pub(crate) fn new(
16+
parent_event_tx: Option<EventSender>,
17+
children_event_rx: EventReceiver,
18+
) -> Self {
19+
EventControl {
20+
parent_event_tx,
21+
children_event_rx,
22+
}
23+
}
24+
25+
fn is_internal(event: MixnetClientEvent) -> bool {
26+
match event {
27+
MixnetClientEvent::Traffic(_) => false,
28+
}
29+
}
30+
31+
pub(crate) async fn run(mut self) {
32+
while let Some(event) = self.children_event_rx.next().await {
33+
if let Some(parent_event_tx) = &self.parent_event_tx {
34+
if !Self::is_internal(event) {
35+
parent_event_tx.send(event);
36+
}
37+
}
38+
}
39+
}
40+
}

common/client-core/src/client/mix_traffic/mod.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
// Copyright 2021 - Nym Technologies SA <[email protected]>
22
// SPDX-License-Identifier: Apache-2.0
33

4-
use crate::client::mix_traffic::transceiver::GatewayTransceiver;
4+
use crate::client::{
5+
base_client::{EventSender, MixnetClientEvent},
6+
mix_traffic::transceiver::GatewayTransceiver,
7+
};
58
use nym_gateway_requests::ClientRequest;
69
use nym_sphinx::forwarding::packet::MixPacket;
710
use nym_task::ShutdownToken;
@@ -22,6 +25,11 @@ const MAX_FAILURE_COUNT: usize = 100;
2225
// that's also disgusting.
2326
pub struct Empty;
2427

28+
#[derive(Clone, Copy, Debug)]
29+
pub enum MixTrafficEvent {
30+
FailedSendingSphinx,
31+
}
32+
2533
pub struct MixTrafficController {
2634
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
2735

@@ -35,10 +43,15 @@ pub struct MixTrafficController {
3543
consecutive_gateway_failure_count: usize,
3644

3745
shutdown_token: ShutdownToken,
46+
event_tx: EventSender,
3847
}
3948

4049
impl MixTrafficController {
41-
pub fn new<T>(gateway_transceiver: T, shutdown_token: ShutdownToken) -> MixTrafficController
50+
pub fn new<T>(
51+
gateway_transceiver: T,
52+
shutdown_token: ShutdownToken,
53+
event_tx: EventSender,
54+
) -> MixTrafficController
4255
where
4356
T: GatewayTransceiver + Send + 'static,
4457
{
@@ -55,14 +68,16 @@ impl MixTrafficController {
5568
client_tx: client_sender,
5669
consecutive_gateway_failure_count: 0,
5770
shutdown_token,
71+
event_tx,
5872
}
5973
}
6074

6175
pub fn new_dynamic(
6276
gateway_transceiver: Box<dyn GatewayTransceiver + Send>,
6377
shutdown_token: ShutdownToken,
78+
event_tx: EventSender,
6479
) -> MixTrafficController {
65-
Self::new(gateway_transceiver, shutdown_token)
80+
Self::new(gateway_transceiver, shutdown_token, event_tx)
6681
}
6782

6883
pub fn client_tx(&self) -> ClientRequestSender {
@@ -140,7 +155,7 @@ impl MixTrafficController {
140155
error!("Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead");
141156
// Do we need to handle the embedded mixnet client case
142157
// separately?
143-
self.shutdown_token.cancel();
158+
self.event_tx.send(MixnetClientEvent::Traffic(MixTrafficEvent::FailedSendingSphinx));
144159
break;
145160
}
146161
}

common/client-core/src/client/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
pub mod base_client;
55
pub mod cover_traffic_stream;
6+
pub(crate) mod event_control;
67
pub(crate) mod helpers;
78
pub mod inbound_messages;
89
pub mod key_manager;

0 commit comments

Comments
 (0)