Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions cmd/ethrex/ethrex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ use ethrex::{
};
use ethrex_p2p::{discv4::peer_table::PeerTable, types::NodeRecord};
use serde::Deserialize;
use std::{path::Path, time::Duration};
use tokio::signal::unix::{SignalKind, signal};
use std::{path::Path, sync::Arc, time::Duration};
use tokio::{
signal::unix::{SignalKind, signal},
sync::RwLock,
};
use tokio_util::sync::CancellationToken;
use tracing::info;

Expand Down Expand Up @@ -36,13 +39,14 @@ async fn server_shutdown(
datadir: &Path,
cancel_token: &CancellationToken,
peer_table: PeerTable,
local_node_record: NodeRecord,
local_node_record: Arc<RwLock<NodeRecord>>,
) {
info!("Server shut down started...");
let node_config_path = datadir.join("node_config.json");
info!("Storing config at {:?}...", node_config_path);
cancel_token.cancel();
let node_config = NodeConfigFile::new(peer_table, local_node_record).await;
let record = local_node_record.read().await.clone();
let node_config = NodeConfigFile::new(peer_table, record).await;
store_node_config_file(node_config, node_config_path).await;
tokio::time::sleep(Duration::from_secs(1)).await;
info!("Server shutting down!");
Expand Down
23 changes: 20 additions & 3 deletions cmd/ethrex/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use std::{
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use tokio::sync::RwLock;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::{Level, debug, error, info, warn};
use tracing_subscriber::{
Expand Down Expand Up @@ -132,7 +133,7 @@ pub async fn init_rpc_api(
opts: &Options,
peer_handler: PeerHandler,
local_p2p_node: Node,
local_node_record: NodeRecord,
local_node_record: Arc<RwLock<NodeRecord>>,
store: Store,
blockchain: Arc<Blockchain>,
cancel_token: CancellationToken,
Expand Down Expand Up @@ -377,7 +378,12 @@ async fn set_sync_block(store: &Store) {
pub async fn init_l1(
opts: Options,
log_filter_handler: Option<reload::Handle<EnvFilter, Registry>>,
) -> eyre::Result<(PathBuf, CancellationToken, PeerTable, NodeRecord)> {
) -> eyre::Result<(
PathBuf,
CancellationToken,
PeerTable,
Arc<RwLock<NodeRecord>>,
)> {
let datadir: &PathBuf = if opts.dev && cfg!(feature = "dev") {
&opts.datadir.join("dev")
} else {
Expand Down Expand Up @@ -426,7 +432,17 @@ pub async fn init_l1(

let local_p2p_node = get_local_p2p_node(&opts, &signer);

let local_node_record = get_local_node_record(datadir, &local_p2p_node, &signer);
let mut local_node_record = get_local_node_record(datadir, &local_p2p_node, &signer);
let fork_id = store
.get_fork_id()
.await
.expect("Failed to get fork id from store");

local_node_record
.set_fork_id(fork_id, &signer)
.expect("Failed to set fork id on local node record");

let local_node_record = Arc::new(RwLock::new(local_node_record));

let peer_table = PeerTable::spawn(opts.target_peers);

Expand All @@ -437,6 +453,7 @@ pub async fn init_l1(

let p2p_context = P2PContext::new(
local_p2p_node.clone(),
local_node_record.clone(),
tracker.clone(),
signer,
peer_table.clone(),
Expand Down
13 changes: 10 additions & 3 deletions cmd/ethrex/l2/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use eyre::OptionExt;
use secp256k1::SecretKey;
use spawned_concurrency::tasks::GenServerHandle;
use std::{fs::read_to_string, path::Path, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use tokio::task::JoinSet;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::{info, warn};
Expand All @@ -41,7 +42,7 @@ async fn init_rpc_api(
l2_opts: &L2Options,
peer_handler: Option<PeerHandler>,
local_p2p_node: Node,
local_node_record: NodeRecord,
local_node_record: Arc<RwLock<NodeRecord>>,
store: Store,
blockchain: Arc<Blockchain>,
syncer: Option<Arc<SyncManager>>,
Expand Down Expand Up @@ -209,7 +210,11 @@ pub async fn init_l2(

let local_p2p_node = get_local_p2p_node(&opts.node_opts, &signer);

let local_node_record = get_local_node_record(&datadir, &local_p2p_node, &signer);
let local_node_record = Arc::new(RwLock::new(get_local_node_record(
&datadir,
&local_p2p_node,
&signer,
)));

// TODO: Check every module starts properly.
let tracker = TaskTracker::new();
Expand All @@ -223,6 +228,7 @@ pub async fn init_l2(
let peer_table = PeerTable::spawn(opts.node_opts.target_peers);
let p2p_context = P2PContext::new(
local_p2p_node.clone(),
local_node_record.clone(),
tracker.clone(),
signer,
peer_table.clone(),
Expand Down Expand Up @@ -341,7 +347,8 @@ pub async fn init_l2(
cancel_token.cancel();
if based {
let peer_handler = peer_handler.ok_or_eyre("Peer handler not initialized")?;
let node_config = NodeConfigFile::new(peer_handler.peer_table, local_node_record).await;
let record = local_node_record.read().await.clone();
let node_config = NodeConfigFile::new(peer_handler.peer_table, record).await;
store_node_config_file(node_config, node_config_path).await;
}
tokio::time::sleep(Duration::from_secs(1)).await;
Expand Down
3 changes: 2 additions & 1 deletion crates/l2/networking/rpc/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::{
sync::{Arc, Mutex},
time::Duration,
};
use tokio::sync::RwLock;
use tokio::{net::TcpListener, sync::Mutex as TokioMutex};
use tower_http::cors::CorsLayer;
use tracing::{debug, info};
Expand Down Expand Up @@ -76,7 +77,7 @@ pub async fn start_api(
blockchain: Arc<Blockchain>,
jwt_secret: Bytes,
local_p2p_node: Node,
local_node_record: NodeRecord,
local_node_record: Arc<RwLock<NodeRecord>>,
syncer: Option<Arc<SyncManager>>,
peer_handler: Option<PeerHandler>,
client_version: String,
Expand Down
72 changes: 58 additions & 14 deletions crates/networking/p2p/discv4/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
},
};
use bytes::BytesMut;
use ethrex_common::{H256, H512, types::ForkId};
use ethrex_common::{H256, H512, types::BlockHeader, types::ForkId};
use ethrex_storage::{Store, error::StoreError};
use futures::StreamExt;
use rand::rngs::OsRng;
Expand All @@ -26,8 +26,9 @@ use spawned_concurrency::{
send_message_on, spawn_listener,
},
};
use spawned_rt::tasks::BroadcastStream;
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::net::UdpSocket;
use tokio::{net::UdpSocket, sync::RwLock};
use tokio_util::udp::UdpFramed;
use tracing::{debug, error, info, trace};

Expand Down Expand Up @@ -72,6 +73,7 @@ pub enum InMessage {
Prune,
ChangeFindNodeMessage,
Shutdown,
NewHead(Box<BlockHeader>),
}

#[derive(Debug, Clone)]
Expand All @@ -82,7 +84,7 @@ pub enum OutMessage {
#[derive(Debug)]
pub struct DiscoveryServer {
local_node: Node,
local_node_record: NodeRecord,
local_node_record: Arc<RwLock<NodeRecord>>,
signer: SecretKey,
udp_socket: Arc<UdpSocket>,
store: Store,
Expand All @@ -96,21 +98,14 @@ impl DiscoveryServer {
pub async fn spawn(
storage: Store,
local_node: Node,
local_node_record: Arc<RwLock<NodeRecord>>,
signer: SecretKey,
udp_socket: Arc<UdpSocket>,
mut peer_table: PeerTable,
bootnodes: Vec<Node>,
) -> Result<(), DiscoveryServerError> {
info!("Starting Discovery Server");

let mut local_node_record = NodeRecord::from_node(&local_node, 1, &signer)
.expect("Failed to create local node record");
if let Ok(fork_id) = storage.get_fork_id().await {
local_node_record
.set_fork_id(fork_id, &signer)
.expect("Failed to set fork_id on local node record");
}

let mut discovery_server = Self {
local_node: local_node.clone(),
local_node_record,
Expand Down Expand Up @@ -338,7 +333,7 @@ impl DiscoveryServer {
udp_port: node.udp_port,
tcp_port: node.tcp_port,
};
let enr_seq = self.local_node_record.seq;
let enr_seq = self.local_node_record.read().await.seq;
let ping = Message::Ping(PingMessage::new(from, to, expiration).with_enr_seq(enr_seq));
ping.encode_with_header(&mut buf, &self.signer);
let ping_hash: [u8; 32] = buf[..32]
Expand All @@ -360,7 +355,7 @@ impl DiscoveryServer {
tcp_port: node.tcp_port,
};

let enr_seq = self.local_node_record.seq;
let enr_seq = self.local_node_record.read().await.seq;

let pong = Message::Pong(PongMessage::new(to, ping_hash, expiration).with_enr_seq(enr_seq));

Expand Down Expand Up @@ -395,7 +390,10 @@ impl DiscoveryServer {
) -> Result<(), DiscoveryServerError> {
let node_record = &self.local_node_record;

let msg = Message::ENRResponse(ENRResponseMessage::new(request_hash, node_record.clone()));
let msg = Message::ENRResponse(ENRResponseMessage::new(
request_hash,
node_record.read().await.clone(),
));

self.send(msg, from).await?;

Expand Down Expand Up @@ -629,6 +627,31 @@ impl DiscoveryServer {
|e| error!(sending = ?message, addr = ?addr, err=?e, "Error sending message"),
)?)
}

/// Refreshes the local node record's fork-id (`eth`) ENR pair when a new
/// canonical head is received, re-signing the record with the node's signer
/// via `set_fork_id`.
///
/// The record is only rewritten when the currently advertised `fork_next`
/// boundary has been reached (`fork_next <= head timestamp`), i.e. the
/// advertised fork id may have gone stale. A failure while re-signing is
/// logged and swallowed (returns `Ok(())`); only store errors propagate.
async fn update_local_node_record(
&self,
latest_block_header: BlockHeader,
) -> Result<(), DiscoveryServerError> {
// Work on a private clone so the read lock is released immediately after
// the copy; the shared record is swapped back in atomically at the end.
let mut node_record = self.local_node_record.read().await.clone();
let pairs = node_record.decode_pairs();

tracing::trace!(fork_id=?&pairs.eth, latest_block_number=latest_block_header.number, latest_block_timestamp=latest_block_header.timestamp, "Updating ForkId");
// NOTE(review): if `fork_next` is 0 (no scheduled fork) this guard is
// always true, so the record is re-derived on every new head — confirm
// `set_fork_id` is cheap/idempotent in that case.
if let Some(fork_id) = pairs.eth
&& fork_id.fork_next <= latest_block_header.timestamp
{
// Recompute the fork id from the store's current chain state.
let latest_fork_id = self.store.get_fork_id().await?;
// Best-effort update: on a signing/encoding failure keep the old
// record and report success so discovery keeps running.
if node_record
.set_fork_id(latest_fork_id, &self.signer)
.inspect_err(|e| error!(err = ?e, "Failed to update node record with fork id"))
.is_err()
{
return Ok(());
}
// Publish the updated, re-signed record for the rest of the server
// (ping/pong enr_seq reads and ENRResponse replies observe it).
*self.local_node_record.write().await = node_record;
}
Ok(())
}
}

impl GenServer for DiscoveryServer {
Expand Down Expand Up @@ -658,6 +681,21 @@ impl GenServer for DiscoveryServer {
}
}),
);

let head_rx = BroadcastStream::new(self.store.subscribe_chain_head_update());
spawn_listener(
handle.clone(),
head_rx.filter_map(|header| async {
match header {
Ok(header) => Some(InMessage::NewHead(Box::new(header))),
Err(e) => {
debug!(error=?e, "Error receiving latest canonical head");
None
}
}
}),
);

send_interval(
REVALIDATION_CHECK_INTERVAL,
handle.clone(),
Expand Down Expand Up @@ -725,6 +763,12 @@ impl GenServer for DiscoveryServer {
Self::CastMsg::ChangeFindNodeMessage => {
self.find_node_message = Self::random_message(&self.signer);
}
Self::CastMsg::NewHead(header) => {
let _ = self
.update_local_node_record(*header)
.await
.inspect_err(|e| error!(err=?e, "Error updating local node record"));
}
Self::CastMsg::Shutdown => return CastResponse::Stop,
}
CastResponse::NoReply
Expand Down
11 changes: 9 additions & 2 deletions crates/networking/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
p2p::SUPPORTED_SNAP_CAPABILITIES,
},
tx_broadcaster::{TxBroadcaster, TxBroadcasterError},
types::Node,
types::{Node, NodeRecord},
};
use ethrex_blockchain::Blockchain;
use ethrex_storage::Store;
Expand All @@ -27,7 +27,10 @@ use std::{
sync::{Arc, atomic::Ordering},
time::{Duration, SystemTime},
};
use tokio::net::{TcpListener, TcpSocket, UdpSocket};
use tokio::{
net::{TcpListener, TcpSocket, UdpSocket},
sync::RwLock,
};
use tokio_util::task::TaskTracker;
use tracing::{error, info};

Expand All @@ -42,6 +45,7 @@ pub struct P2PContext {
pub blockchain: Arc<Blockchain>,
pub(crate) broadcast: PeerConnBroadcastSender,
pub local_node: Node,
pub local_node_record: Arc<RwLock<NodeRecord>>,
pub client_version: String,
#[cfg(feature = "l2")]
pub based_context: Option<P2PBasedContext>,
Expand All @@ -52,6 +56,7 @@ impl P2PContext {
#[allow(clippy::too_many_arguments)]
pub async fn new(
local_node: Node,
local_node_record: Arc<RwLock<NodeRecord>>,
tracker: TaskTracker,
signer: SecretKey,
peer_table: PeerTable,
Expand Down Expand Up @@ -81,6 +86,7 @@ impl P2PContext {

Ok(P2PContext {
local_node,
local_node_record,
tracker,
signer,
table: peer_table,
Expand Down Expand Up @@ -113,6 +119,7 @@ pub async fn start_network(context: P2PContext, bootnodes: Vec<Node>) -> Result<
DiscoveryServer::spawn(
context.storage.clone(),
context.local_node.clone(),
context.local_node_record.clone(),
context.signer,
udp_socket.clone(),
context.table.clone(),
Expand Down
Loading
Loading