From d9b56b761a245d7bc82977b6f5269f3ea30d7b8a Mon Sep 17 00:00:00 2001 From: James Date: Wed, 10 Sep 2025 08:54:32 -0400 Subject: [PATCH 1/2] fix: improve handling of malformmated blobs :) --- Cargo.toml | 2 +- crates/blobber/src/cache.rs | 78 ++++++++++++++++++++------------ crates/blobber/src/error.rs | 64 ++++++++++++++------------ crates/blobber/src/fetch.rs | 89 +++++++++++++++++++------------------ crates/blobber/src/lib.rs | 2 +- 5 files changed, 132 insertions(+), 103 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 57d6a31..221d40f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["crates/*"] resolver = "2" [workspace.package] -version = "0.10.3" +version = "0.10.4" edition = "2024" rust-version = "1.88" authors = ["init4"] diff --git a/crates/blobber/src/cache.rs b/crates/blobber/src/cache.rs index a5afcd2..dd2c2c7 100644 --- a/crates/blobber/src/cache.rs +++ b/crates/blobber/src/cache.rs @@ -1,4 +1,4 @@ -use crate::{BlobFetcherError, Blobs, FetchResult}; +use crate::{BlobberError, BlobberResult, Blobs, FetchResult}; use alloy::consensus::{SidecarCoder, SimpleCoder, Transaction as _}; use alloy::eips::eip7691::MAX_BLOBS_PER_BLOCK_ELECTRA; use alloy::eips::merge::EPOCH_SLOTS; @@ -13,7 +13,7 @@ use std::{ time::Duration, }; use tokio::sync::{mpsc, oneshot}; -use tracing::{Instrument, debug, error, info, instrument}; +use tracing::{Instrument, debug_span, error, info, instrument, trace}; const BLOB_CACHE_SIZE: u32 = (MAX_BLOBS_PER_BLOCK_ELECTRA * EPOCH_SLOTS) as u32; const CACHE_REQUEST_CHANNEL_SIZE: usize = (MAX_BLOBS_PER_BLOCK_ELECTRA * 2) as usize; @@ -54,7 +54,7 @@ impl CacheHandle { slot: usize, tx_hash: B256, version_hashes: Vec, - ) -> FetchResult { + ) -> BlobberResult { let (resp, receiver) = oneshot::channel(); self.send(CacheInst::Retrieve { @@ -66,7 +66,7 @@ impl CacheHandle { }) .await; - receiver.await.map_err(|_| BlobFetcherError::missing_sidecar(tx_hash)) + receiver.await.map_err(|_| BlobberError::missing_sidecar(tx_hash)) } /// Fetch the blobs using [`Self::fetch_blobs`] and decode them to get the @@ -76,12 +76,12 @@ impl CacheHandle { slot: usize, extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, mut coder: C, - ) -> FetchResult { + ) -> BlobberResult { let tx_hash = extract.tx_hash(); let versioned_hashes = extract .tx .as_eip4844() - .ok_or_else(BlobFetcherError::non_4844_transaction)? + .ok_or_else(BlobberError::non_4844_transaction)? .blob_versioned_hashes() .expect("tx is eip4844"); @@ -89,11 +89,11 @@ impl CacheHandle { coder .decode_all(blobs.as_ref()) - .ok_or_else(BlobFetcherError::blob_decode_error)? + .ok_or_else(BlobberError::blob_decode_error)? .into_iter() .find(|data| keccak256(data) == extract.block_data_hash()) .map(Into::into) - .ok_or_else(|| BlobFetcherError::block_data_not_found(tx_hash)) + .ok_or_else(|| BlobberError::block_data_not_found(tx_hash)) } /// Fetch the blobs using [`Self::fetch_blobs`] and decode them using @@ -102,12 +102,21 @@ impl CacheHandle { &self, slot: usize, extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, - ) -> FetchResult { + ) -> BlobberResult { self.fetch_and_decode_with_coder(slot, extract, SimpleCoder::default()).await } /// Fetch the blobs, decode them using the provided coder, and construct a /// Zenith block from the header and data. + /// + /// # Returns + /// + /// - `Ok(ZenithBlock)` if the block was successfully fetched and + /// decoded. + /// - `Ok(ZenithBlock)` with an EMPTY BLOCK if the block_data could not be + /// decoded (e.g., due to a malformatted blob). 
+ /// - `Err(FetchError)` if there was an unrecoverable error fetching the + /// blobs. pub async fn signet_block_with_coder( &self, host_block_number: u64, @@ -116,13 +125,28 @@ impl CacheHandle { coder: C, ) -> FetchResult { let header = extract.ru_header(host_block_number); - self.fetch_and_decode_with_coder(slot, extract, coder) - .await - .map(|buf| ZenithBlock::from_header_and_data(header, buf)) + let block_data = match self.fetch_and_decode_with_coder(slot, extract, coder).await { + Ok(buf) => buf, + Err(BlobberError::Decode(_)) => { + trace!("Failed to decode block data"); + Bytes::default() + } + Err(BlobberError::Fetch(err)) => return Err(err), + }; + Ok(ZenithBlock::from_header_and_data(header, block_data)) } /// Fetch the blobs, decode them using [`SimpleCoder`], and construct a /// Zenith block from the header and data. + /// + /// # Returns + /// + /// - `Ok(ZenithBlock)` if the block was successfully fetched and + /// decoded. + /// - `Ok(ZenithBlock)` with an EMPTY BLOCK if the block_data could not be + /// decoded (e.g., due to a malformatted blob). + /// - `Err(FetchError)` if there was an unrecoverable error fetching the + /// blobs. pub async fn signet_block( &self, host_block_number: u64, @@ -159,7 +183,7 @@ impl BlobCacher { slot: usize, tx_hash: B256, versioned_hashes: Vec, - ) -> FetchResult { + ) -> BlobberResult { // Cache hit if let Some(blobs) = self.cache.lock().unwrap().get(&(slot, tx_hash)) { info!(target: "signet_blobber::BlobCacher", "Cache hit"); @@ -169,23 +193,21 @@ impl BlobCacher { // Cache miss, use the fetcher to retrieve blobs // Retry fetching blobs up to `FETCH_RETRIES` times for attempt in 1..=FETCH_RETRIES { - let blobs = self.fetcher.fetch_blobs(slot, tx_hash, &versioned_hashes).await; - - match blobs { - Ok(blobs) => { - self.cache.lock().unwrap().insert((slot, tx_hash), blobs.clone()); - return Ok(blobs); - } - Err(BlobFetcherError::Ignorable(e)) => { - debug!(target: "signet_blobber::BlobCacher", attempt, %e, "Blob fetch attempt failed."); - tokio::time::sleep(BETWEEN_RETRIES).await; - continue; - } - Err(e) => return Err(e), // unrecoverable error - } + let Ok(blobs) = self + .fetcher + .fetch_blobs(slot, tx_hash, &versioned_hashes) + .instrument(debug_span!("fetch_blobs_loop", attempt)) + .await + else { + tokio::time::sleep(BETWEEN_RETRIES).await; + continue; + }; + + self.cache.lock().unwrap().insert((slot, tx_hash), blobs.clone()); + return Ok(blobs); } error!(target: "signet_blobber::BlobCacher", "All fetch attempts failed"); - Err(BlobFetcherError::missing_sidecar(tx_hash)) + Err(BlobberError::missing_sidecar(tx_hash)) } /// Processes the cache instructions. diff --git a/crates/blobber/src/error.rs b/crates/blobber/src/error.rs index 47b9ad9..a0ae78b 100644 --- a/crates/blobber/src/error.rs +++ b/crates/blobber/src/error.rs @@ -1,13 +1,19 @@ use alloy::{eips::eip2718::Eip2718Error, primitives::B256}; use reth::transaction_pool::BlobStoreError; -/// Fetch Result -pub type FetchResult = std::result::Result; +/// Result using [`BlobFetcherError`] as the default error type. +pub type BlobberResult = std::result::Result; + +/// Result using [`FetchError`] as the default error type. +pub type FetchResult = BlobberResult; + +/// Result using [`DecodeError`] as the default error type. +pub type DecodeResult = BlobberResult; /// Unrecoverable blob fetching errors. These result in the node shutting /// down. They occur when the blobstore is down or the sidecar is unretrievable. 
#[derive(Debug, thiserror::Error)] -pub enum UnrecoverableBlobError { +pub enum FetchError { /// Reqwest error #[error(transparent)] Reqwest(#[from] reqwest::Error), @@ -30,7 +36,7 @@ pub enum UnrecoverableBlobError { /// Ignorable blob fetching errors. These result in the block being skipped. #[derive(Debug, thiserror::Error, Copy, Clone)] -pub enum IgnorableBlobError { +pub enum DecodeError { /// Incorrect transaction type error #[error("Non-4844 transaction")] Non4844Transaction, @@ -50,86 +56,86 @@ pub enum IgnorableBlobError { /// Blob fetching errors #[derive(Debug, thiserror::Error)] -pub enum BlobFetcherError { +pub enum BlobberError { /// Unrecoverable blob fetching error #[error(transparent)] - Unrecoverable(#[from] UnrecoverableBlobError), + Fetch(#[from] FetchError), /// Ignorable blob fetching error #[error(transparent)] - Ignorable(#[from] IgnorableBlobError), + Decode(#[from] DecodeError), } -impl BlobFetcherError { +impl BlobberError { /// Returns true if the error is ignorable - pub const fn is_ignorable(&self) -> bool { - matches!(self, Self::Ignorable(_)) + pub const fn is_decode(&self) -> bool { + matches!(self, Self::Decode(_)) } /// Returns true if the error is unrecoverable - pub const fn is_unrecoverable(&self) -> bool { - matches!(self, Self::Unrecoverable(_)) + pub const fn is_fetch(&self) -> bool { + matches!(self, Self::Fetch(_)) } /// Non-4844 transaction error pub fn non_4844_transaction() -> Self { - IgnorableBlobError::Non4844Transaction.into() + DecodeError::Non4844Transaction.into() } /// Blob decode error pub fn blob_decode_error() -> Self { - IgnorableBlobError::BlobDecodeError.into() + DecodeError::BlobDecodeError.into() } /// Blob decode error pub fn block_decode_error(err: Eip2718Error) -> Self { - IgnorableBlobError::BlockDecodeError(err).into() + DecodeError::BlockDecodeError(err).into() } /// Blob decoded, but expected hash not found pub fn block_data_not_found(tx: B256) -> Self { - IgnorableBlobError::BlockDataNotFound(tx).into() + DecodeError::BlockDataNotFound(tx).into() } /// Missing sidecar error pub fn missing_sidecar(tx: B256) -> Self { - UnrecoverableBlobError::MissingSidecar(tx).into() + FetchError::MissingSidecar(tx).into() } /// Blob store error pub fn blob_store(err: BlobStoreError) -> Self { - UnrecoverableBlobError::BlobStore(err).into() + FetchError::BlobStore(err).into() } } -impl From for UnrecoverableBlobError { +impl From for FetchError { fn from(err: BlobStoreError) -> Self { match err { - BlobStoreError::MissingSidecar(tx) => UnrecoverableBlobError::MissingSidecar(tx), - _ => UnrecoverableBlobError::BlobStore(err), + BlobStoreError::MissingSidecar(tx) => FetchError::MissingSidecar(tx), + _ => FetchError::BlobStore(err), } } } -impl From for BlobFetcherError { +impl From for BlobberError { fn from(err: BlobStoreError) -> Self { - Self::Unrecoverable(err.into()) + Self::Fetch(err.into()) } } -impl From for BlobFetcherError { +impl From for BlobberError { fn from(err: reqwest::Error) -> Self { - Self::Unrecoverable(err.into()) + Self::Fetch(err.into()) } } -impl From for BlobFetcherError { +impl From for BlobberError { fn from(err: Eip2718Error) -> Self { - Self::Ignorable(err.into()) + Self::Decode(err.into()) } } -impl From for BlobFetcherError { +impl From for BlobberError { fn from(err: url::ParseError) -> Self { - Self::Unrecoverable(UnrecoverableBlobError::UrlParse(err)) + Self::Fetch(FetchError::UrlParse(err)) } } diff --git a/crates/blobber/src/fetch.rs b/crates/blobber/src/fetch.rs index f5426e8..d19aeb1 100644 --- 
a/crates/blobber/src/fetch.rs +++ b/crates/blobber/src/fetch.rs @@ -1,5 +1,5 @@ use crate::{ - BlobFetcherBuilder, BlobFetcherError, FetchResult, error::UnrecoverableBlobError, + BlobFetcherBuilder, BlobberError, BlobberResult, FetchResult, error::FetchError, shim::ExtractableChainShim, utils::extract_blobs_from_bundle, }; use alloy::{ @@ -135,31 +135,6 @@ where Self { pool, explorer, client: cl_client, cl_url, pylon_url, slot_calculator } } - /// Get blobs from either the pool or the network and decode them, - /// searching for the expected hash - async fn get_and_decode_blobs( - &self, - slot: usize, - extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, - ) -> FetchResult> { - debug_assert!(extract.tx.is_eip4844(), "Transaction must be of type EIP-4844"); - let hash = extract.tx.tx_hash(); - let versioned_hashes = extract - .tx - .as_eip4844() - .expect("tx is eip4844") - .blob_versioned_hashes() - .expect("tx is eip4844"); - let bz = self.fetch_blobs(slot, extract.tx_hash(), versioned_hashes).await?; - - SimpleCoder::default() - .decode_all(bz.as_ref()) - .ok_or_else(BlobFetcherError::blob_decode_error)? - .into_iter() - .find(|data| keccak256(data) == extract.block_data_hash()) - .ok_or_else(|| BlobFetcherError::block_data_not_found(*hash)) - } - /// Fetch blobs from the local txpool, or fall back to remote sources #[instrument(skip(self))] pub(crate) async fn fetch_blobs( @@ -185,14 +160,14 @@ where Ok(blobs) } else => { - Err(BlobFetcherError::missing_sidecar(tx_hash)) + Err(FetchError::MissingSidecar(tx_hash)) } } } /// Return a blob from the local pool or an error fn get_blobs_from_pool(&self, tx: TxHash) -> FetchResult { - self.pool.get_blob(tx)?.map(Into::into).ok_or_else(|| BlobFetcherError::missing_sidecar(tx)) + self.pool.get_blob(tx)?.map(Into::into).ok_or_else(|| FetchError::MissingSidecar(tx)) } /// Returns the blob from the explorer @@ -207,9 +182,7 @@ where #[instrument(skip_all)] async fn get_blobs_from_pylon(&self, tx: TxHash) -> FetchResult { let Some(url) = &self.pylon_url else { - return Err(BlobFetcherError::Unrecoverable( - UnrecoverableBlobError::ConsensusClientUrlNotSet, - )); + return Err(FetchError::ConsensusClientUrlNotSet); }; let url = url.join(&format!("sidecar/{tx}"))?; @@ -229,14 +202,12 @@ where versioned_hashes: &[B256], ) -> FetchResult { let Some(url) = &self.cl_url else { - return Err(BlobFetcherError::Unrecoverable( - UnrecoverableBlobError::ConsensusClientUrlNotSet, - )); + return Err(FetchError::ConsensusClientUrlNotSet); }; - let url = url.join(&format!("/eth/v1/beacon/blob_sidecars/{slot}")).map_err(|err| { - BlobFetcherError::Unrecoverable(UnrecoverableBlobError::UrlParse(err)) - })?; + let url = url + .join(&format!("/eth/v1/beacon/blob_sidecars/{slot}")) + .map_err(FetchError::UrlParse)?; let response = self.client.get(url).header("accept", "application/json").send().await?; @@ -245,6 +216,31 @@ where extract_blobs_from_bundle(response, versioned_hashes) } + /// Get blobs from either the pool or the network and decode them, + /// searching for the expected hash + async fn get_and_decode_blobs( + &self, + slot: usize, + extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, + ) -> BlobberResult> { + debug_assert!(extract.tx.is_eip4844(), "Transaction must be of type EIP-4844"); + let hash = extract.tx.tx_hash(); + let versioned_hashes = extract + .tx + .as_eip4844() + .expect("tx is eip4844") + .blob_versioned_hashes() + .expect("tx is eip4844"); + let bz = self.fetch_blobs(slot, extract.tx_hash(), versioned_hashes).await?; + + 
SimpleCoder::default() + .decode_all(bz.as_ref()) + .ok_or_else(BlobberError::blob_decode_error)? + .into_iter() + .find(|data| keccak256(data) == extract.block_data_hash()) + .ok_or_else(|| BlobberError::block_data_not_found(*hash)) + } + /// Get the Zenith block from the extracted event. /// For 4844 transactions, this fetches the transaction's blobs and decodes them. /// For any other type of transactions, it returns a Non4844Transaction error. @@ -254,9 +250,9 @@ where extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, host_block_number: u64, host_block_timestamp: u64, - ) -> FetchResult { + ) -> BlobberResult { if !extract.is_eip4844() { - return Err(BlobFetcherError::non_4844_transaction()); + return Err(BlobberError::non_4844_transaction()); } let header = extract.ru_header(host_block_number); @@ -266,8 +262,14 @@ where .slot_ending_at(host_block_timestamp) .expect("host chain has started"); - let block_data = self.get_and_decode_blobs(slot, extract).await?; - Ok(ZenithBlock::from_header_and_data(header, block_data)) + match self.get_and_decode_blobs(slot, extract).await { + Ok(data) => Ok(ZenithBlock::from_header_and_data(header, data)), + Err(BlobberError::Decode(err)) => { + trace!(%err, "ignorable error in block extraction."); + Ok(ZenithBlock::from_header_and_data(header, vec![])) + } + Err(e) => return Err(e), + } } /// Fetch the [`ZenithBlock`] specified by the outputs. @@ -281,7 +283,7 @@ where pub async fn block_from_outputs( &self, outputs: &Extracts<'_, ExtractableChainShim<'_>>, - ) -> FetchResult> { + ) -> BlobberResult> { if !outputs.contains_block() { return Ok(None); } @@ -295,8 +297,7 @@ where { Ok(block) => Ok(block), Err(err) => { - if err.is_ignorable() { - trace!(%err, "ignorable error in block extraction"); + if err.is_decode() { Ok(None) // ignore ignorable errors } else { Err(err) diff --git a/crates/blobber/src/lib.rs b/crates/blobber/src/lib.rs index f4717f4..66321b5 100644 --- a/crates/blobber/src/lib.rs +++ b/crates/blobber/src/lib.rs @@ -21,7 +21,7 @@ mod config; pub use config::BlobFetcherConfig; mod error; -pub use error::{BlobFetcherError, FetchResult}; +pub use error::{BlobberError, BlobberResult, DecodeError, DecodeResult, FetchError, FetchResult}; mod fetch; pub use fetch::{BlobFetcher, Blobs}; From 500a9dd0720be2040b3518d0ba5d8772a65e01de Mon Sep 17 00:00:00 2001 From: James Date: Wed, 10 Sep 2025 12:13:44 -0400 Subject: [PATCH 2/2] refactor: split coder from blobber --- crates/blobber/src/{ => blobs}/builder.rs | 23 +-- crates/blobber/src/{ => blobs}/cache.rs | 79 ++++------ crates/blobber/src/{ => blobs}/config.rs | 0 crates/blobber/src/blobs/error.rs | 38 +++++ crates/blobber/src/{ => blobs}/fetch.rs | 167 ++++++---------------- crates/blobber/src/blobs/mod.rs | 14 ++ crates/blobber/src/coder/error.rs | 24 ++++ crates/blobber/src/coder/mod.rs | 5 + crates/blobber/src/coder/trait.rs | 104 ++++++++++++++ crates/blobber/src/error.rs | 64 +-------- crates/blobber/src/lib.rs | 19 ++- 11 files changed, 268 insertions(+), 269 deletions(-) rename crates/blobber/src/{ => blobs}/builder.rs (86%) rename crates/blobber/src/{ => blobs}/cache.rs (81%) rename crates/blobber/src/{ => blobs}/config.rs (100%) create mode 100644 crates/blobber/src/blobs/error.rs rename crates/blobber/src/{ => blobs}/fetch.rs (66%) create mode 100644 crates/blobber/src/blobs/mod.rs create mode 100644 crates/blobber/src/coder/error.rs create mode 100644 crates/blobber/src/coder/mod.rs create mode 100644 crates/blobber/src/coder/trait.rs diff --git 
a/crates/blobber/src/builder.rs b/crates/blobber/src/blobs/builder.rs similarity index 86% rename from crates/blobber/src/builder.rs rename to crates/blobber/src/blobs/builder.rs index 2ab29cf..8571f9a 100644 --- a/crates/blobber/src/builder.rs +++ b/crates/blobber/src/blobs/builder.rs @@ -1,5 +1,4 @@ use crate::{BlobCacher, BlobFetcher, BlobFetcherConfig}; -use init4_bin_base::utils::calc::SlotCalculator; use reth::transaction_pool::TransactionPool; use url::Url; @@ -35,7 +34,6 @@ pub struct BlobFetcherBuilder { client: Option, cl_url: Option, pylon_url: Option, - slot_calculator: Option, } impl BlobFetcherBuilder { @@ -47,7 +45,6 @@ impl BlobFetcherBuilder { client: self.client, cl_url: self.cl_url, pylon_url: self.pylon_url, - slot_calculator: self.slot_calculator, } } @@ -107,22 +104,6 @@ impl BlobFetcherBuilder { self.pylon_url = Some(pylon_url.to_string()); Ok(self) } - - /// Set the slot calculator to use for the extractor. - pub const fn with_slot_calculator( - mut self, - slot_calculator: SlotCalculator, - ) -> BlobFetcherBuilder { - self.slot_calculator = Some(slot_calculator); - self - } - - /// Set the slot calculator to use for the extractor, using the Pecornino - /// host configuration. - pub const fn with_pecornino_slots(mut self) -> BlobFetcherBuilder { - self.slot_calculator = Some(SlotCalculator::pecorino_host()); - self - } } impl BlobFetcherBuilder { @@ -141,9 +122,7 @@ impl BlobFetcherBuilder { let explorer = foundry_blob_explorers::Client::new_with_client(explorer_url, client.clone()); - let slot_calculator = self.slot_calculator.ok_or(BuilderError::MissingSlotCalculator)?; - - Ok(BlobFetcher::new(pool, explorer, client, cl_url, pylon_url, slot_calculator)) + Ok(BlobFetcher::new(pool, explorer, client, cl_url, pylon_url)) } /// Build a [`BlobCacher`] with the provided parameters. diff --git a/crates/blobber/src/cache.rs b/crates/blobber/src/blobs/cache.rs similarity index 81% rename from crates/blobber/src/cache.rs rename to crates/blobber/src/blobs/cache.rs index dd2c2c7..942c5b7 100644 --- a/crates/blobber/src/cache.rs +++ b/crates/blobber/src/blobs/cache.rs @@ -1,13 +1,15 @@ -use crate::{BlobberError, BlobberResult, Blobs, FetchResult}; +use crate::{BlobFetcher, BlobberError, BlobberResult, Blobs, FetchResult}; use alloy::consensus::{SidecarCoder, SimpleCoder, Transaction as _}; use alloy::eips::eip7691::MAX_BLOBS_PER_BLOCK_ELECTRA; use alloy::eips::merge::EPOCH_SLOTS; use alloy::primitives::{B256, Bytes, keccak256}; +use core::fmt; use reth::transaction_pool::TransactionPool; use reth::{network::cache::LruMap, primitives::Receipt}; use signet_extract::ExtractedEvent; use signet_zenith::Zenith::BlockSubmitted; use signet_zenith::ZenithBlock; +use std::marker::PhantomData; use std::{ sync::{Arc, Mutex}, time::Duration, @@ -37,11 +39,13 @@ enum CacheInst { /// Handle for the cache. #[derive(Debug, Clone)] -pub struct CacheHandle { +pub struct CacheHandle { sender: mpsc::Sender, + + _coder: PhantomData, } -impl CacheHandle { +impl CacheHandle { /// Sends a cache instruction. async fn send(&self, inst: CacheInst) { let _ = self.sender.send(inst).await; @@ -71,12 +75,14 @@ impl CacheHandle { /// Fetch the blobs using [`Self::fetch_blobs`] and decode them to get the /// Zenith block data using the provided coder. 
- pub async fn fetch_and_decode_with_coder( + pub async fn fetch_and_decode( &self, slot: usize, extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, - mut coder: C, - ) -> BlobberResult { + ) -> BlobberResult + where + Coder: SidecarCoder + Default, + { let tx_hash = extract.tx_hash(); let versioned_hashes = extract .tx @@ -87,23 +93,13 @@ impl CacheHandle { let blobs = self.fetch_blobs(slot, tx_hash, versioned_hashes.to_owned()).await?; - coder + Coder::default() .decode_all(blobs.as_ref()) .ok_or_else(BlobberError::blob_decode_error)? .into_iter() .find(|data| keccak256(data) == extract.block_data_hash()) .map(Into::into) - .ok_or_else(|| BlobberError::block_data_not_found(tx_hash)) - } - - /// Fetch the blobs using [`Self::fetch_blobs`] and decode them using - /// [`SimpleCoder`] to get the Zenith block data. - pub async fn fech_and_decode( - &self, - slot: usize, - extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, - ) -> BlobberResult { - self.fetch_and_decode_with_coder(slot, extract, SimpleCoder::default()).await + .ok_or_else(|| BlobberError::block_data_not_found(extract.block_data_hash())) } /// Fetch the blobs, decode them using the provided coder, and construct a @@ -117,15 +113,17 @@ impl CacheHandle { /// decoded (e.g., due to a malformatted blob). /// - `Err(FetchError)` if there was an unrecoverable error fetching the /// blobs. - pub async fn signet_block_with_coder( + pub async fn signet_block( &self, host_block_number: u64, slot: usize, extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, - coder: C, - ) -> FetchResult { + ) -> FetchResult + where + Coder: SidecarCoder + Default, + { let header = extract.ru_header(host_block_number); - let block_data = match self.fetch_and_decode_with_coder(slot, extract, coder).await { + let block_data = match self.fetch_and_decode(slot, extract).await { Ok(buf) => buf, Err(BlobberError::Decode(_)) => { trace!("Failed to decode block data"); @@ -135,44 +133,24 @@ impl CacheHandle { }; Ok(ZenithBlock::from_header_and_data(header, block_data)) } - - /// Fetch the blobs, decode them using [`SimpleCoder`], and construct a - /// Zenith block from the header and data. - /// - /// # Returns - /// - /// - `Ok(ZenithBlock)` if the block was successfully fetched and - /// decoded. - /// - `Ok(ZenithBlock)` with an EMPTY BLOCK if the block_data could not be - /// decoded (e.g., due to a malformatted blob). - /// - `Err(FetchError)` if there was an unrecoverable error fetching the - /// blobs. - pub async fn signet_block( - &self, - host_block_number: u64, - slot: usize, - extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, - ) -> FetchResult { - self.signet_block_with_coder(host_block_number, slot, extract, SimpleCoder::default()).await - } } /// Retrieves blobs and stores them in a cache for later use. pub struct BlobCacher { - fetcher: crate::BlobFetcher, + fetcher: BlobFetcher, cache: Mutex>, } -impl core::fmt::Debug for BlobCacher { - fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { +impl fmt::Debug for BlobCacher { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BlobCacher").field("fetcher", &self.fetcher).finish_non_exhaustive() } } impl BlobCacher { /// Creates a new `BlobCacher` with the provided extractor and cache size. 
- pub fn new(fetcher: crate::BlobFetcher) -> Self { + pub fn new(fetcher: BlobFetcher) -> Self { Self { fetcher, cache: LruMap::new(BLOB_CACHE_SIZE).into() } } @@ -237,10 +215,10 @@ impl BlobCacher { /// /// # Panics /// This function will panic if the cache task fails to spawn. - pub fn spawn(self) -> CacheHandle { + pub fn spawn(self) -> CacheHandle { let (sender, inst) = mpsc::channel(CACHE_REQUEST_CHANNEL_SIZE); tokio::spawn(Arc::new(self).task_future(inst)); - CacheHandle { sender } + CacheHandle { sender, _coder: PhantomData } } } @@ -256,7 +234,6 @@ mod tests { rlp::encode, signers::{SignerSync, local::PrivateKeySigner}, }; - use init4_bin_base::utils::calc::SlotCalculator; use reth::primitives::Transaction; use reth_transaction_pool::{ PoolTransaction, TransactionOrigin, @@ -272,7 +249,6 @@ mod tests { let test = signet_constants::KnownChains::Test; let constants: SignetSystemConstants = test.try_into().unwrap(); - let calc = SlotCalculator::new(0, 0, 12); let explorer_url = "https://api.holesky.blobscan.com/"; let client = reqwest::Client::builder().use_rustls_tls(); @@ -308,9 +284,8 @@ mod tests { .with_explorer_url(explorer_url) .with_client_builder(client) .unwrap() - .with_slot_calculator(calc) .build_cache()?; - let handle = cache.spawn(); + let handle = cache.spawn::(); let got = handle .fetch_blobs( diff --git a/crates/blobber/src/config.rs b/crates/blobber/src/blobs/config.rs similarity index 100% rename from crates/blobber/src/config.rs rename to crates/blobber/src/blobs/config.rs diff --git a/crates/blobber/src/blobs/error.rs b/crates/blobber/src/blobs/error.rs new file mode 100644 index 0000000..accd8df --- /dev/null +++ b/crates/blobber/src/blobs/error.rs @@ -0,0 +1,38 @@ +use alloy::primitives::B256; +use reth::transaction_pool::BlobStoreError; + +/// Result using [`FetchError`] as the default error type. +pub type FetchResult = Result; + +/// Unrecoverable blob fetching errors. These result in the node shutting +/// down. They occur when the blobstore is down or the sidecar is unretrievable. +#[derive(Debug, thiserror::Error)] +pub enum FetchError { + /// Reqwest error + #[error(transparent)] + Reqwest(#[from] reqwest::Error), + /// Missing sidecar error + #[error("Cannot retrieve sidecar for {0} from any source")] + MissingSidecar(B256), + /// Reth blobstore error. + #[error(transparent)] + BlobStore(BlobStoreError), + /// Url parse error. + #[error(transparent)] + UrlParse(#[from] url::ParseError), + /// Consensus client URL not set error. + #[error("Consensus client URL not set")] + ConsensusClientUrlNotSet, + /// Pylon client URL not set error. 
+ #[error("Pylon client URL not set")] + PylonClientUrlNotSet, +} + +impl From for FetchError { + fn from(err: BlobStoreError) -> Self { + match err { + BlobStoreError::MissingSidecar(tx) => FetchError::MissingSidecar(tx), + _ => FetchError::BlobStore(err), + } + } +} diff --git a/crates/blobber/src/fetch.rs b/crates/blobber/src/blobs/fetch.rs similarity index 66% rename from crates/blobber/src/fetch.rs rename to crates/blobber/src/blobs/fetch.rs index d19aeb1..d6e20c9 100644 --- a/crates/blobber/src/fetch.rs +++ b/crates/blobber/src/blobs/fetch.rs @@ -1,22 +1,13 @@ -use crate::{ - BlobFetcherBuilder, BlobberError, BlobberResult, FetchResult, error::FetchError, - shim::ExtractableChainShim, utils::extract_blobs_from_bundle, -}; +use crate::{BlobFetcherBuilder, FetchError, FetchResult, utils::extract_blobs_from_bundle}; use alloy::{ - consensus::{Blob, SidecarCoder, SimpleCoder, Transaction as _}, - eips::eip7594::BlobTransactionSidecarVariant, - primitives::{B256, TxHash, keccak256}, -}; -use init4_bin_base::utils::calc::SlotCalculator; -use reth::{ - primitives::Receipt, rpc::types::beacon::sidecar::BeaconBlobBundle, - transaction_pool::TransactionPool, + consensus::{Blob, BlobTransactionSidecar}, + eips::eip7594::{BlobTransactionSidecarEip7594, BlobTransactionSidecarVariant}, + primitives::{B256, TxHash}, }; -use signet_extract::{ExtractedEvent, Extracts}; -use signet_zenith::{Zenith::BlockSubmitted, ZenithBlock}; +use reth::{rpc::types::beacon::sidecar::BeaconBlobBundle, transaction_pool::TransactionPool}; use std::{ops::Deref, sync::Arc}; use tokio::select; -use tracing::{instrument, trace}; +use tracing::instrument; /// Blobs which may be a local shared sidecar, or a list of blobs from an /// external source. @@ -30,6 +21,42 @@ pub enum Blobs { Other(Arc>), } +impl From> for Blobs { + fn from(blobs: Vec) -> Self { + Self::Other(Arc::new(blobs)) + } +} + +impl From>> for Blobs { + fn from(blobs: Arc>) -> Self { + Blobs::Other(blobs) + } +} + +impl From for Blobs { + fn from(sidecar: BlobTransactionSidecarVariant) -> Self { + Self::FromPool(Arc::new(sidecar)) + } +} + +impl From> for Blobs { + fn from(sidecar: Arc) -> Self { + Self::FromPool(sidecar) + } +} + +impl From for Blobs { + fn from(sidecar: BlobTransactionSidecar) -> Self { + Self::FromPool(Arc::new(BlobTransactionSidecarVariant::Eip4844(sidecar))) + } +} + +impl From for Blobs { + fn from(sidecar: BlobTransactionSidecarEip7594) -> Self { + Self::FromPool(Arc::new(BlobTransactionSidecarVariant::Eip7594(sidecar))) + } +} + impl AsRef> for Blobs { fn as_ref(&self) -> &Vec { match self { @@ -76,18 +103,6 @@ impl Blobs { } } -impl From> for Blobs { - fn from(sidecar: Arc) -> Self { - Blobs::FromPool(sidecar) - } -} - -impl From> for Blobs { - fn from(blobs: Vec) -> Self { - Blobs::Other(Arc::new(blobs)) - } -} - /// Decoder is generic over a Pool and handles fetching and decoding blob /// transactions. Decoder attempts to fetch from the Pool first and then /// queries an explorer if it can't find the blob. 
When Decoder does find a @@ -98,7 +113,6 @@ pub struct BlobFetcher { client: reqwest::Client, cl_url: Option, pylon_url: Option, - slot_calculator: SlotCalculator, } impl core::fmt::Debug for BlobFetcher { @@ -130,9 +144,8 @@ where cl_client: reqwest::Client, cl_url: Option, pylon_url: Option, - slot_calculator: SlotCalculator, ) -> Self { - Self { pool, explorer, client: cl_client, cl_url, pylon_url, slot_calculator } + Self { pool, explorer, client: cl_client, cl_url, pylon_url } } /// Fetch blobs from the local txpool, or fall back to remote sources @@ -215,103 +228,13 @@ where extract_blobs_from_bundle(response, versioned_hashes) } - - /// Get blobs from either the pool or the network and decode them, - /// searching for the expected hash - async fn get_and_decode_blobs( - &self, - slot: usize, - extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, - ) -> BlobberResult> { - debug_assert!(extract.tx.is_eip4844(), "Transaction must be of type EIP-4844"); - let hash = extract.tx.tx_hash(); - let versioned_hashes = extract - .tx - .as_eip4844() - .expect("tx is eip4844") - .blob_versioned_hashes() - .expect("tx is eip4844"); - let bz = self.fetch_blobs(slot, extract.tx_hash(), versioned_hashes).await?; - - SimpleCoder::default() - .decode_all(bz.as_ref()) - .ok_or_else(BlobberError::blob_decode_error)? - .into_iter() - .find(|data| keccak256(data) == extract.block_data_hash()) - .ok_or_else(|| BlobberError::block_data_not_found(*hash)) - } - - /// Get the Zenith block from the extracted event. - /// For 4844 transactions, this fetches the transaction's blobs and decodes them. - /// For any other type of transactions, it returns a Non4844Transaction error. - #[tracing::instrument(skip(self, extract), fields(tx = %extract.tx_hash(), url = self.explorer.baseurl()))] - async fn get_signet_block( - &self, - extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>, - host_block_number: u64, - host_block_timestamp: u64, - ) -> BlobberResult { - if !extract.is_eip4844() { - return Err(BlobberError::non_4844_transaction()); - } - - let header = extract.ru_header(host_block_number); - - let slot = self - .slot_calculator - .slot_ending_at(host_block_timestamp) - .expect("host chain has started"); - - match self.get_and_decode_blobs(slot, extract).await { - Ok(data) => Ok(ZenithBlock::from_header_and_data(header, data)), - Err(BlobberError::Decode(err)) => { - trace!(%err, "ignorable error in block extraction."); - Ok(ZenithBlock::from_header_and_data(header, vec![])) - } - Err(e) => return Err(e), - } - } - - /// Fetch the [`ZenithBlock`] specified by the outputs. - /// - /// ## Returns - /// - /// - `Ok(None)` - If the outputs do not contain a [`BlockSubmitted`]. - /// - `Ok(Some(block))` - If the block was successfully fetched and decoded. - /// - `Err(err)` - If an error occurred while fetching or decoding the - /// block. 
- pub async fn block_from_outputs( - &self, - outputs: &Extracts<'_, ExtractableChainShim<'_>>, - ) -> BlobberResult> { - if !outputs.contains_block() { - return Ok(None); - } - - let tx = outputs.submitted.as_ref().expect("checked by contains_block"); - - match self - .get_signet_block(tx, outputs.host_block_number(), outputs.host_block_timestamp()) - .await - .map(Some) - { - Ok(block) => Ok(block), - Err(err) => { - if err.is_decode() { - Ok(None) // ignore ignorable errors - } else { - Err(err) - } - } - } - } } #[cfg(test)] mod tests { use super::*; use alloy::{ - consensus::{SidecarBuilder, SignableTransaction as _, TxEip2930}, + consensus::{SidecarBuilder, SignableTransaction as _, SimpleCoder, TxEip2930}, eips::Encodable2718, primitives::{TxKind, U256, bytes}, rlp::encode, @@ -332,7 +255,6 @@ mod tests { let test = signet_constants::KnownChains::Test; let constants: SignetSystemConstants = test.try_into().unwrap(); - let calc = SlotCalculator::new(0, 0, 12); let explorer_url = "https://api.holesky.blobscan.com/"; let client = reqwest::Client::builder().use_rustls_tls(); @@ -342,7 +264,6 @@ mod tests { .with_explorer_url(explorer_url) .with_client_builder(client) .unwrap() - .with_slot_calculator(calc) .build()?; let tx = Transaction::Eip2930(TxEip2930 { diff --git a/crates/blobber/src/blobs/mod.rs b/crates/blobber/src/blobs/mod.rs new file mode 100644 index 0000000..2c74c2d --- /dev/null +++ b/crates/blobber/src/blobs/mod.rs @@ -0,0 +1,14 @@ +mod builder; +pub use builder::{BlobFetcherBuilder, BuilderError as BlobFetcherBuilderError}; + +mod cache; +pub use cache::{BlobCacher, CacheHandle}; + +mod config; +pub use config::BlobFetcherConfig; + +mod error; +pub use error::{FetchError, FetchResult}; + +mod fetch; +pub use fetch::{BlobFetcher, Blobs}; diff --git a/crates/blobber/src/coder/error.rs b/crates/blobber/src/coder/error.rs new file mode 100644 index 0000000..16c3b24 --- /dev/null +++ b/crates/blobber/src/coder/error.rs @@ -0,0 +1,24 @@ +use alloy::{eips::eip2718::Eip2718Error, primitives::B256}; + +/// Result using [`DecodeError`] as the default error type. +pub type DecodeResult = Result; + +/// Ignorable blob fetching errors. These result in the block being skipped. +#[derive(Debug, thiserror::Error, Copy, Clone)] +pub enum DecodeError { + /// Incorrect transaction type error + #[error("Non-4844 transaction")] + Non4844Transaction, + /// Decoding error from the internal [`SimpleCoder`]. This indicates the + /// blobs are not formatted in the simple coder format. + /// + /// [`SimpleCoder`]: alloy::consensus::SimpleCoder + #[error("Decoding failed")] + BlobDecodeError, + /// Block data not found in decoded blob + #[error("Block data not found in decoded blob. 
Expected block hash: {0}")] + BlockDataNotFound(B256), + /// Error while decoding block from blob + #[error("Block decode error: {0}")] + BlockDecodeError(#[from] Eip2718Error), +} diff --git a/crates/blobber/src/coder/mod.rs b/crates/blobber/src/coder/mod.rs new file mode 100644 index 0000000..7669ca5 --- /dev/null +++ b/crates/blobber/src/coder/mod.rs @@ -0,0 +1,5 @@ +mod error; +pub use error::{DecodeError, DecodeResult}; + +mod r#trait; +pub use r#trait::SignetBlockDecoder; diff --git a/crates/blobber/src/coder/trait.rs b/crates/blobber/src/coder/trait.rs new file mode 100644 index 0000000..0044e4b --- /dev/null +++ b/crates/blobber/src/coder/trait.rs @@ -0,0 +1,104 @@ +use crate::{Blobs, DecodeError, DecodeResult}; +use alloy::{ + consensus::SidecarCoder, + primitives::{B256, Bytes, keccak256}, +}; +use signet_zenith::{Zenith, ZenithBlock}; + +/// A trait for decoding blocks from blob data. +pub trait SignetBlockDecoder { + /// Decodes a block from the given blob bytes. + fn decode_block( + &mut self, + blobs: Blobs, + header: Zenith::BlockHeader, + data_hash: B256, + ) -> DecodeResult; + + /// Decodes a block from the given blob bytes, or returns an empty block. + fn decode_block_or_default( + &mut self, + blobs: Blobs, + header: Zenith::BlockHeader, + data_hash: B256, + ) -> ZenithBlock { + self.decode_block(blobs, header, data_hash) + .unwrap_or_else(|_| ZenithBlock::from_header_and_data(header, Bytes::new())) + } +} + +impl SignetBlockDecoder for T +where + T: SidecarCoder, +{ + fn decode_block( + &mut self, + blobs: Blobs, + header: Zenith::BlockHeader, + data_hash: B256, + ) -> DecodeResult { + let block_data = self + .decode_all(blobs.as_ref()) + .ok_or(DecodeError::BlobDecodeError)? + .into_iter() + .find(|data| keccak256(data) == data_hash) + .map(Into::::into) + .ok_or(DecodeError::BlockDataNotFound(data_hash))?; + Ok(ZenithBlock::from_header_and_data(header, block_data)) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::{Blobs, utils::tests::PYLON_BLOB_RESPONSE}; + use alloy::{ + consensus::{BlobTransactionSidecar, SimpleCoder}, + primitives::{Address, B256, U256, b256}, + }; + use signet_zenith::Zenith; + + #[test] + fn it_decodes() { + let sidecar: BlobTransactionSidecar = + serde_json::from_str::(PYLON_BLOB_RESPONSE).unwrap(); + let blobs = Blobs::from(sidecar); + + let block = SimpleCoder::default() + .decode_block( + blobs, + Zenith::BlockHeader { + rollupChainId: U256::ZERO, + hostBlockNumber: U256::ZERO, + gasLimit: U256::ZERO, + rewardAddress: Address::ZERO, + blockDataHash: B256::ZERO, + }, + b256!("0xfd93968f4e7d4d4451f211980f2fec4f0c32e67fae63a70ca90024b54a70e9ee"), + ) + .unwrap(); + + assert_eq!(block.transactions().len(), 1); + } + + #[test] + fn it_decodes_defaultly() { + let sidecar: BlobTransactionSidecar = + serde_json::from_str::(PYLON_BLOB_RESPONSE).unwrap(); + let blobs = Blobs::from(sidecar); + + let block = SimpleCoder::default().decode_block_or_default( + blobs, + Zenith::BlockHeader { + rollupChainId: U256::ZERO, + hostBlockNumber: U256::ZERO, + gasLimit: U256::ZERO, + rewardAddress: Address::ZERO, + blockDataHash: B256::ZERO, + }, + B256::ZERO, + ); + + assert_eq!(block.transactions().len(), 0); + } +} diff --git a/crates/blobber/src/error.rs b/crates/blobber/src/error.rs index a0ae78b..e5b4142 100644 --- a/crates/blobber/src/error.rs +++ b/crates/blobber/src/error.rs @@ -1,59 +1,10 @@ +use crate::{DecodeError, FetchError}; use alloy::{eips::eip2718::Eip2718Error, primitives::B256}; use reth::transaction_pool::BlobStoreError; /// 
Result using [`BlobFetcherError`] as the default error type. pub type BlobberResult = std::result::Result; -/// Result using [`FetchError`] as the default error type. -pub type FetchResult = BlobberResult; - -/// Result using [`DecodeError`] as the default error type. -pub type DecodeResult = BlobberResult; - -/// Unrecoverable blob fetching errors. These result in the node shutting -/// down. They occur when the blobstore is down or the sidecar is unretrievable. -#[derive(Debug, thiserror::Error)] -pub enum FetchError { - /// Reqwest error - #[error(transparent)] - Reqwest(#[from] reqwest::Error), - /// Missing sidecar error - #[error("Cannot retrieve sidecar for {0} from any source")] - MissingSidecar(B256), - /// Reth blobstore error. - #[error(transparent)] - BlobStore(BlobStoreError), - /// Url parse error. - #[error(transparent)] - UrlParse(#[from] url::ParseError), - /// Consensus client URL not set error. - #[error("Consensus client URL not set")] - ConsensusClientUrlNotSet, - /// Pylon client URL not set error. - #[error("Pylon client URL not set")] - PylonClientUrlNotSet, -} - -/// Ignorable blob fetching errors. These result in the block being skipped. -#[derive(Debug, thiserror::Error, Copy, Clone)] -pub enum DecodeError { - /// Incorrect transaction type error - #[error("Non-4844 transaction")] - Non4844Transaction, - /// Decoding error from the internal [`SimpleCoder`]. This indicates the - /// blobs are not formatted in the simple coder format. - /// - /// [`SimpleCoder`]: alloy::consensus::SimpleCoder - #[error("Decoding failed")] - BlobDecodeError, - /// Block data not found in decoded blob - #[error("Block data not found in decoded blob. Expected block hash: {0}")] - BlockDataNotFound(B256), - /// Error while decoding block from blob - #[error("Block decode error: {0}")] - BlockDecodeError(#[from] Eip2718Error), -} - /// Blob fetching errors #[derive(Debug, thiserror::Error)] pub enum BlobberError { @@ -92,8 +43,8 @@ impl BlobberError { } /// Blob decoded, but expected hash not found - pub fn block_data_not_found(tx: B256) -> Self { - DecodeError::BlockDataNotFound(tx).into() + pub fn block_data_not_found(data_hash: B256) -> Self { + DecodeError::BlockDataNotFound(data_hash).into() } /// Missing sidecar error @@ -107,15 +58,6 @@ impl BlobberError { } } -impl From for FetchError { - fn from(err: BlobStoreError) -> Self { - match err { - BlobStoreError::MissingSidecar(tx) => FetchError::MissingSidecar(tx), - _ => FetchError::BlobStore(err), - } - } -} - impl From for BlobberError { fn from(err: BlobStoreError) -> Self { Self::Fetch(err.into()) diff --git a/crates/blobber/src/lib.rs b/crates/blobber/src/lib.rs index 66321b5..4d4b7bd 100644 --- a/crates/blobber/src/lib.rs +++ b/crates/blobber/src/lib.rs @@ -11,20 +11,17 @@ #![deny(unused_must_use, rust_2018_idioms)] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -mod builder; -pub use builder::{BlobFetcherBuilder, BuilderError as BlobFetcherBuilderError}; +mod blobs; +pub use blobs::{ + BlobCacher, BlobFetcher, BlobFetcherBuilder, BlobFetcherBuilderError, BlobFetcherConfig, Blobs, + CacheHandle, FetchError, FetchResult, +}; -mod cache; -pub use cache::{BlobCacher, CacheHandle}; - -mod config; -pub use config::BlobFetcherConfig; +mod coder; +pub use coder::{DecodeError, DecodeResult, SignetBlockDecoder}; mod error; -pub use error::{BlobberError, BlobberResult, DecodeError, DecodeResult, FetchError, FetchResult}; - -mod fetch; -pub use fetch::{BlobFetcher, Blobs}; +pub use error::{BlobberError, BlobberResult}; mod shim; 
pub use shim::ExtractableChainShim;
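A minimal usage sketch of the `SignetBlockDecoder` trait introduced in PATCH 2/2, mirroring the `it_decodes_defaultly` test added in crates/blobber/src/coder/trait.rs. The `signet_blobber` import path and the all-zero header values are illustrative assumptions; a real caller builds the header from the extracted `BlockSubmitted` event and passes that event's block data hash.

use alloy::{
    consensus::{BlobTransactionSidecar, SimpleCoder},
    primitives::{Address, B256, U256},
};
// Crate name assumed here; adjust to the actual package name of crates/blobber.
use signet_blobber::{Blobs, SignetBlockDecoder};
use signet_zenith::Zenith;

fn decode_or_empty(sidecar: BlobTransactionSidecar) {
    // Wrap a pool sidecar in the `Blobs` enum via the new From impls.
    let blobs = Blobs::from(sidecar);

    // Placeholder header; real values come from the BlockSubmitted event.
    let header = Zenith::BlockHeader {
        rollupChainId: U256::ZERO,
        hostBlockNumber: U256::ZERO,
        gasLimit: U256::ZERO,
        rewardAddress: Address::ZERO,
        blockDataHash: B256::ZERO,
    };

    // Any SidecarCoder gets the blanket impl; decode_block_or_default falls
    // back to an empty block when the blob is malformatted, matching the
    // behavior documented on CacheHandle::signet_block.
    let block = SimpleCoder::default().decode_block_or_default(blobs, header, B256::ZERO);
    assert_eq!(block.transactions().len(), 0);
}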