Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Prepare BloackAnnounceValidator for pushing block announcements fro…
Browse files Browse the repository at this point in the history
…m somewhere else
  • Loading branch information
dmitry-markin committed Aug 24, 2023
1 parent f137c55 commit c717dbc
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 91 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion client/network/sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ array-bytes = "6.1"
async-channel = "1.8.0"
async-trait = "0.1.58"
codec = { package = "parity-scale-codec", version = "3.6.1", features = ["derive"] }
event-listener = "2.5.3"
futures = "0.3.21"
futures-timer = "3.0.2"
libp2p = "0.52.1"
Expand Down
116 changes: 27 additions & 89 deletions client/network/sync/src/block_announce_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@

//! `BlockAnnounceValidator` is responsible for async validation of block announcements.

use event_listener::{Event, EventListener};
use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
use crate::futures_stream::{futures_stream, FuturesStreamReceiver, FuturesStreamSender};
use futures::{Future, FutureExt, Stream, StreamExt};
use libp2p::PeerId;
use log::{debug, error, trace, warn};
use sc_network_common::sync::message::BlockAnnounce;
use sp_consensus::block_validation::Validation;
use sp_runtime::traits::{Block as BlockT, Header, Zero};
use std::{
collections::{hash_map::Entry, HashMap},
default::Default,
pin::Pin,
task::{Context, Poll},
};
Expand Down Expand Up @@ -96,29 +97,27 @@ enum AllocateSlotForBlockAnnounceValidation {
pub(crate) struct BlockAnnounceValidator<B: BlockT> {
/// A type to check incoming block announcements.
validator: Box<dyn sp_consensus::block_validation::BlockAnnounceValidator<B> + Send>,
/// All block announcements that are currently being validated.
validations: FuturesUnordered<
/// All block announcements that are currently being validated, sending side of the stream.
tx_validations: FuturesStreamSender<
Pin<Box<dyn Future<Output = BlockAnnounceValidationResult<B::Header>> + Send>>,
>,
/// All block announcements that are currently being validated, receiving side of the stream.
rx_validations: FuturesStreamReceiver<
Pin<Box<dyn Future<Output = BlockAnnounceValidationResult<B::Header>> + Send>>,
>,
/// Number of concurrent block announce validations per peer.
validations_per_peer: HashMap<PeerId, usize>,
/// Wake-up event when new validations are pushed.
event: Event,
/// Listener for wake-up events in [`Stream::poll_next`] implementation.
event_listener: Option<EventListener>,
}

impl<B: BlockT> BlockAnnounceValidator<B> {
pub(crate) fn new(
validator: Box<dyn sp_consensus::block_validation::BlockAnnounceValidator<B> + Send>,
) -> Self {
Self {
validator,
validations: Default::default(),
validations_per_peer: Default::default(),
event: Event::new(),
event_listener: None,
}
let (tx_validations, rx_validations) = futures_stream(
"mpsc_block_announce_validation_stream",
MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS + 1,
);
Self { validator, tx_validations, rx_validations, validations_per_peer: Default::default() }
}

/// Push a block announce validation.
Expand Down Expand Up @@ -178,7 +177,7 @@ impl<B: BlockT> BlockAnnounceValidator<B> {
let assoc_data = announce.data.as_ref().map_or(&[][..], |v| v.as_slice());
let future = self.validator.validate(header, assoc_data);

self.validations.push(
let _ = self.tx_validations.push(
async move {
match future.await {
Ok(Validation::Success { is_new_best }) => {
Expand Down Expand Up @@ -220,9 +219,6 @@ impl<B: BlockT> BlockAnnounceValidator<B> {
}
.boxed(),
);

// Make sure [`Stream::poll_next`] is woken up.
self.event.notify(1);
}

/// Checks if there is a slot for a block announce validation.
Expand All @@ -240,7 +236,7 @@ impl<B: BlockT> BlockAnnounceValidator<B> {
&mut self,
peer_id: &PeerId,
) -> AllocateSlotForBlockAnnounceValidation {
if self.validations.len() >= MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS {
if self.rx_validations.len_lower_bound() >= MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS {
return AllocateSlotForBlockAnnounceValidation::TotalMaximumSlotsReached
}

Expand Down Expand Up @@ -300,37 +296,16 @@ impl<B: BlockT> Stream for BlockAnnounceValidator<B> {

/// Poll for finished block announce validations. The stream never terminates.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Note: the wake-up code below is modeled after `async-channel`.
// See https://github.com/smol-rs/async-channel/blob/4cae9cb0cbbd7c3c0518438e03a01e312b303e59/src/lib.rs#L787-L825
loop {
// Wait for wake-up event if we are in a waiting state after `self.validations`
// was deplenished.
if let Some(listener) = self.event_listener.as_mut() {
let () = futures::ready!(listener.poll_unpin(cx));
self.event_listener = None;
}

loop {
if let Some(validation) = futures::ready!(self.validations.poll_next_unpin(cx)) {
self.event_listener = None;
self.deallocate_slot_for_block_announce_validation(validation.peer_id());
if let Some(validation) = futures::ready!(self.rx_validations.poll_next_unpin(cx)) {
self.deallocate_slot_for_block_announce_validation(validation.peer_id());

return Poll::Ready(Some(validation))
}

// `self.validations` was deplenished, start/continue waiting for a wake-up event.
match self.event_listener {
Some(_) => {
// Go back to the outer loop to wait for a wake-up event.
break
},
None => {
// Create listener and go polling `self.validations` again in case it was
// populated just before the listener was created.
self.event_listener = Some(self.event.listen());
},
}
}
Poll::Ready(Some(validation))
} else {
trace!(
target: LOG_TARGET,
"Block announce validations stream terminated, terminating `BlockAnnounceValidator`",
);
Poll::Ready(None)
}
}
}
Expand All @@ -339,47 +314,10 @@ impl<B: BlockT> Stream for BlockAnnounceValidator<B> {
mod tests {
use super::*;
use crate::block_announce_validator::AllocateSlotForBlockAnnounceValidation;
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt};
use libp2p::PeerId;
use sp_consensus::block_validation::DefaultBlockAnnounceValidator;
use std::task::Poll;
use substrate_test_runtime_client::runtime::Block;

/// `Stream` implementation for `BlockAnnounceValidator` relies on the undocumented
/// feature that `FuturesUnordered` can be polled and repeatedly yield
/// `Poll::Ready(None)` before any futures were added into it.
#[tokio::test]
async fn empty_futures_unordered_can_be_polled() {
let mut unordered = FuturesUnordered::<BoxFuture<()>>::default();

futures::future::poll_fn(|cx| {
assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None));
assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None));

Poll::Ready(())
})
.await;
}

/// `Stream` implementation for `BlockAnnounceValidator` relies on the undocumented
/// feature that `FuturesUnordered` can be polled and repeatedly yield
/// `Poll::Ready(None)` after all the futures in it have resolved.
#[tokio::test]
async fn deplenished_futures_unordered_can_be_polled() {
let mut unordered = FuturesUnordered::<BoxFuture<()>>::default();

unordered.push(futures::future::ready(()).boxed());
assert_eq!(unordered.next().await, Some(()));

futures::future::poll_fn(|cx| {
assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None));
assert_eq!(unordered.poll_next_unpin(cx), Poll::Ready(None));

Poll::Ready(())
})
.await;
}

#[test]
fn allocate_one_validation_slot() {
let mut validator =
Expand Down Expand Up @@ -460,12 +398,12 @@ mod tests {
BlockAnnounceValidator::<Block>::new(Box::new(DefaultBlockAnnounceValidator {}));

for _ in 0..MAX_CONCURRENT_BLOCK_ANNOUNCE_VALIDATIONS {
validator.validations.push(
let _ = validator.tx_validations.push(
futures::future::ready(BlockAnnounceValidationResult::Skip {
peer_id: PeerId::random(),
})
.boxed(),
)
);
}

let peer_id = PeerId::random();
Expand Down
Loading

0 comments on commit c717dbc

Please sign in to comment.