diff --git a/Cargo.lock b/Cargo.lock
index d8e4e42ed5a..5f062efa2b5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -10390,6 +10390,7 @@ dependencies = [
  "alloy-rpc-types-eth",
  "alloy-sol-types",
  "alloy-transport",
+ "data-encoding",
  "derive_more",
  "futures",
  "itertools 0.14.0",
diff --git a/crates/rpc/rpc-eth-api/src/filter.rs b/crates/rpc/rpc-eth-api/src/filter.rs
index f6de812b3f5..f7bc98ce23f 100644
--- a/crates/rpc/rpc-eth-api/src/filter.rs
+++ b/crates/rpc/rpc-eth-api/src/filter.rs
@@ -3,6 +3,7 @@
 use alloy_json_rpc::RpcObject;
 use alloy_rpc_types_eth::{Filter, FilterChanges, FilterId, Log, PendingTransactionFilterKind};
 use jsonrpsee::{core::RpcResult, proc_macros::rpc};
+use reth_rpc_eth_types::logs_utils::Cursor;
 use std::future::Future;
 
 /// Rpc Interface for poll-based ethereum filter API.
@@ -39,6 +40,14 @@ pub trait EthFilterApi<T: RpcObject> {
     /// Returns logs matching given filter object.
     #[method(name = "getLogs")]
     async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>>;
+
+    /// Returns logs matching given filter object, alongside a cursor for pagination.
+    #[method(name = "getLogsWithCursor")]
+    async fn logs_with_cursor(
+        &self,
+        filter: Filter,
+        cursor: Option<Cursor>,
+    ) -> RpcResult<(Vec<Log>, Option<Cursor>)>;
 }
 
 /// Limits for logs queries
diff --git a/crates/rpc/rpc-eth-types/Cargo.toml b/crates/rpc/rpc-eth-types/Cargo.toml
index 7eed1aa3db1..51dec6dd79c 100644
--- a/crates/rpc/rpc-eth-types/Cargo.toml
+++ b/crates/rpc/rpc-eth-types/Cargo.toml
@@ -62,6 +62,7 @@ schnellru.workspace = true
 rand.workspace = true
 tracing.workspace = true
 itertools.workspace = true
+data-encoding.workspace = true
 
 [dev-dependencies]
 serde_json.workspace = true
diff --git a/crates/rpc/rpc-eth-types/src/logs_utils.rs b/crates/rpc/rpc-eth-types/src/logs_utils.rs
index 1d93de4bb1f..95dbc6fe3db 100644
--- a/crates/rpc/rpc-eth-types/src/logs_utils.rs
+++ b/crates/rpc/rpc-eth-types/src/logs_utils.rs
@@ -6,11 +6,16 @@
 use alloy_consensus::TxReceipt;
 use alloy_eips::{eip2718::Encodable2718, BlockNumHash};
 use alloy_primitives::TxHash;
 use alloy_rpc_types_eth::{Filter, Log};
+use data_encoding::BASE64;
+use derive_more::Debug;
 use reth_chainspec::ChainInfo;
 use reth_errors::ProviderError;
 use reth_primitives_traits::{BlockBody, RecoveredBlock, SignedTransaction};
 use reth_storage_api::{BlockReader, ProviderBlock};
-use std::sync::Arc;
+use serde::{Deserialize, Serialize};
+use thiserror::Error;
+
+use std::{fmt, str::FromStr, sync::Arc};
 
 /// Returns all matching logs of a block's receipts when the transaction hashes are known.
 pub fn matching_block_logs_with_tx_hashes<'a, I, R>(
@@ -74,75 +79,161 @@ pub fn append_matching_block_logs<P>(
     receipts: &[P::Receipt],
     removed: bool,
     block_timestamp: u64,
-) -> Result<(), ProviderError>
+    max_logs: Option<usize>,
+    mut offset: u64,
+) -> Result<Option<u64>, ProviderError>
 where
     P: BlockReader,
 {
-    // Tracks the index of a log in the entire block.
-    let mut log_index: u64 = 0;
-
     // Lazy loaded number of the first transaction in the block.
     // This is useful for blocks with multiple matching logs because it
     // prevents re-querying the block body indices.
     let mut loaded_first_tx_num = None;
 
-    // Iterate over receipts and append matching logs.
-    for (receipt_idx, receipt) in receipts.iter().enumerate() {
-        // The transaction hash of the current receipt.
+    // flatten receipts and logs into one iterator for easy offsetting
+    let mut iter = receipts
+        .iter()
+        .enumerate()
+        .flat_map(|(receipt_idx, receipt)| receipt.logs().iter().map(move |log| (receipt_idx, log)))
+        .skip(offset as usize)
+        .peekable();
+
+    // Iterate over flattened logs, appending matching logs.
+    while let Some((receipt_idx, log)) = iter.next() {
         let mut transaction_hash = None;
-
-        for log in receipt.logs() {
-            if filter.matches(log) {
-                // if this is the first match in the receipt's logs, look up the transaction hash
-                if transaction_hash.is_none() {
-                    transaction_hash = match &provider_or_block {
-                        ProviderOrBlock::Block(block) => {
-                            block.body().transactions().get(receipt_idx).map(|t| t.trie_hash())
-                        }
-                        ProviderOrBlock::Provider(provider) => {
-                            let first_tx_num = match loaded_first_tx_num {
-                                Some(num) => num,
-                                None => {
-                                    let block_body_indices = provider
-                                        .block_body_indices(block_num_hash.number)?
-                                        .ok_or(ProviderError::BlockBodyIndicesNotFound(
-                                            block_num_hash.number,
-                                        ))?;
-                                    loaded_first_tx_num = Some(block_body_indices.first_tx_num);
-                                    block_body_indices.first_tx_num
-                                }
-                            };
-
-                            // This is safe because Transactions and Receipts have the same
-                            // keys.
-                            let transaction_id = first_tx_num + receipt_idx as u64;
-                            let transaction =
-                                provider.transaction_by_id(transaction_id)?.ok_or_else(|| {
-                                    ProviderError::TransactionNotFound(transaction_id.into())
-                                })?;
-
-                            Some(transaction.trie_hash())
-                        }
-                    };
-                }
+        if filter.matches(log) {
+            // if this is the first match in the receipt's logs, look up the transaction hash
+            if transaction_hash.is_none() {
+                transaction_hash = match &provider_or_block {
+                    ProviderOrBlock::Block(block) => {
+                        block.body().transactions().get(receipt_idx).map(|t| t.trie_hash())
+                    }
+                    ProviderOrBlock::Provider(provider) => {
+                        let first_tx_num = match loaded_first_tx_num {
+                            Some(num) => num,
+                            None => {
+                                let block_body_indices = provider
+                                    .block_body_indices(block_num_hash.number)?
+                                    .ok_or(ProviderError::BlockBodyIndicesNotFound(
+                                        block_num_hash.number,
+                                    ))?;
+                                loaded_first_tx_num = Some(block_body_indices.first_tx_num);
+                                block_body_indices.first_tx_num
+                            }
+                        };
 
-                let log = Log {
-                    inner: log.clone(),
-                    block_hash: Some(block_num_hash.hash),
-                    block_number: Some(block_num_hash.number),
-                    transaction_hash,
-                    // The transaction and receipt index is always the same.
-                    transaction_index: Some(receipt_idx as u64),
-                    log_index: Some(log_index),
-                    removed,
-                    block_timestamp: Some(block_timestamp),
+                        // This is safe because Transactions and Receipts have the same
+                        // keys.
+                        let transaction_id = first_tx_num + receipt_idx as u64;
+                        let transaction =
+                            provider.transaction_by_id(transaction_id)?.ok_or_else(|| {
+                                ProviderError::TransactionNotFound(transaction_id.into())
+                            })?;
+
+                        Some(transaction.trie_hash())
+                    }
                 };
-                all_logs.push(log);
             }
-            log_index += 1;
+
+            let log = Log {
+                inner: log.clone(),
+                block_hash: Some(block_num_hash.hash),
+                block_number: Some(block_num_hash.number),
+                transaction_hash,
+                // The transaction and receipt index is always the same.
+                transaction_index: Some(receipt_idx as u64),
+                log_index: Some(offset),
+                removed,
+                block_timestamp: Some(block_timestamp),
+            };
+            all_logs.push(log);
+
+            if let Some(max_logs) = max_logs &&
+                all_logs.len() >= max_logs
+            {
+                // check if we are at the end of the block's logs and
+                // update the cursor to the next log_idx
+                return Ok(iter.peek().map(|_| offset + 1));
+            }
         }
+        offset += 1;
+    }
+
+    Ok(None)
+}
+
+/// Cursor type for paginating getLogs requests
+#[derive(Clone, Debug)]
+pub struct Cursor {
+    /// Block number to resume pagination from
+    pub block_num: u64,
+    /// `log_index` within the block to resume pagination from
+    pub log_idx: u64,
+}
+
+impl Cursor {
+    /// Convert the cursor to bytes
+    pub fn to_bytes(&self) -> [u8; 16] {
+        let block_number_bytes = self.block_num.to_be_bytes();
+        let next_log_idx_bytes = self.log_idx.to_be_bytes();
+        let mut cursor_bytes: [u8; 16] = [0; 16];
+        cursor_bytes[..8].copy_from_slice(&block_number_bytes);
+        cursor_bytes[8..].copy_from_slice(&next_log_idx_bytes);
+
+        cursor_bytes
+    }
+
+    /// Convert a byte slice into a cursor
+    pub fn from_bytes(bytes: &[u8; 16]) -> Result<Self, CursorError> {
+        let block_num_bytes: [u8; 8] = bytes[..8].try_into().map_err(|_| CursorError)?;
+        let log_idx_bytes: [u8; 8] = bytes[8..].try_into().map_err(|_| CursorError)?;
+
+        let block_num = u64::from_be_bytes(block_num_bytes);
+        let log_idx = u64::from_be_bytes(log_idx_bytes);
+
+        Ok(Self { block_num, log_idx })
+    }
+}
+
+/// Error thrown when cursor parsing fails
+#[derive(Error, Debug)]
+#[error("cursor error")]
+pub struct CursorError;
+
+impl FromStr for Cursor {
+    type Err = CursorError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        // `decode_mut` panics when the output buffer does not match the decoded
+        // length of the input, so use the allocating decode and validate the size.
+        let bytes = BASE64.decode(s.as_bytes()).map_err(|_| CursorError)?;
+        let bytes: [u8; 16] = bytes.as_slice().try_into().map_err(|_| CursorError)?;
+
+        Self::from_bytes(&bytes)
+    }
+}
+
+impl fmt::Display for Cursor {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        BASE64.encode_write(&self.to_bytes(), f)
+    }
+}
+
+impl Serialize for Cursor {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        serializer.collect_str(self)
+    }
+}
+
+impl<'de> Deserialize<'de> for Cursor {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        String::deserialize(deserializer)?.parse().map_err(serde::de::Error::custom)
     }
-    Ok(())
 }
 
 /// Computes the block range based on the filter range and current block numbers.
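Note on the wire format: a `Cursor` is an opaque token — 8 big-endian bytes of block number followed by 8 big-endian bytes of log index, base64-encoded via `data_encoding::BASE64`, so a valid token is always 24 characters including padding. A minimal standalone sketch of the round trip (the `encode_cursor`/`decode_cursor` helper names are illustrative, not part of this diff):

use data_encoding::BASE64;

// Pack (block_num, log_idx) the same way `Cursor::to_bytes` does:
// 8 big-endian bytes of block number, then 8 of log index.
fn encode_cursor(block_num: u64, log_idx: u64) -> String {
    let mut bytes = [0u8; 16];
    bytes[..8].copy_from_slice(&block_num.to_be_bytes());
    bytes[8..].copy_from_slice(&log_idx.to_be_bytes());
    BASE64.encode(&bytes)
}

// Inverse of `encode_cursor`: reject anything that does not decode to
// exactly 16 bytes, mirroring the `FromStr`/`from_bytes` error path.
fn decode_cursor(s: &str) -> Option<(u64, u64)> {
    let bytes = BASE64.decode(s.as_bytes()).ok()?;
    let bytes: [u8; 16] = bytes.as_slice().try_into().ok()?;
    let block_num = u64::from_be_bytes(bytes[..8].try_into().ok()?);
    let log_idx = u64::from_be_bytes(bytes[8..].try_into().ok()?);
    Some((block_num, log_idx))
}

fn main() {
    let token = encode_cursor(19_000_000, 42);
    assert_eq!(token.len(), 24); // 16 bytes -> 24 base64 chars, padded
    assert_eq!(decode_cursor(&token), Some((19_000_000, 42)));
}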
diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs
index b8268033519..f4aea302923 100644
--- a/crates/rpc/rpc/src/eth/filter.rs
+++ b/crates/rpc/rpc/src/eth/filter.rs
@@ -23,7 +23,7 @@ use reth_rpc_eth_api::{
     RpcNodeCoreExt, RpcTransaction,
 };
 use reth_rpc_eth_types::{
-    logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
+    logs_utils::{self, append_matching_block_logs, Cursor, ProviderOrBlock},
     EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
 };
 use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
@@ -63,7 +63,7 @@ where
         filter: Filter,
         limits: QueryLimits,
     ) -> impl Future<Output = RpcResult<Vec<Log>>> + Send {
         trace!(target: "rpc::eth", "Serving eth_getLogs");
-        self.logs_for_filter(filter, limits).map_err(|e| e.into())
+        self.logs_for_filter(filter, limits, None, false).map_ok(|(logs, _)| logs).map_err(|e| e.into())
     }
 }
@@ -284,9 +284,11 @@ where
                         from_block_number,
                         to_block_number,
                         self.inner.query_limits,
+                        None,
+                        false,
                     )
                     .await?;
-                Ok(FilterChanges::Logs(logs))
+                Ok(FilterChanges::Logs(logs.0))
             }
         }
     }
@@ -310,7 +312,8 @@ where
             }
         };
 
-        self.logs_for_filter(filter, self.inner.query_limits).await
+        let res = self.logs_for_filter(filter, self.inner.query_limits, None, false).await?;
+        Ok(res.0)
     }
 
     /// Returns logs matching given filter object.
@@ -318,8 +321,13 @@ where
         &self,
         filter: Filter,
         limits: QueryLimits,
-    ) -> Result<Vec<Log>, EthFilterError> {
-        self.inner.clone().logs_for_filter(filter, limits).await
+        cursor: Option<Cursor>,
+        single_block_can_exceed_limit: bool,
+    ) -> Result<(Vec<Log>, Option<Cursor>), EthFilterError> {
+        self.inner
+            .clone()
+            .logs_for_filter(filter, limits, cursor, single_block_can_exceed_limit)
+            .await
     }
 }
@@ -407,7 +415,37 @@ where
     /// Handler for `eth_getLogs`
     async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
         trace!(target: "rpc::eth", "Serving eth_getLogs");
-        Ok(self.logs_for_filter(filter, self.inner.query_limits).await?)
+        let (logs, _cursor) =
+            self.logs_for_filter(filter, self.inner.query_limits, None, false).await?;
+
+        // if cursor.is_some() {
+        //     debug!(
+        //         target: "rpc::eth::filter",
+        //         max_logs_per_response = self.inner.query_limits.max_logs_per_response,
+        //         from_block,
+        //         to_block = num_hash.number.saturating_sub(1),
+        //         "Query exceeded max logs per response limit"
+        //     );
+        //     return Err(EthFilterError::QueryExceedsMaxResults {
+        //         max_logs: max_logs_per_response,
+        //         from_block,
+        //         to_block: num_hash.number.saturating_sub(1),
+        //     });
+        // }
+        Ok(logs)
+    }
+
+    /// Returns logs matching given filter object, paginated by a cursor.
+    ///
+    /// Handler for `eth_getLogsWithCursor`
+    async fn logs_with_cursor(
+        &self,
+        filter: Filter,
+        cursor: Option<Cursor>,
+    ) -> RpcResult<(Vec<Log>, Option<Cursor>)> {
+        trace!(target: "rpc::eth", "Serving eth_getLogsWithCursor");
+
+        Ok(self.logs_for_filter(filter, self.inner.query_limits, cursor, true).await?)
     }
 }
@@ -462,7 +500,9 @@ where
         self: Arc<Self>,
         filter: Filter,
         limits: QueryLimits,
-    ) -> Result<Vec<Log>, EthFilterError> {
+        cursor: Option<Cursor>,
+        single_block_can_exceed_limit: bool,
+    ) -> Result<(Vec<Log>, Option<Cursor>), EthFilterError> {
         match filter.block_option {
             FilterBlockOption::AtBlockHash(block_hash) => {
                 // for all matching logs in the block
@@ -475,7 +515,7 @@ where
                 let block_num_hash = BlockNumHash::new(header.number(), block_hash);
 
                 // we also need to ensure that the receipts are available and return an error if
-                // not, in case the block hash been reorged
+                // not, in case the block has been reorged
                 let (receipts, maybe_block) = self
                     .eth_cache()
                     .get_receipts_and_maybe_block(block_num_hash.hash)
@@ -493,9 +533,11 @@ where
                     &receipts,
                     false,
                     header.timestamp(),
+                    None,
+                    0,
                 )?;
 
-                Ok(all_logs)
+                Ok((all_logs, None))
             }
             FilterBlockOption::Range { from_block, to_block } => {
                 // Handle special case where from block is pending
@@ -503,7 +545,7 @@ where
                 let to_block = to_block.unwrap_or(BlockNumberOrTag::Pending);
                 if !(to_block.is_pending() || to_block.is_number()) {
                     // always empty range
-                    return Ok(Vec::new());
+                    return Ok((Vec::new(), None));
                 }
                 // Try to get pending block and receipts
                 if let Ok(Some(pending_block)) = self.eth_api.local_pending_block().await {
@@ -511,7 +553,7 @@ where
                         to_block < pending_block.block.number()
                     {
                         // this block range is empty based on the user input
-                        return Ok(Vec::new());
+                        return Ok((Vec::new(), None));
                     }
 
                     let info = self.provider().chain_info()?;
@@ -528,8 +570,10 @@ where
                         &pending_block.receipts,
                         false, // removed = false for pending blocks
                         timestamp,
+                        None,
+                        0,
                     )?;
-                    return Ok(all_logs);
+                    return Ok((all_logs, None));
                 }
             }
         }
@@ -549,14 +593,21 @@ where
                     f > info.best_number
                 {
                     // start block higher than local head, can return empty
-                    return Ok(Vec::new());
+                    return Ok((Vec::new(), None));
                 }
 
                 let (from_block_number, to_block_number) =
                     logs_utils::get_filter_block_range(from, to, start_block, info);
-                self.get_logs_in_block_range(filter, from_block_number, to_block_number, limits)
-                    .await
+                self.get_logs_in_block_range(
+                    filter,
+                    from_block_number,
+                    to_block_number,
+                    limits,
+                    cursor,
+                    single_block_can_exceed_limit,
+                )
+                .await
             }
         }
     }
@@ -596,7 +647,9 @@ where
         from_block: u64,
         to_block: u64,
         limits: QueryLimits,
-    ) -> Result<Vec<Log>, EthFilterError> {
+        cursor: Option<Cursor>,
+        single_block_can_exceed_limit: bool,
+    ) -> Result<(Vec<Log>, Option<Cursor>), EthFilterError> {
         trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
 
         // perform boundary checks first
@@ -613,8 +666,16 @@ where
         let (tx, rx) = oneshot::channel();
         let this = self.clone();
         self.task_spawner.spawn_blocking(Box::pin(async move {
-            let res =
-                this.get_logs_in_block_range_inner(&filter, from_block, to_block, limits).await;
+            let res = this
+                .get_logs_in_block_range_inner(
+                    &filter,
+                    from_block,
+                    to_block,
+                    limits,
+                    cursor,
+                    single_block_can_exceed_limit,
+                )
+                .await;
             let _ = tx.send(res);
         }));
@@ -632,10 +693,12 @@ where
     async fn get_logs_in_block_range_inner(
         self: Arc<Self>,
         filter: &Filter,
-        from_block: u64,
+        mut from_block: u64,
        to_block: u64,
         limits: QueryLimits,
-    ) -> Result<Vec<Log>, EthFilterError> {
+        mut cursor: Option<Cursor>,
+        single_block_can_exceed_limit: bool,
+    ) -> Result<(Vec<Log>, Option<Cursor>), EthFilterError> {
         let mut all_logs = Vec::new();
         let mut matching_headers = Vec::new();
@@ -681,13 +744,21 @@ where
             self.max_headers_range,
             chain_tip,
         );
-
+        let is_multi_block_range = from_block != to_block;
         // iterate through the range mode to get receipts and blocks
         while let Some(ReceiptBlockResult { receipts, recovered_block, header }) =
             range_mode.next().await?
         {
+            // extract block_number & log_index from cursor
+            let mut cursor_log_idx = 0;
+            if let Some(Cursor { block_num, log_idx }) = cursor {
+                from_block = block_num;
+                cursor_log_idx = log_idx;
+            }
+
             let num_hash = header.num_hash();
-            append_matching_block_logs(
+
+            let offset = append_matching_block_logs(
                 &mut all_logs,
                 recovered_block
                     .map(ProviderOrBlock::Block)
@@ -697,14 +768,20 @@ where
                 &receipts,
                 false,
                 header.timestamp(),
+                // size check but only if range is multiple blocks, so we always return all
+                // logs of a single block
+                (is_multi_block_range || !single_block_can_exceed_limit)
+                    .then_some(limits.max_logs_per_response)
+                    .flatten(),
+                cursor_log_idx,
             )?;
 
             // size check but only if range is multiple blocks, so we always return all
            // logs of a single block
-            let is_multi_block_range = from_block != to_block;
             if let Some(max_logs_per_response) = limits.max_logs_per_response &&
+                !single_block_can_exceed_limit &&
                 is_multi_block_range &&
-                all_logs.len() > max_logs_per_response
+                cursor.is_some()
             {
                 debug!(
                     target: "rpc::eth::filter",
@@ -720,9 +797,23 @@ where
                     to_block: num_hash.number.saturating_sub(1),
                 });
             }
+
+            // update the cursor to the offset of the log within the same block
+            // when the logs are truncated to match the maximum length
+            cursor = offset.map(|offset| Cursor { block_num: num_hash.number, log_idx: offset });
+
+            // set cursor to the next block
+            if let Some(max_logs_per_response) = limits.max_logs_per_response &&
+                all_logs.len() >= max_logs_per_response &&
+                let Some(ReceiptBlockResult { header, .. }) = range_mode.next().await?
+            {
+                let num_hash = header.num_hash();
+
+                cursor = Some(Cursor { block_num: num_hash.number, log_idx: 0 });
+            }
         }
 
-        Ok(all_logs)
+        Ok((all_logs, cursor))
     }
 }
@@ -906,6 +997,9 @@ pub enum EthFilterError {
         /// End block of the suggested retry range (last successfully processed block)
         to_block: u64,
     },
+    /// Error with the cursor supplied for pagination
+    #[error("problem decoding supplied cursor")]
+    CursorError,
     /// Error serving request in `eth_` namespace.
     #[error(transparent)]
     EthAPIError(#[from] EthApiError),
@@ -930,6 +1024,9 @@ impl From<EthFilterError> for jsonrpsee::types::error::ErrorObject<'static> {
             EthFilterError::QueryExceedsMaxResults { .. }) => {
                 rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
             }
+            EthFilterError::CursorError => {
+                rpc_error_with_code(jsonrpsee::types::error::PARSE_ERROR_CODE, "cursor error")
+            }
         }
     }
 }
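For clients, the intended consumption pattern (not shown in this diff) is a loop that feeds each returned cursor into the next call until the response carries `None`. A minimal sketch using a jsonrpsee HTTP client, treating the cursor as the opaque base64 string it serializes to; the endpoint URL, block range, and dependency setup (`tokio`, `jsonrpsee` with the `http-client` feature, `alloy-rpc-types-eth`) are assumptions for illustration:

use alloy_rpc_types_eth::{Filter, Log};
use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = HttpClientBuilder::default().build("http://localhost:8545")?;
    // Placeholder range; any filter accepted by eth_getLogs works here.
    let filter = Filter::new().from_block(19_000_000u64).to_block(19_000_100u64);

    let mut all_logs: Vec<Log> = Vec::new();
    let mut cursor: Option<String> = None;
    loop {
        // Each page holds at most the server's `max_logs_per_response` logs;
        // a `None` cursor in the response means the range is exhausted.
        let (logs, next): (Vec<Log>, Option<String>) = client
            .request("eth_getLogsWithCursor", rpc_params![filter.clone(), cursor.take()])
            .await?;
        all_logs.extend(logs);
        match next {
            Some(token) => cursor = Some(token),
            None => break,
        }
    }
    println!("fetched {} logs", all_logs.len());
    Ok(())
}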