@@ -25,8 +25,10 @@ pub struct Empty;
2525pub struct MixTrafficController {
2626 gateway_transceiver : Box < dyn GatewayTransceiver + Send > ,
2727
28+ mix_tx : BatchMixMessageSender ,
2829 mix_rx : BatchMixMessageReceiver ,
2930 client_rx : ClientRequestReceiver ,
31+ client_tx : ClientRequestSender ,
3032
3133 // TODO: this is temporary work-around.
3234 // in long run `gateway_client` will be moved away from `MixTrafficController` anyway.
@@ -36,14 +38,7 @@ pub struct MixTrafficController {
3638}
3739
3840impl MixTrafficController {
39- pub fn new < T > (
40- gateway_transceiver : T ,
41- shutdown_token : ShutdownToken ,
42- ) -> (
43- MixTrafficController ,
44- BatchMixMessageSender ,
45- ClientRequestSender ,
46- )
41+ pub fn new < T > ( gateway_transceiver : T , shutdown_token : ShutdownToken ) -> MixTrafficController
4742 where
4843 T : GatewayTransceiver + Send + ' static ,
4944 {
@@ -52,41 +47,30 @@ impl MixTrafficController {
5247
5348 let ( client_sender, client_receiver) = tokio:: sync:: mpsc:: channel ( 8 ) ;
5449
55- (
56- MixTrafficController {
57- gateway_transceiver : Box :: new ( gateway_transceiver) ,
58- mix_rx : message_receiver,
59- client_rx : client_receiver,
60- consecutive_gateway_failure_count : 0 ,
61- shutdown_token,
62- } ,
63- message_sender,
64- client_sender,
65- )
50+ MixTrafficController {
51+ gateway_transceiver : Box :: new ( gateway_transceiver) ,
52+ mix_tx : message_sender,
53+ mix_rx : message_receiver,
54+ client_rx : client_receiver,
55+ client_tx : client_sender,
56+ consecutive_gateway_failure_count : 0 ,
57+ shutdown_token,
58+ }
6659 }
6760
6861 pub fn new_dynamic (
6962 gateway_transceiver : Box < dyn GatewayTransceiver + Send > ,
7063 shutdown_token : ShutdownToken ,
71- ) -> (
72- MixTrafficController ,
73- BatchMixMessageSender ,
74- ClientRequestSender ,
75- ) {
76- let ( message_sender, message_receiver) =
77- tokio:: sync:: mpsc:: channel ( MIX_MESSAGE_RECEIVER_BUFFER_SIZE ) ;
78- let ( client_sender, client_receiver) = tokio:: sync:: mpsc:: channel ( 8 ) ;
79- (
80- MixTrafficController {
81- gateway_transceiver,
82- mix_rx : message_receiver,
83- client_rx : client_receiver,
84- consecutive_gateway_failure_count : 0 ,
85- shutdown_token,
86- } ,
87- message_sender,
88- client_sender,
89- )
64+ ) -> MixTrafficController {
65+ Self :: new ( gateway_transceiver, shutdown_token)
66+ }
67+
68+ pub fn client_tx ( & self ) -> ClientRequestSender {
69+ self . client_tx . clone ( )
70+ }
71+
72+ pub fn mix_rx ( & self ) -> BatchMixMessageSender {
73+ self . mix_tx . clone ( )
9074 }
9175
9276 async fn on_messages (
@@ -145,34 +129,26 @@ impl MixTrafficController {
145129 trace!( "MixTrafficController: Received shutdown" ) ;
146130 break ;
147131 }
148- mix_packets = self . mix_rx. recv( ) => match mix_packets {
149- Some ( mix_packets) => {
150- if let Err ( err) = self . on_messages( mix_packets) . await {
151- error!( "Failed to send sphinx packet(s) to the gateway: {err}" ) ;
152- if self . consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
153- // Disconnect from the gateway. If we should try to re-connect
154- // is handled at a higher layer.
155- error!( "Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead" ) ;
156- // Do we need to handle the embedded mixnet client case
157- // separately?
158- break ;
159- }
132+ // mix_rx should never error out as we're holding one instance of the sender
133+
134+ Some ( mix_packets) = self . mix_rx. recv( ) => {
135+ if let Err ( err) = self . on_messages( mix_packets) . await {
136+ error!( "Failed to send sphinx packet(s) to the gateway: {err}" ) ;
137+ if self . consecutive_gateway_failure_count == MAX_FAILURE_COUNT {
138+ // Disconnect from the gateway. If we should try to re-connect
139+ // is handled at a higher layer.
140+ error!( "Failed to send sphinx packet to the gateway {MAX_FAILURE_COUNT} times in a row - assuming the gateway is dead" ) ;
141+ // Do we need to handle the embedded mixnet client case
142+ // separately?
143+ self . shutdown_token. cancel( ) ;
144+ break ;
160145 }
161- } ,
162- None => {
163- trace!( "MixTrafficController: Stopping since channel closed" ) ;
164- break ;
165- }
166- } ,
167- client_request = self . client_rx. recv( ) => match client_request {
168- Some ( client_request) => {
169- self . on_client_request( client_request) . await ;
170- } ,
171- None => {
172- trace!( "MixTrafficController, client request channel closed" ) ;
173- break
174146 }
175147 } ,
148+ // client_rx should never error out as we're holding one instance of the sender
149+ Some ( client_request) = self . client_rx. recv( ) => {
150+ self . on_client_request( client_request) . await ;
151+ }
176152 }
177153 }
178154 debug ! ( "MixTrafficController: Exiting" ) ;
0 commit comments