Skip to content

Commit 6dc095f

Browse files
committed
feat(da/replication): cache replication messages regardless of source
1 parent 38e81fa commit 6dc095f

File tree

1 file changed

+21
-10
lines changed

1 file changed

+21
-10
lines changed

nomos-da/network/core/src/protocols/replication/behaviour.rs

+21-10
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,10 @@ impl PendingOutbound {
171171
self.last_scheduled = next;
172172
next
173173
}
174+
175+
fn is_empty(&self) -> bool {
176+
self.messages.is_empty()
177+
}
174178
}
175179

176180
/// Nomos DA broadcast network behaviour.
@@ -274,22 +278,22 @@ where
274278
peers
275279
}
276280

277-
fn replicate_message(&mut self, waker: &Waker, message: &DaMessage) {
278-
let message_id = (message.blob.blob_id.to_vec(), message.subnetwork_id);
279-
if self.seen_message_cache.contains(&message_id) {
280-
return;
281-
}
282-
self.seen_message_cache.insert(message_id);
283-
self.send_message_impl(Some(waker), message);
284-
}
285-
286281
/// Initiate sending a replication message **from outside the behaviour**
287282
pub fn send_message(&mut self, message: &DaMessage) {
288283
let waker = self.waker.take();
289284
self.send_message_impl(waker.as_ref(), message);
290285
}
291286

287+
/// Send a replication message to all connected peers that are members of
288+
/// the same subnetwork. If the message has already been seen, it is not
289+
/// sent again.
292290
fn send_message_impl(&mut self, waker: Option<&Waker>, message: &DaMessage) {
291+
let message_id = (message.blob.blob_id.to_vec(), message.subnetwork_id);
292+
if self.seen_message_cache.contains(&message_id) {
293+
return;
294+
}
295+
self.seen_message_cache.insert(message_id);
296+
293297
// Push a message in the queue for every single peer connected that is a member
294298
// of the selected subnetwork_id
295299
let peers = self.no_loopback_member_peers_of(message.subnetwork_id);
@@ -368,7 +372,7 @@ where
368372
Poll::Ready(Some(Ok((peer_id, message, read_half)))) => {
369373
// Replicate the message to all connected peers from the same subnet if we
370374
// haven't seen it yet
371-
self.replicate_message(cx.waker(), &message);
375+
self.send_message_impl(Some(cx.waker()), &message);
372376
// Schedule waiting for any next incoming message on the same stream's read half
373377
self.incoming_tasks
374378
.push(Self::try_read_message(peer_id, read_half).boxed());
@@ -472,6 +476,13 @@ where
472476
cx.waker().wake_by_ref();
473477
}
474478

479+
// We must ensure that we ge awaken if there are still pending messages in the
480+
// queue. In some scenarios it may happen that there's no other event that
481+
// triggers the wake-up.
482+
if !self.pending_outbound.is_empty() {
483+
cx.waker().wake_by_ref();
484+
}
485+
475486
Ok(())
476487
}
477488

0 commit comments

Comments
 (0)