From c873e0c7f6a072339c720e033505b77512836f29 Mon Sep 17 00:00:00 2001 From: startup-dreamer Date: Tue, 4 Mar 2025 00:08:15 +0530 Subject: [PATCH] feat: tried to add bincode-serialization feature --- Cargo.lock | 2 + crates/exex/exex/Cargo.toml | 5 +++ crates/exex/exex/src/context.rs | 9 +++-- crates/exex/exex/src/dyn_context.rs | 6 +-- crates/exex/exex/src/lib.rs | 5 +++ crates/exex/exex/src/manager.rs | 6 +-- crates/exex/exex/src/notifications.rs | 9 +++-- crates/exex/exex/src/wal/error.rs | 2 +- crates/exex/exex/src/wal/mod.rs | 8 ++-- crates/exex/exex/src/wal/storage.rs | 48 ++++++++++++++++++------ crates/node/builder/src/launch/engine.rs | 3 +- crates/node/builder/src/launch/exex.rs | 6 ++- 12 files changed, 77 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ee84d949af1..ea3a9ca646ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7869,6 +7869,8 @@ dependencies = [ "reth-tracing", "rmp-serde", "secp256k1 0.30.0", + "serde_json", + "serde_with", "tempfile", "thiserror 2.0.11", "tokio", diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index c06005200d24..4804635dcb32 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -18,6 +18,8 @@ reth-chainspec.workspace = true reth-config.workspace = true reth-evm.workspace = true reth-exex-types = { workspace = true, features = ["serde", "serde-bincode-compat"] } +serde_json.workspace = true +serde_with.workspace = true reth-fs-util.workspace = true reth-metrics.workspace = true reth-node-api.workspace = true @@ -81,3 +83,6 @@ serde = [ "reth-primitives-traits/serde", "reth-prune-types/serde", ] +bincode-serialization = [ + "reth-exex-types/serde-bincode-compat", +] diff --git a/crates/exex/exex/src/context.rs b/crates/exex/exex/src/context.rs index 9b4b5876f795..2d7927443168 100644 --- a/crates/exex/exex/src/context.rs +++ b/crates/exex/exex/src/context.rs @@ -10,7 +10,7 @@ use reth_provider::BlockReader; use reth_tasks::TaskExecutor; use std::fmt::Debug; use tokio::sync::mpsc::{error::SendError, UnboundedSender}; - +use alloy_primitives::private::serde; /// Captures the context that an `ExEx` has access to. /// /// This type wraps various node components that the `ExEx` has access to. @@ -64,7 +64,7 @@ where Node: FullNodeComponents, Node::Provider: Debug + BlockReader, Node::Executor: Debug, - Node::Types: NodeTypes, + Node::Types: NodeTypes, { /// Returns dynamic version of the context pub fn into_dyn(self) -> ExExContextDyn> { @@ -75,7 +75,7 @@ where impl ExExContext where Node: FullNodeComponents, - Node::Types: NodeTypes, + Node::Types: NodeTypes, { /// Returns the transaction pool of the node. pub fn pool(&self) -> &Node::Pool { @@ -143,6 +143,8 @@ where #[cfg(test)] mod tests { use crate::ExExContext; + use alloy_primitives::private::serde; + use reth_node_api::{NodePrimitives, NodeTypes}; use reth_exex_types::ExExHead; use reth_node_api::FullNodeComponents; use reth_provider::BlockReader; @@ -158,6 +160,7 @@ mod tests { impl ExEx where Node::Provider: BlockReader, + Node::Types: NodeTypes, { async fn _test_bounds(mut self) -> eyre::Result<()> { self.ctx.pool(); diff --git a/crates/exex/exex/src/dyn_context.rs b/crates/exex/exex/src/dyn_context.rs index de01f70e5703..ab763b46b2f2 100644 --- a/crates/exex/exex/src/dyn_context.rs +++ b/crates/exex/exex/src/dyn_context.rs @@ -9,7 +9,7 @@ use reth_primitives::EthPrimitives; use reth_provider::BlockReader; use std::fmt::Debug; use tokio::sync::mpsc; - +use alloy_primitives::private::serde; use crate::{ExExContext, ExExEvent, ExExNotificationsStream}; // TODO(0xurb) - add `node` after abstractions @@ -52,7 +52,7 @@ impl Debug for ExExContextDyn { impl From> for ExExContextDyn> where - Node: FullNodeComponents>, + Node: FullNodeComponents>, Node::Provider: Debug + BlockReader, Node::Executor: Debug, { @@ -60,7 +60,7 @@ where let config = ctx.config.map_chainspec(|chainspec| { Box::new(chainspec) as Box>> }); - let notifications = Box::new(ctx.notifications) as Box<_>; + let notifications = Box::new(ctx.notifications); Self { head: ctx.head, diff --git a/crates/exex/exex/src/lib.rs b/crates/exex/exex/src/lib.rs index d5da6a18faa5..bc131267d6f4 100644 --- a/crates/exex/exex/src/lib.rs +++ b/crates/exex/exex/src/lib.rs @@ -88,6 +88,11 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] #![cfg_attr(not(test), warn(unused_crate_dependencies))] +// Acknowledge intentional dependencies +// #[cfg(feature = "bincode-serialization")] +use rmp_serde as _; +use serde_with as _; + mod backfill; pub use backfill::*; diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index a7480ab74866..d17d5bbe3a07 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -30,7 +30,7 @@ use tokio::sync::{ watch, }; use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture}; - +use alloy_primitives::private::serde; /// Default max size of the internal state notifications buffer. /// /// 1024 notifications in the buffer is 3.5 hours of mainnet blocks, @@ -348,7 +348,7 @@ where impl ExExManager where P: HeaderProvider, - N: NodePrimitives, + N: NodePrimitives + serde::Serialize + serde::de::DeserializeOwned, { /// Finalizes the WAL according to the passed finalized header. /// @@ -422,7 +422,7 @@ where impl Future for ExExManager where P: HeaderProvider + Unpin + 'static, - N: NodePrimitives, + N: NodePrimitives + serde::Serialize + serde::de::DeserializeOwned, { type Output = eyre::Result<()>; diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index 892b48181bb4..8d55bf968699 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -15,6 +15,7 @@ use std::{ task::{ready, Context, Poll}, }; use tokio::sync::mpsc::Receiver; +use alloy_primitives::private::serde; /// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. If the /// stream is configured with a head via [`ExExNotifications::set_with_head`] or @@ -105,7 +106,7 @@ where impl ExExNotificationsStream for ExExNotifications where P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, - E: BlockExecutorProvider> + E: BlockExecutorProvider + serde::Serialize + serde::de::DeserializeOwned> + Clone + Unpin + 'static, @@ -157,7 +158,7 @@ where impl Stream for ExExNotifications where P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, - E: BlockExecutorProvider> + E: BlockExecutorProvider + serde::Serialize + serde::de::DeserializeOwned> + Clone + Unpin + 'static, @@ -301,7 +302,7 @@ where impl ExExNotificationsWithHead where P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, - E: BlockExecutorProvider> + E: BlockExecutorProvider + serde::Serialize + serde::de::DeserializeOwned> + Clone + Unpin + 'static, @@ -381,7 +382,7 @@ where impl Stream for ExExNotificationsWithHead where P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static, - E: BlockExecutorProvider> + E: BlockExecutorProvider + serde::Serialize + serde::de::DeserializeOwned> + Clone + Unpin + 'static, diff --git a/crates/exex/exex/src/wal/error.rs b/crates/exex/exex/src/wal/error.rs index b091890f6ff8..ebfb2cb7d8a5 100644 --- a/crates/exex/exex/src/wal/error.rs +++ b/crates/exex/exex/src/wal/error.rs @@ -25,5 +25,5 @@ pub enum WalError { FileNotFound(u32), /// Decode error #[error("failed to decode notification {0} from {1}: {2}")] - Decode(u32, PathBuf, rmp_serde::decode::Error), + Decode(u32, PathBuf, #[source] Box), } diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs index c2272951005e..ae3a8c41c244 100644 --- a/crates/exex/exex/src/wal/mod.rs +++ b/crates/exex/exex/src/wal/mod.rs @@ -24,7 +24,7 @@ use alloy_primitives::B256; use parking_lot::{RwLock, RwLockReadGuard}; use reth_exex_types::ExExNotification; use reth_tracing::tracing::{debug, instrument}; - +use alloy_primitives::private::serde; /// WAL is a write-ahead log (WAL) that stores the notifications sent to ExExes. /// /// WAL is backed by a directory of binary files represented by [`Storage`] and a block cache @@ -42,7 +42,7 @@ pub struct Wal { impl Wal where - N: NodePrimitives, + N: NodePrimitives + serde::Serialize + serde::de::DeserializeOwned, { /// Creates a new instance of [`Wal`]. pub fn new(directory: impl AsRef) -> WalResult { @@ -93,7 +93,7 @@ struct WalInner { impl WalInner where - N: NodePrimitives, + N: NodePrimitives + serde::Serialize + serde::de::DeserializeOwned, { fn new(directory: impl AsRef) -> WalResult { let mut wal = Self { @@ -214,7 +214,7 @@ pub struct WalHandle { impl WalHandle where - N: NodePrimitives, + N: NodePrimitives + serde::Serialize + serde::de::DeserializeOwned, { /// Returns the notification for the given committed block hash if it exists. pub fn get_committed_notification_by_block_hash( diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs index bab835505fba..a218078e1eb2 100644 --- a/crates/exex/exex/src/wal/storage.rs +++ b/crates/exex/exex/src/wal/storage.rs @@ -10,6 +10,7 @@ use reth_node_api::NodePrimitives; use reth_primitives::EthPrimitives; use reth_tracing::tracing::debug; use tracing::instrument; +use alloy_primitives::private::serde; static FILE_EXTENSION: &str = "wal"; @@ -26,7 +27,7 @@ pub struct Storage { impl Storage where - N: NodePrimitives, + N: NodePrimitives + serde::Serialize + serde::de::DeserializeOwned, { /// Creates a new instance of [`Storage`] backed by the file at the given path and creates /// it doesn't exist. @@ -141,12 +142,24 @@ where }; let size = file.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len(); - // Deserialize using the bincode- and msgpack-compatible serde wrapper - let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_, N> = - rmp_serde::decode::from_read(&mut file) - .map_err(|err| WalError::Decode(file_id, file_path, err))?; + #[cfg(feature = "bincode-serialization")] + { + // Deserialize using the bincode- and msgpack-compatible serde wrapper + let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_, N> = + rmp_serde::decode::from_read(&mut file) + .map_err(|err| WalError::Decode(file_id, file_path, err))?; - Ok(Some((notification.into(), size))) + Ok(Some((notification.into(), size))) + } + + #[cfg(not(feature = "bincode-serialization"))] + { + // Deserialize using JSON format + let notification: ExExNotification = serde_json::from_reader(&mut file) + .map_err(|err| WalError::Decode(file_id, file_path, err.into()))?; + + Ok(Some((notification, size))) + } } /// Writes the notification to the file with the given ID. @@ -163,13 +176,24 @@ where let file_path = self.file_path(file_id); debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL"); - // Serialize using the bincode- and msgpack-compatible serde wrapper - let notification = - reth_exex_types::serde_bincode_compat::ExExNotification::::from(notification); + #[cfg(feature = "bincode-serialization")] + { + // Serialize using the bincode- and msgpack-compatible serde wrapper + let notification = + reth_exex_types::serde_bincode_compat::ExExNotification::::from(notification); - reth_fs_util::atomic_write_file(&file_path, |file| { - rmp_serde::encode::write(file, ¬ification) - })?; + reth_fs_util::atomic_write_file(&file_path, |file| { + rmp_serde::encode::write(file, ¬ification) + })?; + } + + #[cfg(not(feature = "bincode-serialization"))] + { + // Serialize using JSON format + reth_fs_util::atomic_write_file(&file_path, |file| { + serde_json::to_writer(file, notification) + })?; + } Ok(file_path.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len()) } diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 0bad17244c7f..f2480c77f8dc 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -32,7 +32,7 @@ use reth_tracing::tracing::{debug, error, info}; use std::sync::Arc; use tokio::sync::{mpsc::unbounded_channel, oneshot}; use tokio_stream::wrappers::UnboundedReceiverStream; - +use alloy_primitives::private::serde; use crate::{ common::{Attached, LaunchContextWith, WithConfigs}, hooks::NodeHooks, @@ -88,6 +88,7 @@ where LocalPayloadAttributesBuilder: PayloadAttributesBuilder< <::Engine as PayloadTypes>::PayloadAttributes, >, + ::Primitives: serde::Serialize + serde::de::DeserializeOwned, { type Node = NodeHandle, AO>; diff --git a/crates/node/builder/src/launch/exex.rs b/crates/node/builder/src/launch/exex.rs index 337a92cf5234..9141746d5225 100644 --- a/crates/node/builder/src/launch/exex.rs +++ b/crates/node/builder/src/launch/exex.rs @@ -14,6 +14,7 @@ use reth_provider::CanonStateSubscriptions; use reth_tracing::tracing::{debug, info}; use std::{fmt, fmt::Debug}; use tracing::Instrument; +use alloy_primitives::private::serde; use crate::{common::WithConfigs, exex::BoxedLaunchExEx}; @@ -42,7 +43,10 @@ impl ExExLauncher { /// installed. pub async fn launch( self, - ) -> eyre::Result>>> { + ) -> eyre::Result>>> + where + PrimitivesTy: serde::Serialize + serde::de::DeserializeOwned, + { let Self { head, extensions, components, config_container } = self; let head = BlockNumHash::new(head.number, head.hash);