@@ -274,19 +274,24 @@ where
274
274
peers
275
275
}
276
276
277
+ fn replicate_message ( & mut self , waker : & Waker , message : & DaMessage ) {
278
+ let message_id = ( message. blob . blob_id . to_vec ( ) , message. subnetwork_id ) ;
279
+ if self . seen_message_cache . contains ( & message_id) {
280
+ return ;
281
+ }
282
+ self . seen_message_cache . insert ( message_id) ;
283
+ self . send_message_impl ( Some ( waker) , message) ;
284
+ }
285
+
277
286
/// Initiate sending a replication message **from outside the behaviour**
278
287
pub fn send_message ( & mut self , message : & DaMessage ) {
288
+ let message_id = ( message. blob . blob_id . to_vec ( ) , message. subnetwork_id ) ;
289
+ self . seen_message_cache . insert ( message_id) ;
279
290
let waker = self . waker . take ( ) ;
280
291
self . send_message_impl ( waker. as_ref ( ) , message) ;
281
292
}
282
293
283
294
fn send_message_impl ( & mut self , waker : Option < & Waker > , message : & DaMessage ) {
284
- let message_id = ( message. blob . blob_id . to_vec ( ) , message. subnetwork_id ) ;
285
- if self . seen_message_cache . contains ( & message_id) {
286
- return ;
287
- }
288
- self . seen_message_cache . insert ( message_id) ;
289
-
290
295
// Push a message in the queue for every single peer connected that is a member
291
296
// of the selected subnetwork_id
292
297
let peers = self . no_loopback_member_peers_of ( message. subnetwork_id ) ;
@@ -365,7 +370,7 @@ where
365
370
Poll :: Ready ( Some ( Ok ( ( peer_id, message, read_half) ) ) ) => {
366
371
// Replicate the message to all connected peers from the same subnet if we
367
372
// haven't seen it yet
368
- self . send_message_impl ( Some ( cx. waker ( ) ) , & message) ;
373
+ self . replicate_message ( cx. waker ( ) , & message) ;
369
374
// Schedule waiting for any next incoming message on the same stream's read half
370
375
self . incoming_tasks
371
376
. push ( Self :: try_read_message ( peer_id, read_half) . boxed ( ) ) ;
0 commit comments