diff --git a/cmd/ethrex/ethrex.rs b/cmd/ethrex/ethrex.rs index 96851dbe26c..8ee4160bb07 100644 --- a/cmd/ethrex/ethrex.rs +++ b/cmd/ethrex/ethrex.rs @@ -6,8 +6,11 @@ use ethrex::{ }; use ethrex_p2p::{discv4::peer_table::PeerTable, types::NodeRecord}; use serde::Deserialize; -use std::{path::Path, time::Duration}; -use tokio::signal::unix::{SignalKind, signal}; +use std::{path::Path, sync::Arc, time::Duration}; +use tokio::{ + signal::unix::{SignalKind, signal}, + sync::RwLock, +}; use tokio_util::sync::CancellationToken; use tracing::info; @@ -36,13 +39,14 @@ async fn server_shutdown( datadir: &Path, cancel_token: &CancellationToken, peer_table: PeerTable, - local_node_record: NodeRecord, + local_node_record: Arc>, ) { info!("Server shut down started..."); let node_config_path = datadir.join("node_config.json"); info!("Storing config at {:?}...", node_config_path); cancel_token.cancel(); - let node_config = NodeConfigFile::new(peer_table, local_node_record).await; + let record = local_node_record.read().await.clone(); + let node_config = NodeConfigFile::new(peer_table, record).await; store_node_config_file(node_config, node_config_path).await; tokio::time::sleep(Duration::from_secs(1)).await; info!("Server shutting down!"); diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index 0c1595586e4..b38c8c4b82c 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -36,6 +36,7 @@ use std::{ sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; +use tokio::sync::RwLock; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tracing::{Level, debug, error, info, warn}; use tracing_subscriber::{ @@ -132,7 +133,7 @@ pub async fn init_rpc_api( opts: &Options, peer_handler: PeerHandler, local_p2p_node: Node, - local_node_record: NodeRecord, + local_node_record: Arc>, store: Store, blockchain: Arc, cancel_token: CancellationToken, @@ -377,7 +378,12 @@ async fn set_sync_block(store: &Store) { pub async fn init_l1( opts: Options, log_filter_handler: Option>, -) -> eyre::Result<(PathBuf, CancellationToken, PeerTable, NodeRecord)> { +) -> eyre::Result<( + PathBuf, + CancellationToken, + PeerTable, + Arc>, +)> { let datadir: &PathBuf = if opts.dev && cfg!(feature = "dev") { &opts.datadir.join("dev") } else { @@ -426,7 +432,17 @@ pub async fn init_l1( let local_p2p_node = get_local_p2p_node(&opts, &signer); - let local_node_record = get_local_node_record(datadir, &local_p2p_node, &signer); + let mut local_node_record = get_local_node_record(datadir, &local_p2p_node, &signer); + let fork_id = store + .get_fork_id() + .await + .expect("Failed to get fork id from store"); + + local_node_record + .set_fork_id(fork_id, &signer) + .expect("Failed to set fork id on local node record"); + + let local_node_record = Arc::new(RwLock::new(local_node_record)); let peer_table = PeerTable::spawn(opts.target_peers); @@ -437,6 +453,7 @@ pub async fn init_l1( let p2p_context = P2PContext::new( local_p2p_node.clone(), + local_node_record.clone(), tracker.clone(), signer, peer_table.clone(), diff --git a/cmd/ethrex/l2/initializers.rs b/cmd/ethrex/l2/initializers.rs index d8124eca057..01e40525c7b 100644 --- a/cmd/ethrex/l2/initializers.rs +++ b/cmd/ethrex/l2/initializers.rs @@ -28,6 +28,7 @@ use eyre::OptionExt; use secp256k1::SecretKey; use spawned_concurrency::tasks::GenServerHandle; use std::{fs::read_to_string, path::Path, sync::Arc, time::Duration}; +use tokio::sync::RwLock; use tokio::task::JoinSet; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tracing::{info, warn}; @@ -41,7 +42,7 @@ async fn init_rpc_api( l2_opts: &L2Options, peer_handler: Option, local_p2p_node: Node, - local_node_record: NodeRecord, + local_node_record: Arc>, store: Store, blockchain: Arc, syncer: Option>, @@ -209,7 +210,11 @@ pub async fn init_l2( let local_p2p_node = get_local_p2p_node(&opts.node_opts, &signer); - let local_node_record = get_local_node_record(&datadir, &local_p2p_node, &signer); + let local_node_record = Arc::new(RwLock::new(get_local_node_record( + &datadir, + &local_p2p_node, + &signer, + ))); // TODO: Check every module starts properly. let tracker = TaskTracker::new(); @@ -223,6 +228,7 @@ pub async fn init_l2( let peer_table = PeerTable::spawn(opts.node_opts.target_peers); let p2p_context = P2PContext::new( local_p2p_node.clone(), + local_node_record.clone(), tracker.clone(), signer, peer_table.clone(), @@ -341,7 +347,8 @@ pub async fn init_l2( cancel_token.cancel(); if based { let peer_handler = peer_handler.ok_or_eyre("Peer handler not initialized")?; - let node_config = NodeConfigFile::new(peer_handler.peer_table, local_node_record).await; + let record = local_node_record.read().await.clone(); + let node_config = NodeConfigFile::new(peer_handler.peer_table, record).await; store_node_config_file(node_config, node_config_path).await; } tokio::time::sleep(Duration::from_secs(1)).await; diff --git a/crates/l2/networking/rpc/rpc.rs b/crates/l2/networking/rpc/rpc.rs index f99f77cb2f1..804865784e5 100644 --- a/crates/l2/networking/rpc/rpc.rs +++ b/crates/l2/networking/rpc/rpc.rs @@ -31,6 +31,7 @@ use std::{ sync::{Arc, Mutex}, time::Duration, }; +use tokio::sync::RwLock; use tokio::{net::TcpListener, sync::Mutex as TokioMutex}; use tower_http::cors::CorsLayer; use tracing::{debug, info}; @@ -76,7 +77,7 @@ pub async fn start_api( blockchain: Arc, jwt_secret: Bytes, local_p2p_node: Node, - local_node_record: NodeRecord, + local_node_record: Arc>, syncer: Option>, peer_handler: Option, client_version: String, diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index e3b0df07de8..921a3c79ad8 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -14,7 +14,7 @@ use crate::{ }, }; use bytes::BytesMut; -use ethrex_common::{H256, H512, types::ForkId}; +use ethrex_common::{H256, H512, types::BlockHeader, types::ForkId}; use ethrex_storage::{Store, error::StoreError}; use futures::StreamExt; use rand::rngs::OsRng; @@ -26,8 +26,9 @@ use spawned_concurrency::{ send_message_on, spawn_listener, }, }; +use spawned_rt::tasks::BroadcastStream; use std::{net::SocketAddr, sync::Arc, time::Duration}; -use tokio::net::UdpSocket; +use tokio::{net::UdpSocket, sync::RwLock}; use tokio_util::udp::UdpFramed; use tracing::{debug, error, info, trace}; @@ -72,6 +73,7 @@ pub enum InMessage { Prune, ChangeFindNodeMessage, Shutdown, + NewHead(Box), } #[derive(Debug, Clone)] @@ -82,7 +84,7 @@ pub enum OutMessage { #[derive(Debug)] pub struct DiscoveryServer { local_node: Node, - local_node_record: NodeRecord, + local_node_record: Arc>, signer: SecretKey, udp_socket: Arc, store: Store, @@ -96,6 +98,7 @@ impl DiscoveryServer { pub async fn spawn( storage: Store, local_node: Node, + local_node_record: Arc>, signer: SecretKey, udp_socket: Arc, mut peer_table: PeerTable, @@ -103,14 +106,6 @@ impl DiscoveryServer { ) -> Result<(), DiscoveryServerError> { info!("Starting Discovery Server"); - let mut local_node_record = NodeRecord::from_node(&local_node, 1, &signer) - .expect("Failed to create local node record"); - if let Ok(fork_id) = storage.get_fork_id().await { - local_node_record - .set_fork_id(fork_id, &signer) - .expect("Failed to set fork_id on local node record"); - } - let mut discovery_server = Self { local_node: local_node.clone(), local_node_record, @@ -338,7 +333,7 @@ impl DiscoveryServer { udp_port: node.udp_port, tcp_port: node.tcp_port, }; - let enr_seq = self.local_node_record.seq; + let enr_seq = self.local_node_record.read().await.seq; let ping = Message::Ping(PingMessage::new(from, to, expiration).with_enr_seq(enr_seq)); ping.encode_with_header(&mut buf, &self.signer); let ping_hash: [u8; 32] = buf[..32] @@ -360,7 +355,7 @@ impl DiscoveryServer { tcp_port: node.tcp_port, }; - let enr_seq = self.local_node_record.seq; + let enr_seq = self.local_node_record.read().await.seq; let pong = Message::Pong(PongMessage::new(to, ping_hash, expiration).with_enr_seq(enr_seq)); @@ -395,7 +390,10 @@ impl DiscoveryServer { ) -> Result<(), DiscoveryServerError> { let node_record = &self.local_node_record; - let msg = Message::ENRResponse(ENRResponseMessage::new(request_hash, node_record.clone())); + let msg = Message::ENRResponse(ENRResponseMessage::new( + request_hash, + node_record.read().await.clone(), + )); self.send(msg, from).await?; @@ -629,6 +627,31 @@ impl DiscoveryServer { |e| error!(sending = ?message, addr = ?addr, err=?e, "Error sending message"), )?) } + + /// Update Local node record with correct fork_id when a new canonical head is received. + async fn update_local_node_record( + &self, + latest_block_header: BlockHeader, + ) -> Result<(), DiscoveryServerError> { + let mut node_record = self.local_node_record.read().await.clone(); + let pairs = node_record.decode_pairs(); + + tracing::trace!(fork_id=?&pairs.eth, latest_block_number=latest_block_header.number, latest_block_timestamp=latest_block_header.timestamp, "Updating ForkId"); + if let Some(fork_id) = pairs.eth + && fork_id.fork_next <= latest_block_header.timestamp + { + let latest_fork_id = self.store.get_fork_id().await?; + if node_record + .set_fork_id(latest_fork_id, &self.signer) + .inspect_err(|e| error!(err = ?e, "Failed to update node record with fork id")) + .is_err() + { + return Ok(()); + } + *self.local_node_record.write().await = node_record; + } + Ok(()) + } } impl GenServer for DiscoveryServer { @@ -658,6 +681,21 @@ impl GenServer for DiscoveryServer { } }), ); + + let head_rx = BroadcastStream::new(self.store.subscribe_chain_head_update()); + spawn_listener( + handle.clone(), + head_rx.filter_map(|header| async { + match header { + Ok(header) => Some(InMessage::NewHead(Box::new(header))), + Err(e) => { + debug!(error=?e, "Error receiving latest canonical head"); + None + } + } + }), + ); + send_interval( REVALIDATION_CHECK_INTERVAL, handle.clone(), @@ -725,6 +763,12 @@ impl GenServer for DiscoveryServer { Self::CastMsg::ChangeFindNodeMessage => { self.find_node_message = Self::random_message(&self.signer); } + Self::CastMsg::NewHead(header) => { + let _ = self + .update_local_node_record(*header) + .await + .inspect_err(|e| error!(err=?e, "Error updating local node record")); + } Self::CastMsg::Shutdown => return CastResponse::Stop, } CastResponse::NoReply diff --git a/crates/networking/p2p/network.rs b/crates/networking/p2p/network.rs index 55574e18d99..460783318b4 100644 --- a/crates/networking/p2p/network.rs +++ b/crates/networking/p2p/network.rs @@ -15,7 +15,7 @@ use crate::{ p2p::SUPPORTED_SNAP_CAPABILITIES, }, tx_broadcaster::{TxBroadcaster, TxBroadcasterError}, - types::Node, + types::{Node, NodeRecord}, }; use ethrex_blockchain::Blockchain; use ethrex_storage::Store; @@ -27,7 +27,10 @@ use std::{ sync::{Arc, atomic::Ordering}, time::{Duration, SystemTime}, }; -use tokio::net::{TcpListener, TcpSocket, UdpSocket}; +use tokio::{ + net::{TcpListener, TcpSocket, UdpSocket}, + sync::RwLock, +}; use tokio_util::task::TaskTracker; use tracing::{error, info}; @@ -42,6 +45,7 @@ pub struct P2PContext { pub blockchain: Arc, pub(crate) broadcast: PeerConnBroadcastSender, pub local_node: Node, + pub local_node_record: Arc>, pub client_version: String, #[cfg(feature = "l2")] pub based_context: Option, @@ -52,6 +56,7 @@ impl P2PContext { #[allow(clippy::too_many_arguments)] pub async fn new( local_node: Node, + local_node_record: Arc>, tracker: TaskTracker, signer: SecretKey, peer_table: PeerTable, @@ -81,6 +86,7 @@ impl P2PContext { Ok(P2PContext { local_node, + local_node_record, tracker, signer, table: peer_table, @@ -113,6 +119,7 @@ pub async fn start_network(context: P2PContext, bootnodes: Vec) -> Result< DiscoveryServer::spawn( context.storage.clone(), context.local_node.clone(), + context.local_node_record.clone(), context.signer, udp_socket.clone(), context.table.clone(), diff --git a/crates/networking/p2p/types.rs b/crates/networking/p2p/types.rs index cb5be55ec57..84c5488a1a5 100644 --- a/crates/networking/p2p/types.rs +++ b/crates/networking/p2p/types.rs @@ -378,15 +378,8 @@ impl NodeRecord { // But the spec requires nested lists: // [[forkHash, forkNext]] let eth = vec![fork_id]; - self.pairs.push(("eth".into(), eth.encode_to_vec().into())); - //Pairs need to be sorted by their key. - //The keys are Bytes which implements Ord, so they can be compared directly. The sorting - //will be lexicographic (alphabetical for string keys like "eth", "id", "ip", etc.). - self.pairs.sort_by(|a, b| a.0.cmp(&b.0)); - - self.signature = self.sign_record(signer)?; - Ok(()) + self.update("eth".into(), eth.encode_to_vec().into(), signer) } fn sign_record(&self, signer: &SecretKey) -> Result { @@ -400,6 +393,35 @@ impl NodeRecord { Ok(H512::from_slice(&signature_bytes)) } + // TODO: This is absolutely not optimized, might be a time to rethink of using vec for pairs. + pub fn update( + &mut self, + key: Bytes, + value: Bytes, + signer: &SecretKey, + ) -> Result<(), NodeError> { + let mut found = false; + for (k, v) in self.pairs.iter_mut() { + if *k == key { + found = true; + *v = value.clone(); + break; + } + } + if !found { + self.pairs.push((key, value)); + } + + //Pairs need to be sorted by their key. + //The keys are Bytes which implements Ord, so they can be compared directly. The sorting + //will be lexicographic (alphabetical for string keys like "eth", "id", "ip", etc.). + self.pairs.sort_by(|a, b| a.0.cmp(&b.0)); + + self.seq += 1; + self.signature = self.sign_record(signer)?; + Ok(()) + } + pub fn get_signature_digest(&self) -> [u8; 32] { let mut rlp = vec![]; structs::Encoder::new(&mut rlp) diff --git a/crates/networking/rpc/admin/mod.rs b/crates/networking/rpc/admin/mod.rs index 408a0fee43c..f7638bb1691 100644 --- a/crates/networking/rpc/admin/mod.rs +++ b/crates/networking/rpc/admin/mod.rs @@ -35,9 +35,9 @@ enum Protocol { Eth(ChainConfig), } -pub fn node_info(storage: Store, node_data: &NodeData) -> Result { +pub async fn node_info(storage: Store, node_data: &NodeData) -> Result { let enode_url = node_data.local_p2p_node.enode_url(); - let enr_url = match node_data.local_node_record.enr_url() { + let enr_url = match node_data.local_node_record.read().await.enr_url() { Ok(enr) => enr, Err(_) => "".into(), }; diff --git a/crates/networking/rpc/rpc.rs b/crates/networking/rpc/rpc.rs index 8787a0a1bd3..d08121f2c42 100644 --- a/crates/networking/rpc/rpc.rs +++ b/crates/networking/rpc/rpc.rs @@ -71,6 +71,7 @@ use std::{ time::Duration, }; use tokio::net::TcpListener; +use tokio::sync::RwLock; use tokio::sync::{ Mutex as TokioMutex, mpsc::{UnboundedSender, unbounded_channel}, @@ -182,7 +183,7 @@ pub struct RpcApiContext { pub struct NodeData { pub jwt_secret: Bytes, pub local_p2p_node: Node, - pub local_node_record: NodeRecord, + pub local_node_record: Arc>, pub client_version: String, pub extra_data: Bytes, } @@ -274,7 +275,7 @@ pub async fn start_api( blockchain: Arc, jwt_secret: Bytes, local_p2p_node: Node, - local_node_record: NodeRecord, + local_node_record: Arc>, syncer: SyncManager, peer_handler: PeerHandler, client_version: String, @@ -614,7 +615,7 @@ pub async fn map_admin_requests( mut context: RpcApiContext, ) -> Result { match req.method.as_str() { - "admin_nodeInfo" => admin::node_info(context.storage, &context.node_data), + "admin_nodeInfo" => admin::node_info(context.storage, &context.node_data).await, "admin_peers" => admin::peers(&mut context).await, "admin_setLogLevel" => admin::set_log_level(req, &context.log_filter_handler).await, "admin_addPeer" => admin::add_peer(&mut context, req).await, @@ -700,7 +701,13 @@ mod tests { let context = default_context_with_storage(storage).await; let local_p2p_node = context.node_data.local_p2p_node.clone(); - let enr_url = context.node_data.local_node_record.enr_url().unwrap(); + let enr_url = context + .node_data + .local_node_record + .read() + .await + .enr_url() + .unwrap(); let result = map_http_requests(&request, context).await; let rpc_response = rpc_response(request.id, result).unwrap(); let blob_schedule = serde_json::json!({ diff --git a/crates/networking/rpc/test_utils.rs b/crates/networking/rpc/test_utils.rs index 1b29f45e4eb..71f0806570d 100644 --- a/crates/networking/rpc/test_utils.rs +++ b/crates/networking/rpc/test_utils.rs @@ -26,7 +26,7 @@ use hex_literal::hex; use secp256k1::SecretKey; use spawned_concurrency::tasks::{GenServer, GenServerHandle}; use std::{net::SocketAddr, str::FromStr, sync::Arc}; -use tokio::sync::Mutex as TokioMutex; +use tokio::sync::{Mutex as TokioMutex, RwLock}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tracing::info; // Base price for each test transaction. @@ -235,7 +235,7 @@ pub async fn start_test_api() -> tokio::task::JoinHandle<()> { let blockchain = Arc::new(Blockchain::default_with_store(storage.clone())); let jwt_secret = Default::default(); let local_p2p_node = example_p2p_node(); - let local_node_record = example_local_node_record(); + let local_node_record = Arc::new(RwLock::new(example_local_node_record())); tokio::spawn(async move { start_api( http_addr, @@ -271,7 +271,7 @@ pub async fn default_context_with_storage(storage: Store) -> RpcApiContext { node_data: NodeData { jwt_secret: Default::default(), local_p2p_node: example_p2p_node(), - local_node_record, + local_node_record: Arc::new(RwLock::new(local_node_record)), client_version: "ethrex/test".to_string(), extra_data: Bytes::new(), }, @@ -317,15 +317,22 @@ pub async fn dummy_gen_server(peer_table: PeerTable) -> GenServerHandle P2PContext { + let signer = SecretKey::from_byte_array(&[0xcd; 32]).expect("32 bytes, within curve order"); let local_node = Node::from_enode_url( "enode://d860a01f9722d78051619d1e2351aba3f43f943f6f00718d1b9baa4101932a1f5011f16bb2b1bb35db20d6fe28fa0bf09636d26a87d31de9ec6203eeedb1f666@18.138.108.67:30303", + ).expect("Bad enode url"); + + let local_node_record = NodeRecord::from_node(&local_node, 1, &signer) + .expect("Node record could not be created from local node"); + let storage = Store::new("./temp", EngineType::InMemory).expect("Failed to create Store"); P2PContext::new( local_node, + Arc::new(RwLock::new(local_node_record)), TaskTracker::default(), - SecretKey::from_byte_array(&[0xcd; 32]).expect("32 bytes, within curve order"), + signer, peer_table, storage.clone(), Arc::new(Blockchain::default_with_store(storage)), diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index 803cf8b1909..d7adc45c620 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -24,7 +24,7 @@ serde.workspace = true serde_json = "1.0.117" rocksdb = { workspace = true, optional = true } rustc-hash.workspace = true -tokio = { workspace = true, optional = true, features = ["rt"] } +tokio = { workspace = true, features = ["rt"] } bincode = "1.3.3" qfilter = "0.2.5" rayon.workspace = true @@ -32,7 +32,7 @@ lru = "0.16.2" [features] default = [] -rocksdb = ["dep:rocksdb", "dep:tokio"] +rocksdb = ["dep:rocksdb"] [dev-dependencies] hex.workspace = true diff --git a/crates/storage/store.rs b/crates/storage/store.rs index 709000d9062..e76d4c31c88 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -24,6 +24,7 @@ use std::{ sync::Mutex, }; use std::{fmt::Debug, path::Path}; +use tokio::sync::broadcast; use tracing::{debug, error, info}; /// Number of state trie segments to fetch concurrently during state sync pub const STATE_TRIE_SEGMENTS: usize = 2; @@ -42,6 +43,10 @@ pub struct Store { /// - a Latest tag for RPC, where a small extra delay before the newest block is expected /// - sync-related operations, which must be idempotent in order to handle reorgs latest_block_header: LatestBlockHeaderCache, + + /// Broadcasting channel used to allow other parts of the code to subscribe when new head has + /// been committed. + new_head_tx: broadcast::Sender, } pub type StorageTrieNodes = Vec<(H256, Vec<(Nibbles, Vec)>)>; @@ -83,17 +88,21 @@ impl Store { pub fn new(path: impl AsRef, engine_type: EngineType) -> Result { let path = path.as_ref(); info!(engine = ?engine_type, ?path, "Opening storage engine"); + + let (new_head_tx, _) = broadcast::channel(128); let store = match engine_type { #[cfg(feature = "rocksdb")] EngineType::RocksDB => Self { engine: Arc::new(RocksDBStore::new(path)?), chain_config: Default::default(), latest_block_header: Default::default(), + new_head_tx, }, EngineType::InMemory => Self { engine: Arc::new(InMemoryStore::new()), chain_config: Default::default(), latest_block_header: Default::default(), + new_head_tx, }, }; @@ -833,7 +842,7 @@ impl Store { .engine .get_block_header_by_hash(head_hash)? .ok_or_else(|| StoreError::MissingLatestBlockNumber)?; - self.latest_block_header.update(latest_block_header); + self.latest_block_header.update(latest_block_header.clone()); self.engine .forkchoice_update( new_canonical_blocks, @@ -844,6 +853,13 @@ impl Store { ) .await?; + // In following scenarios receivers are not initialized: + // - Genesis setup using `add_initial_state`. + // - Importing blocks using `import/import-bench` commands. + // - During tests and benchmarks. + // So, we ignore the error if there are no receivers for this notification. + let _ = self.new_head_tx.send(latest_block_header); + Ok(()) } @@ -1387,6 +1403,10 @@ impl Store { pub fn get_store_directory(&self) -> Result { self.engine.get_store_directory() } + + pub fn subscribe_chain_head_update(&self) -> broadcast::Receiver { + self.new_head_tx.subscribe() + } } pub struct AccountProof {