diff --git a/client/consensus/beefy/src/communication/peers.rs b/client/consensus/beefy/src/communication/peers.rs
index 4704b8dcf4576..8f2d5cc90a1d9 100644
--- a/client/consensus/beefy/src/communication/peers.rs
+++ b/client/consensus/beefy/src/communication/peers.rs
@@ -24,7 +24,7 @@ use std::collections::{HashMap, VecDeque};
 
 /// Report specifying a reputation change for a given peer.
 #[derive(Debug, PartialEq)]
-pub(crate) struct PeerReport {
+pub struct PeerReport {
 	pub who: PeerId,
 	pub cost_benefit: ReputationChange,
 }
diff --git a/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs b/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs
index 8240dd71104c2..b8d8cd35434c9 100644
--- a/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs
+++ b/client/consensus/beefy/src/communication/request_response/incoming_requests_handler.rs
@@ -18,7 +18,7 @@
 
 use codec::DecodeAll;
 use futures::{channel::oneshot, StreamExt};
-use log::{debug, error, trace};
+use log::{debug, trace};
 use sc_client_api::BlockBackend;
 use sc_network::{
 	config as netconfig, config::RequestResponseConfig, types::ProtocolName, PeerId,
@@ -182,7 +182,9 @@ where
 	}
 
 	/// Run [`BeefyJustifsRequestHandler`].
-	pub async fn run(mut self) {
+	///
+	/// Should never end, returns `Error` otherwise.
+	pub async fn run(&mut self) -> Error {
 		trace!(target: BEEFY_SYNC_LOG_TARGET, "🥩 Running BeefyJustifsRequestHandler");
 
 		while let Ok(request) = self
@@ -215,9 +217,6 @@
 				},
 			}
 		}
-		error!(
-			target: crate::LOG_TARGET,
-			"🥩 On-demand requests receiver stream terminated, closing worker."
-		);
+		Error::RequestsReceiverStreamClosed
 	}
 }
diff --git a/client/consensus/beefy/src/communication/request_response/mod.rs b/client/consensus/beefy/src/communication/request_response/mod.rs
index 1801512fa5421..4bad3b061c8e9 100644
--- a/client/consensus/beefy/src/communication/request_response/mod.rs
+++ b/client/consensus/beefy/src/communication/request_response/mod.rs
@@ -75,7 +75,7 @@ pub struct JustificationRequest<B: Block> {
 }
 
 #[derive(Debug, thiserror::Error)]
-pub(crate) enum Error {
+pub enum Error {
 	#[error(transparent)]
 	Client(#[from] sp_blockchain::Error),
 
@@ -102,4 +102,7 @@ pub(crate) enum Error {
 
 	#[error("Internal error while getting response.")]
 	ResponseError,
+
+	#[error("On-demand requests receiver stream terminated.")]
+	RequestsReceiverStreamClosed,
 }
diff --git a/client/consensus/beefy/src/error.rs b/client/consensus/beefy/src/error.rs
index 08b9960f41a1a..b4773f940193e 100644
--- a/client/consensus/beefy/src/error.rs
+++ b/client/consensus/beefy/src/error.rs
@@ -34,8 +34,18 @@ pub enum Error {
 	Signature(String),
 	#[error("Session uninitialized")]
 	UninitSession,
-	#[error("pallet-beefy was reset, please restart voter")]
+	#[error("pallet-beefy was reset")]
 	ConsensusReset,
+	#[error("Block import stream terminated")]
+	BlockImportStreamTerminated,
+	#[error("Gossip Engine terminated")]
+	GossipEngineTerminated,
+	#[error("Finality proofs gossiping stream terminated")]
+	FinalityProofGossipStreamTerminated,
+	#[error("Finality stream terminated")]
+	FinalityStreamTerminated,
+	#[error("Votes gossiping stream terminated")]
+	VotesGossipStreamTerminated,
 }
 
 #[cfg(test)]
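Taken together, the changes above replace log-and-exit dead ends with a typed `Error` surface that callers can inspect. A self-contained sketch of what that buys (the enum below merely mirrors two of the new variants for illustration; it is not code from this patch):

```rust
// Illustrative mirror of two variants from `client/consensus/beefy/src/error.rs`.
#[derive(Debug, thiserror::Error)]
enum Error {
	#[error("pallet-beefy was reset")]
	ConsensusReset,
	#[error("Votes gossiping stream terminated")]
	VotesGossipStreamTerminated,
}

// A supervising task can branch on the typed value instead of grepping logs:
// only a consensus reset is worth restarting the voter for, since the
// terminated-stream variants only occur while the node is shutting down.
fn is_recoverable(err: &Error) -> bool {
	matches!(err, Error::ConsensusReset)
}
```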
diff --git a/client/consensus/beefy/src/lib.rs b/client/consensus/beefy/src/lib.rs
index da339dae7e1f5..0b3baa007c1ce 100644
--- a/client/consensus/beefy/src/lib.rs
+++ b/client/consensus/beefy/src/lib.rs
@@ -221,7 +221,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
 	B: Block,
 	BE: Backend<B>,
 	C: Client<B, BE> + BlockBackend<B>,
-	P: PayloadProvider<B>,
+	P: PayloadProvider<B> + Clone,
 	R: ProvideRuntimeApi<B>,
 	R::Api: BeefyApi<B> + MmrApi<B, MmrRootHash, NumberFor<B>>,
 	N: GossipNetwork<B> + NetworkRequest + Send + Sync + 'static,
@@ -237,7 +237,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
 		min_block_delta,
 		prometheus_registry,
 		links,
-		on_demand_justifications_handler,
+		mut on_demand_justifications_handler,
 	} = beefy_params;
 
 	let BeefyNetworkParams {
@@ -248,83 +248,105 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
 		..
 	} = network_params;
 
-	let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
-	// Default votes filter is to discard everything.
-	// Validator is updated later with correct starting round and set id.
-	let (gossip_validator, gossip_report_stream) =
-		communication::gossip::GossipValidator::new(known_peers.clone());
-	let gossip_validator = Arc::new(gossip_validator);
-	let mut gossip_engine = GossipEngine::new(
-		network.clone(),
-		sync.clone(),
-		gossip_protocol_name,
-		gossip_validator.clone(),
-		None,
-	);
 	let metrics = register_metrics(prometheus_registry.clone());
 
-	// The `GossipValidator` adds and removes known peers based on valid votes and network events.
-	let on_demand_justifications = OnDemandJustificationsEngine::new(
-		network.clone(),
-		justifications_protocol_name,
-		known_peers,
-		prometheus_registry.clone(),
-	);
-
 	// Subscribe to finality notifications and justifications before waiting for runtime pallet and
 	// reuse the streams, so we don't miss notifications while waiting for pallet to be available.
 	let mut finality_notifications = client.finality_notification_stream().fuse();
-	let block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();
-
-	// Wait for BEEFY pallet to be active before starting voter.
-	let persisted_state =
-		match wait_for_runtime_pallet(&*runtime, &mut gossip_engine, &mut finality_notifications)
-			.await
-			.and_then(|(beefy_genesis, best_grandpa)| {
-				load_or_init_voter_state(
-					&*backend,
-					&*runtime,
-					beefy_genesis,
-					best_grandpa,
-					min_block_delta,
-				)
-			}) {
+	let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();
+
+	// We re-create and re-run the worker in this loop in order to quickly reinit and resume after
+	// select recoverable errors.
+	loop {
+		let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
+		// Default votes filter is to discard everything.
+		// Validator is updated later with correct starting round and set id.
+		let (gossip_validator, gossip_report_stream) =
+			communication::gossip::GossipValidator::new(known_peers.clone());
+		let gossip_validator = Arc::new(gossip_validator);
+		let mut gossip_engine = GossipEngine::new(
+			network.clone(),
+			sync.clone(),
+			gossip_protocol_name.clone(),
+			gossip_validator.clone(),
+			None,
+		);
+
+		// The `GossipValidator` adds and removes known peers based on valid votes and network
+		// events.
+		let on_demand_justifications = OnDemandJustificationsEngine::new(
+			network.clone(),
+			justifications_protocol_name.clone(),
+			known_peers,
+			prometheus_registry.clone(),
+		);
+
+		// Wait for BEEFY pallet to be active before starting voter.
+		let persisted_state = match wait_for_runtime_pallet(
+			&*runtime,
+			&mut gossip_engine,
+			&mut finality_notifications,
+		)
+		.await
+		.and_then(|(beefy_genesis, best_grandpa)| {
+			load_or_init_voter_state(
+				&*backend,
+				&*runtime,
+				beefy_genesis,
+				best_grandpa,
+				min_block_delta,
+			)
+		}) {
 			Ok(state) => state,
 			Err(e) => {
 				error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
 				return
 			},
 		};
 
-	// Update the gossip validator with the right starting round and set id.
-	if let Err(e) = persisted_state
-		.gossip_filter_config()
-		.map(|f| gossip_validator.update_filter(f))
-	{
-		error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
-		return
-	}
+		// Update the gossip validator with the right starting round and set id.
+		if let Err(e) = persisted_state
+			.gossip_filter_config()
+			.map(|f| gossip_validator.update_filter(f))
+		{
+			error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
+			return
+		}
 
-	let worker = worker::BeefyWorker {
-		backend,
-		payload_provider,
-		runtime,
-		sync,
-		key_store: key_store.into(),
-		gossip_engine,
-		gossip_validator,
-		gossip_report_stream,
-		on_demand_justifications,
-		links,
-		metrics,
-		pending_justifications: BTreeMap::new(),
-		persisted_state,
-	};
+		let worker = worker::BeefyWorker {
+			backend: backend.clone(),
+			payload_provider: payload_provider.clone(),
+			runtime: runtime.clone(),
+			sync: sync.clone(),
+			key_store: key_store.clone().into(),
+			gossip_engine,
+			gossip_validator,
+			gossip_report_stream,
+			on_demand_justifications,
+			links: links.clone(),
+			metrics: metrics.clone(),
+			pending_justifications: BTreeMap::new(),
+			persisted_state,
+		};
 
-	futures::future::select(
-		Box::pin(worker.run(block_import_justif, finality_notifications)),
-		Box::pin(on_demand_justifications_handler.run()),
-	)
-	.await;
+		match futures::future::select(
+			Box::pin(worker.run(&mut block_import_justif, &mut finality_notifications)),
+			Box::pin(on_demand_justifications_handler.run()),
+		)
+		.await
+		{
+			// On `ConsensusReset` error, just reinit and restart voter.
+			futures::future::Either::Left((error::Error::ConsensusReset, _)) => {
+				error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset);
+				continue
+			},
+			// On other errors, bring down / finish the task.
+			futures::future::Either::Left((worker_err, _)) =>
+				error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err),
+			futures::future::Either::Right((odj_handler_err, _)) =>
+				error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err),
+		};
+		return
+	}
 }
 
 fn load_or_init_voter_state<B, BE, R>(
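The new control flow in `start_beefy_gadget` is easier to see with the networking setup stripped away. Below is a minimal, self-contained sketch of the same reinit-loop pattern, where `VoterError`, `run_voter_once` and `voter_task` are invented names for illustration:

```rust
#[derive(Debug)]
enum VoterError {
	ConsensusReset,   // recoverable: pallet-beefy state was reset on-chain
	StreamTerminated, // fatal: an input stream closed, the node is going down
}

async fn run_voter_once() -> VoterError {
	// Stand-in for building the gossip engine + worker and running them
	// until they hit an error.
	VoterError::StreamTerminated
}

async fn voter_task() {
	loop {
		match run_voter_once().await {
			// Recoverable: reinit the cheap components and resume voting.
			VoterError::ConsensusReset => continue,
			// Anything else brings the whole task down.
			err => {
				eprintln!("🥩 Error: {:?}. Terminating.", err);
				return
			},
		}
	}
}
```

Keeping the finality and block-import subscriptions outside the loop is the load-bearing detail: a restarted voter resumes the same streams instead of re-subscribing and missing notifications in between.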
Terminating.", e); return }, }; - // Update the gossip validator with the right starting round and set id. - if let Err(e) = persisted_state - .gossip_filter_config() - .map(|f| gossip_validator.update_filter(f)) - { - error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e); - return - } + // Update the gossip validator with the right starting round and set id. + if let Err(e) = persisted_state + .gossip_filter_config() + .map(|f| gossip_validator.update_filter(f)) + { + error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e); + return + } - let worker = worker::BeefyWorker { - backend, - payload_provider, - runtime, - sync, - key_store: key_store.into(), - gossip_engine, - gossip_validator, - gossip_report_stream, - on_demand_justifications, - links, - metrics, - pending_justifications: BTreeMap::new(), - persisted_state, - }; + let worker = worker::BeefyWorker { + backend: backend.clone(), + payload_provider: payload_provider.clone(), + runtime: runtime.clone(), + sync: sync.clone(), + key_store: key_store.clone().into(), + gossip_engine, + gossip_validator, + gossip_report_stream, + on_demand_justifications, + links: links.clone(), + metrics: metrics.clone(), + pending_justifications: BTreeMap::new(), + persisted_state, + }; - futures::future::select( - Box::pin(worker.run(block_import_justif, finality_notifications)), - Box::pin(on_demand_justifications_handler.run()), - ) - .await; + match futures::future::select( + Box::pin(worker.run(&mut block_import_justif, &mut finality_notifications)), + Box::pin(on_demand_justifications_handler.run()), + ) + .await + { + // On `ConsensusReset` error, just reinit and restart voter. + futures::future::Either::Left((error::Error::ConsensusReset, _)) => { + error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset); + continue + }, + // On other errors, bring down / finish the task. + futures::future::Either::Left((worker_err, _)) => + error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err), + futures::future::Either::Right((odj_handler_err, _)) => + error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err), + }; + return + } } fn load_or_init_voter_state( diff --git a/client/consensus/beefy/src/worker.rs b/client/consensus/beefy/src/worker.rs index 17a8891b06142..0d3845a270368 100644 --- a/client/consensus/beefy/src/worker.rs +++ b/client/consensus/beefy/src/worker.rs @@ -447,11 +447,7 @@ where .ok() .flatten() .filter(|genesis| *genesis == self.persisted_state.pallet_genesis) - .ok_or_else(|| { - let err = Error::ConsensusReset; - error!(target: LOG_TARGET, "🥩 Error: {}", err); - err - })?; + .ok_or(Error::ConsensusReset)?; if *header.number() > self.best_grandpa_block() { // update best GRANDPA finalized block we have seen @@ -795,11 +791,12 @@ where /// Main loop for BEEFY worker. /// /// Run the main async loop which is driven by finality notifications and gossiped votes. + /// Should never end, returns `Error` otherwise. pub(crate) async fn run( mut self, - mut block_import_justif: Fuse>>, - mut finality_notifications: Fuse>, - ) { + block_import_justif: &mut Fuse>>, + finality_notifications: &mut Fuse>, + ) -> Error { info!( target: LOG_TARGET, "🥩 run BEEFY worker, best grandpa: #{:?}.", @@ -848,17 +845,17 @@ where // Use `select_biased!` to prioritize order below. // Process finality notifications first since these drive the voter. 
diff --git a/primitives/consensus/beefy/src/mmr.rs b/primitives/consensus/beefy/src/mmr.rs
index 991dc07c5a7f3..660506b8763f1 100644
--- a/primitives/consensus/beefy/src/mmr.rs
+++ b/primitives/consensus/beefy/src/mmr.rs
@@ -162,6 +162,12 @@ mod mmr_root_provider {
 		_phantom: PhantomData<B>,
 	}
 
+	impl<B, R> Clone for MmrRootProvider<B, R> {
+		fn clone(&self) -> Self {
+			Self { runtime: self.runtime.clone(), _phantom: PhantomData }
+		}
+	}
+
 	impl<B, R> MmrRootProvider<B, R>
 	where
 		B: Block,
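The manual `Clone` impl here (needed because lib.rs now requires `P: PayloadProvider + Clone`) is deliberate: `#[derive(Clone)]` would add `B: Clone` and `R: Clone` bounds, while the actual fields, `Arc<R>` and `PhantomData<B>`, are clonable unconditionally. An illustrative reduction with invented names:

```rust
use std::{marker::PhantomData, sync::Arc};

struct NotClone; // stand-in for a runtime handle that is not `Clone`

struct Provider<B, R> {
	runtime: Arc<R>,
	_phantom: PhantomData<B>,
}

// Mirrors the patch: no `B: Clone` or `R: Clone` bound is needed, because
// `Arc<R>` and `PhantomData<B>` both clone unconditionally.
impl<B, R> Clone for Provider<B, R> {
	fn clone(&self) -> Self {
		Self { runtime: self.runtime.clone(), _phantom: PhantomData }
	}
}

fn demo(p: &Provider<(), NotClone>) -> Provider<(), NotClone> {
	// Compiles even though `NotClone` is not `Clone`; with a derived impl
	// this call would require `R: Clone` and fail.
	p.clone()
}
```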