Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add bincode-serialization/deserialization feature to exexs notifications #14809

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions crates/exex/exex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ reth-chainspec.workspace = true
reth-config.workspace = true
reth-evm.workspace = true
reth-exex-types = { workspace = true, features = ["serde", "serde-bincode-compat"] }
serde_json.workspace = true
serde_with.workspace = true
reth-fs-util.workspace = true
reth-metrics.workspace = true
reth-node-api.workspace = true
Expand Down Expand Up @@ -81,3 +83,6 @@ serde = [
"reth-primitives-traits/serde",
"reth-prune-types/serde",
]
bincode-serialization = [
"reth-exex-types/serde-bincode-compat",
]
9 changes: 6 additions & 3 deletions crates/exex/exex/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use reth_provider::BlockReader;
use reth_tasks::TaskExecutor;
use std::fmt::Debug;
use tokio::sync::mpsc::{error::SendError, UnboundedSender};

use alloy_primitives::private::serde;
/// Captures the context that an `ExEx` has access to.
///
/// This type wraps various node components that the `ExEx` has access to.
Expand Down Expand Up @@ -64,7 +64,7 @@ where
Node: FullNodeComponents,
Node::Provider: Debug + BlockReader,
Node::Executor: Debug,
Node::Types: NodeTypes<Primitives: NodePrimitives>,
Node::Types: NodeTypes<Primitives: NodePrimitives + serde::Serialize + serde::de::DeserializeOwned>,
{
/// Returns dynamic version of the context
pub fn into_dyn(self) -> ExExContextDyn<PrimitivesTy<Node::Types>> {
Expand All @@ -75,7 +75,7 @@ where
impl<Node> ExExContext<Node>
where
Node: FullNodeComponents,
Node::Types: NodeTypes<Primitives: NodePrimitives>,
Node::Types: NodeTypes<Primitives: NodePrimitives + serde::Serialize + serde::de::DeserializeOwned>,
{
/// Returns the transaction pool of the node.
pub fn pool(&self) -> &Node::Pool {
Expand Down Expand Up @@ -143,6 +143,8 @@ where
#[cfg(test)]
mod tests {
use crate::ExExContext;
use alloy_primitives::private::serde;
use reth_node_api::{NodePrimitives, NodeTypes};
use reth_exex_types::ExExHead;
use reth_node_api::FullNodeComponents;
use reth_provider::BlockReader;
Expand All @@ -158,6 +160,7 @@ mod tests {
impl<Node: FullNodeComponents> ExEx<Node>
where
Node::Provider: BlockReader,
Node::Types: NodeTypes<Primitives: NodePrimitives + serde::Serialize + serde::de::DeserializeOwned>,
{
async fn _test_bounds(mut self) -> eyre::Result<()> {
self.ctx.pool();
Expand Down
6 changes: 3 additions & 3 deletions crates/exex/exex/src/dyn_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use reth_primitives::EthPrimitives;
use reth_provider::BlockReader;
use std::fmt::Debug;
use tokio::sync::mpsc;

use alloy_primitives::private::serde;
use crate::{ExExContext, ExExEvent, ExExNotificationsStream};

// TODO(0xurb) - add `node` after abstractions
Expand Down Expand Up @@ -52,15 +52,15 @@ impl<N: NodePrimitives> Debug for ExExContextDyn<N> {

impl<Node> From<ExExContext<Node>> for ExExContextDyn<PrimitivesTy<Node::Types>>
where
Node: FullNodeComponents<Types: NodeTypes<Primitives: NodePrimitives>>,
Node: FullNodeComponents<Types: NodeTypes<Primitives: NodePrimitives + serde::Serialize + serde::de::DeserializeOwned>>,
Node::Provider: Debug + BlockReader,
Node::Executor: Debug,
{
fn from(ctx: ExExContext<Node>) -> Self {
let config = ctx.config.map_chainspec(|chainspec| {
Box::new(chainspec) as Box<dyn EthChainSpec<Header = HeaderTy<Node::Types>>>
});
let notifications = Box::new(ctx.notifications) as Box<_>;
let notifications = Box::new(ctx.notifications);

Self {
head: ctx.head,
Expand Down
5 changes: 5 additions & 0 deletions crates/exex/exex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]

// Acknowledge intentional dependencies
// #[cfg(feature = "bincode-serialization")]
use rmp_serde as _;
use serde_with as _;

mod backfill;
pub use backfill::*;

Expand Down
6 changes: 3 additions & 3 deletions crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tokio::sync::{
watch,
};
use tokio_util::sync::{PollSendError, PollSender, ReusableBoxFuture};

use alloy_primitives::private::serde;
/// Default max size of the internal state notifications buffer.
///
/// 1024 notifications in the buffer is 3.5 hours of mainnet blocks,
Expand Down Expand Up @@ -348,7 +348,7 @@ where
impl<P, N> ExExManager<P, N>
where
P: HeaderProvider,
N: NodePrimitives,
N: NodePrimitives + serde::Serialize + serde::de::DeserializeOwned,
{
/// Finalizes the WAL according to the passed finalized header.
///
Expand Down Expand Up @@ -422,7 +422,7 @@ where
impl<P, N> Future for ExExManager<P, N>
where
P: HeaderProvider + Unpin + 'static,
N: NodePrimitives,
N: NodePrimitives + serde::Serialize + serde::de::DeserializeOwned,
{
type Output = eyre::Result<()>;

Expand Down
9 changes: 5 additions & 4 deletions crates/exex/exex/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::{
task::{ready, Context, Poll},
};
use tokio::sync::mpsc::Receiver;
use alloy_primitives::private::serde;

/// A stream of [`ExExNotification`]s. The stream will emit notifications for all blocks. If the
/// stream is configured with a head via [`ExExNotifications::set_with_head`] or
Expand Down Expand Up @@ -105,7 +106,7 @@ where
impl<P, E> ExExNotificationsStream<E::Primitives> for ExExNotifications<P, E>
where
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block> + serde::Serialize + serde::de::DeserializeOwned>
+ Clone
+ Unpin
+ 'static,
Expand Down Expand Up @@ -157,7 +158,7 @@ where
impl<P, E> Stream for ExExNotifications<P, E>
where
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block> + serde::Serialize + serde::de::DeserializeOwned>
+ Clone
+ Unpin
+ 'static,
Expand Down Expand Up @@ -301,7 +302,7 @@ where
impl<P, E> ExExNotificationsWithHead<P, E>
where
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block> + serde::Serialize + serde::de::DeserializeOwned>
+ Clone
+ Unpin
+ 'static,
Expand Down Expand Up @@ -381,7 +382,7 @@ where
impl<P, E> Stream for ExExNotificationsWithHead<P, E>
where
P: BlockReader + HeaderProvider + StateProviderFactory + Clone + Unpin + 'static,
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block>>
E: BlockExecutorProvider<Primitives: NodePrimitives<Block = P::Block> + serde::Serialize + serde::de::DeserializeOwned>
+ Clone
+ Unpin
+ 'static,
Expand Down
2 changes: 1 addition & 1 deletion crates/exex/exex/src/wal/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ pub enum WalError {
FileNotFound(u32),
/// Decode error
#[error("failed to decode notification {0} from {1}: {2}")]
Decode(u32, PathBuf, rmp_serde::decode::Error),
Decode(u32, PathBuf, #[source] Box<dyn std::error::Error + Send + Sync>),
}
8 changes: 4 additions & 4 deletions crates/exex/exex/src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use alloy_primitives::B256;
use parking_lot::{RwLock, RwLockReadGuard};
use reth_exex_types::ExExNotification;
use reth_tracing::tracing::{debug, instrument};

use alloy_primitives::private::serde;
/// WAL is a write-ahead log (WAL) that stores the notifications sent to ExExes.
///
/// WAL is backed by a directory of binary files represented by [`Storage`] and a block cache
Expand All @@ -42,7 +42,7 @@ pub struct Wal<N: NodePrimitives = EthPrimitives> {

impl<N> Wal<N>
where
N: NodePrimitives,
N: NodePrimitives + serde::Serialize + serde::de::DeserializeOwned,
{
/// Creates a new instance of [`Wal`].
pub fn new(directory: impl AsRef<Path>) -> WalResult<Self> {
Expand Down Expand Up @@ -93,7 +93,7 @@ struct WalInner<N: NodePrimitives> {

impl<N> WalInner<N>
where
N: NodePrimitives,
N: NodePrimitives + serde::Serialize + serde::de::DeserializeOwned,
{
fn new(directory: impl AsRef<Path>) -> WalResult<Self> {
let mut wal = Self {
Expand Down Expand Up @@ -214,7 +214,7 @@ pub struct WalHandle<N: NodePrimitives> {

impl<N> WalHandle<N>
where
N: NodePrimitives,
N: NodePrimitives + serde::Serialize + serde::de::DeserializeOwned,
{
/// Returns the notification for the given committed block hash if it exists.
pub fn get_committed_notification_by_block_hash(
Expand Down
48 changes: 36 additions & 12 deletions crates/exex/exex/src/wal/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use reth_node_api::NodePrimitives;
use reth_primitives::EthPrimitives;
use reth_tracing::tracing::debug;
use tracing::instrument;
use alloy_primitives::private::serde;

static FILE_EXTENSION: &str = "wal";

Expand All @@ -26,7 +27,7 @@ pub struct Storage<N: NodePrimitives = EthPrimitives> {

impl<N> Storage<N>
where
N: NodePrimitives,
N: NodePrimitives + serde::Serialize + serde::de::DeserializeOwned,
{
/// Creates a new instance of [`Storage`] backed by the file at the given path and creates
/// it doesn't exist.
Expand Down Expand Up @@ -141,12 +142,24 @@ where
};
let size = file.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len();

// Deserialize using the bincode- and msgpack-compatible serde wrapper
let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_, N> =
rmp_serde::decode::from_read(&mut file)
.map_err(|err| WalError::Decode(file_id, file_path, err))?;
#[cfg(feature = "bincode-serialization")]
{
// Deserialize using the bincode- and msgpack-compatible serde wrapper
let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_, N> =
rmp_serde::decode::from_read(&mut file)
.map_err(|err| WalError::Decode(file_id, file_path, err))?;

Ok(Some((notification.into(), size)))
Ok(Some((notification.into(), size)))
}

#[cfg(not(feature = "bincode-serialization"))]
{
// Deserialize using JSON format
let notification: ExExNotification<N> = serde_json::from_reader(&mut file)
.map_err(|err| WalError::Decode(file_id, file_path, err.into()))?;

Ok(Some((notification, size)))
}
}

/// Writes the notification to the file with the given ID.
Expand All @@ -163,13 +176,24 @@ where
let file_path = self.file_path(file_id);
debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL");

// Serialize using the bincode- and msgpack-compatible serde wrapper
let notification =
reth_exex_types::serde_bincode_compat::ExExNotification::<N>::from(notification);
#[cfg(feature = "bincode-serialization")]
{
// Serialize using the bincode- and msgpack-compatible serde wrapper
let notification =
reth_exex_types::serde_bincode_compat::ExExNotification::<N>::from(notification);

reth_fs_util::atomic_write_file(&file_path, |file| {
rmp_serde::encode::write(file, &notification)
})?;
reth_fs_util::atomic_write_file(&file_path, |file| {
rmp_serde::encode::write(file, &notification)
})?;
}

#[cfg(not(feature = "bincode-serialization"))]
{
// Serialize using JSON format
reth_fs_util::atomic_write_file(&file_path, |file| {
serde_json::to_writer(file, notification)
})?;
}

Ok(file_path.metadata().map_err(|err| WalError::FileMetadata(file_id, err))?.len())
}
Expand Down
3 changes: 2 additions & 1 deletion crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use reth_tracing::tracing::{debug, error, info};
use std::sync::Arc;
use tokio::sync::{mpsc::unbounded_channel, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;

use alloy_primitives::private::serde;
use crate::{
common::{Attached, LaunchContextWith, WithConfigs},
hooks::NodeHooks,
Expand Down Expand Up @@ -88,6 +88,7 @@ where
LocalPayloadAttributesBuilder<Types::ChainSpec>: PayloadAttributesBuilder<
<<Types as NodeTypesWithEngine>::Engine as PayloadTypes>::PayloadAttributes,
>,
<Types as reth_node_api::NodeTypes>::Primitives: serde::Serialize + serde::de::DeserializeOwned,
{
type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;

Expand Down
6 changes: 5 additions & 1 deletion crates/node/builder/src/launch/exex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use reth_provider::CanonStateSubscriptions;
use reth_tracing::tracing::{debug, info};
use std::{fmt, fmt::Debug};
use tracing::Instrument;
use alloy_primitives::private::serde;

use crate::{common::WithConfigs, exex::BoxedLaunchExEx};

Expand Down Expand Up @@ -42,7 +43,10 @@ impl<Node: FullNodeComponents + Clone> ExExLauncher<Node> {
/// installed.
pub async fn launch(
self,
) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<Node::Types>>>> {
) -> eyre::Result<Option<ExExManagerHandle<PrimitivesTy<Node::Types>>>>
where
PrimitivesTy<Node::Types>: serde::Serialize + serde::de::DeserializeOwned,
{
let Self { head, extensions, components, config_container } = self;
let head = BlockNumHash::new(head.number, head.hash);

Expand Down
Loading