1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

9 changes: 9 additions & 0 deletions crates/rpc/rpc-eth-api/src/filter.rs
@@ -3,6 +3,7 @@
use alloy_json_rpc::RpcObject;
use alloy_rpc_types_eth::{Filter, FilterChanges, FilterId, Log, PendingTransactionFilterKind};
use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use reth_rpc_eth_types::logs_utils::Cursor;
use std::future::Future;

/// Rpc Interface for poll-based ethereum filter API.
@@ -39,6 +40,14 @@ pub trait EthFilterApi<T: RpcObject> {
    /// Returns logs matching given filter object.
    #[method(name = "getLogs")]
    async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>>;

    /// Returns logs matching given filter object, alongside a cursor for pagination.
    #[method(name = "getLogsWithCursor")]
    async fn logs_with_cursor(
        &self,
        filter: Filter,
        cursor: Option<Cursor>,
    ) -> RpcResult<(Vec<Log>, Option<Cursor>)>;
}

/// Limits for logs queries
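Assuming the trait's `rpc` attribute is also configured to generate a jsonrpsee client (not shown in this diff), a caller pages through results by feeding each returned cursor back into the next call. A minimal sketch, with the hypothetical `query` closure standing in for the generated client method:

use alloy_rpc_types_eth::{Filter, Log};
use reth_rpc_eth_types::logs_utils::Cursor;

/// Collects every page of `eth_getLogsWithCursor` results. `query` is a
/// hypothetical stand-in for the actual RPC client call.
async fn fetch_all_logs<F, Fut, E>(filter: Filter, mut query: F) -> Result<Vec<Log>, E>
where
    F: FnMut(Filter, Option<Cursor>) -> Fut,
    Fut: std::future::Future<Output = Result<(Vec<Log>, Option<Cursor>), E>>,
{
    let mut all = Vec::new();
    let mut cursor = None;
    loop {
        let (mut page, next) = query(filter.clone(), cursor).await?;
        all.append(&mut page);
        match next {
            // Resume the next request from (block_num, log_idx).
            Some(c) => cursor = Some(c),
            // No cursor means the node has returned the final page.
            None => return Ok(all),
        }
    }
}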
1 change: 1 addition & 0 deletions crates/rpc/rpc-eth-types/Cargo.toml
@@ -62,6 +62,7 @@ schnellru.workspace = true
rand.workspace = true
tracing.workspace = true
itertools.workspace = true
data-encoding.workspace = true

[dev-dependencies]
serde_json.workspace = true
201 changes: 146 additions & 55 deletions crates/rpc/rpc-eth-types/src/logs_utils.rs
@@ -6,11 +6,16 @@ use alloy_consensus::TxReceipt;
use alloy_eips::{eip2718::Encodable2718, BlockNumHash};
use alloy_primitives::TxHash;
use alloy_rpc_types_eth::{Filter, Log};
use data_encoding::BASE64;
use derive_more::Debug;
use reth_chainspec::ChainInfo;
use reth_errors::ProviderError;
use reth_primitives_traits::{BlockBody, RecoveredBlock, SignedTransaction};
use reth_storage_api::{BlockReader, ProviderBlock};
use serde::{Deserialize, Serialize};
use thiserror::Error;

use std::{fmt, str::FromStr, sync::Arc};

/// Returns all matching logs of a block's receipts when the transaction hashes are known.
pub fn matching_block_logs_with_tx_hashes<'a, I, R>(
Expand Down Expand Up @@ -74,75 +79,161 @@ pub fn append_matching_block_logs<P>(
    receipts: &[P::Receipt],
    removed: bool,
    block_timestamp: u64,
    max_logs: Option<usize>,
    mut offset: u64,
) -> Result<Option<u64>, ProviderError>
where
    P: BlockReader<Transaction: SignedTransaction>,
{
    // Lazy loaded number of the first transaction in the block.
    // This is useful for blocks with multiple matching logs because it
    // prevents re-querying the block body indices.
    let mut loaded_first_tx_num = None;

    // Flatten receipts and logs into one iterator for easy offsetting.
    let mut iter = receipts
        .iter()
        .enumerate()
        .flat_map(|(receipt_idx, receipt)| receipt.logs().iter().map(move |log| (receipt_idx, log)))
        .skip(offset as usize)
        .peekable();

    // Iterate over flattened logs, appending matching logs.
    while let Some((receipt_idx, log)) = iter.next() {
        let mut transaction_hash = None;

        if filter.matches(log) {
            // look up the transaction hash for this log's receipt
            if transaction_hash.is_none() {
                transaction_hash = match &provider_or_block {
                    ProviderOrBlock::Block(block) => {
                        block.body().transactions().get(receipt_idx).map(|t| t.trie_hash())
                    }
                    ProviderOrBlock::Provider(provider) => {
                        let first_tx_num = match loaded_first_tx_num {
                            Some(num) => num,
                            None => {
                                let block_body_indices = provider
                                    .block_body_indices(block_num_hash.number)?
                                    .ok_or(ProviderError::BlockBodyIndicesNotFound(
                                        block_num_hash.number,
                                    ))?;
                                loaded_first_tx_num = Some(block_body_indices.first_tx_num);
                                block_body_indices.first_tx_num
                            }
                        };

                        // This is safe because Transactions and Receipts have the same
                        // keys.
                        let transaction_id = first_tx_num + receipt_idx as u64;
                        let transaction =
                            provider.transaction_by_id(transaction_id)?.ok_or_else(|| {
                                ProviderError::TransactionNotFound(transaction_id.into())
                            })?;

                        Some(transaction.trie_hash())
                    }
                };
            }

            let log = Log {
                inner: log.clone(),
                block_hash: Some(block_num_hash.hash),
                block_number: Some(block_num_hash.number),
                transaction_hash,
                // The transaction and receipt index is always the same.
                transaction_index: Some(receipt_idx as u64),
                log_index: Some(offset),
                removed,
                block_timestamp: Some(block_timestamp),
            };
            all_logs.push(log);

            if let Some(max_logs) = max_logs &&
                all_logs.len() >= max_logs
            {
                // Check whether we are at the end of the block's logs; if
                // more remain, return a cursor pointing at the next log_idx.
                return Ok(iter.peek().map(|_| offset + 1));
            }
        }
        offset += 1;
    }

    Ok(None)
}
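For intuition, the skip/offset bookkeeping above reduces to a few lines. A standalone sketch (hypothetical names, filtering omitted) that skips the first `offset` flattened logs, fills a page of at most `max`, and returns the index of the first unconsumed log only when one remains:

fn page_logs(receipts: &[Vec<u32>], mut offset: u64, max: usize, out: &mut Vec<u32>) -> Option<u64> {
    // Flatten all receipts' logs and resume at the cursor offset.
    let mut iter = receipts
        .iter()
        .flat_map(|r| r.iter().copied())
        .skip(offset as usize)
        .peekable();
    while let Some(log) = iter.next() {
        out.push(log);
        if out.len() >= max {
            // Page full: return a cursor only if logs remain in this block.
            return iter.peek().map(|_| offset + 1);
        }
        offset += 1;
    }
    None
}

With receipts `[[1, 2], [3]]` and `max = 2`, a first call at offset 0 fills `[1, 2]` and returns `Some(2)`; resuming at offset 2 drains `[3]` and returns `None`.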

/// Cursor type for paginating `getLogs` requests
#[derive(Clone, Debug)]
pub struct Cursor {
    /// Block number to resume pagination from
    pub block_num: u64,
    /// `log_index` within block to resume pagination from
    pub log_idx: u64,
}

impl Cursor {
    /// Convert cursor to bytes
    pub fn to_bytes(&self) -> [u8; 16] {
        let block_number_bytes = self.block_num.to_be_bytes();
        let next_log_idx_bytes = self.log_idx.to_be_bytes();
        let mut cursor_bytes: [u8; 16] = [0; 16];
        cursor_bytes[..8].copy_from_slice(&block_number_bytes);
        cursor_bytes[8..].copy_from_slice(&next_log_idx_bytes);

        cursor_bytes
    }

    /// Convert byte slice into a cursor
    pub fn from_bytes(bytes: &[u8; 16]) -> Result<Self, CursorError> {
        let block_num_bytes: [u8; 8] = bytes[..8].try_into().map_err(|_| CursorError)?;
        let log_idx_bytes: [u8; 8] = bytes[8..].try_into().map_err(|_| CursorError)?;

        let block_num = u64::from_be_bytes(block_num_bytes);
        let log_idx = u64::from_be_bytes(log_idx_bytes);

        Ok(Self { block_num, log_idx })
    }
}
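A quick round trip under this layout, assuming a test context: the big-endian `block_num` occupies bytes 0..8 and `log_idx` bytes 8..16.

let cursor = Cursor { block_num: 21_000_000, log_idx: 42 };
let bytes = cursor.to_bytes();
assert_eq!(bytes[..8], 21_000_000u64.to_be_bytes());
assert_eq!(bytes[8..], 42u64.to_be_bytes());

let decoded = Cursor::from_bytes(&bytes).expect("16 bytes always parse");
assert_eq!((decoded.block_num, decoded.log_idx), (21_000_000, 42));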

/// Error returned when cursor parsing fails
#[derive(Error, Debug)]
#[error("cursor error")]
pub struct CursorError;

impl FromStr for Cursor {
    type Err = CursorError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // 16 bytes encode to 24 base64 characters, and `decode_mut` requires
        // an output buffer of exactly `decode_len(24) == 18` bytes (it panics
        // on a length mismatch), so validate the length and decode into a
        // scratch buffer first.
        if s.len() != 24 {
            return Err(CursorError);
        }
        let mut buf = [0u8; 18];
        let len = BASE64.decode_mut(s.as_bytes(), &mut buf).map_err(|_| CursorError)?;
        let bytes: [u8; 16] = buf[..len].try_into().map_err(|_| CursorError)?;

        Self::from_bytes(&bytes)
    }
}

impl fmt::Display for Cursor {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        BASE64.encode_write(&self.to_bytes(), f)
    }
}
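Since `Display` base64-encodes the 16 packed bytes, the string form of a cursor is always 24 padded characters, and `FromStr` inverts it:

let cursor = Cursor { block_num: 5, log_idx: 7 };
let s = cursor.to_string();
assert_eq!(s.len(), 24); // 16 bytes -> 24 base64 chars with padding

let parsed: Cursor = s.parse().expect("round trip");
assert_eq!((parsed.block_num, parsed.log_idx), (5, 7));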

impl Serialize for Cursor {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        serializer.collect_str(self)
    }
}

impl<'de> Deserialize<'de> for Cursor {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        String::deserialize(deserializer)?.parse().map_err(serde::de::Error::custom)
    }
}
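Because serialization routes through the string form, a cursor crosses the RPC boundary as a plain JSON string. A sketch using `serde_json` (a dev-dependency above):

let cursor = Cursor { block_num: 1, log_idx: 2 };
let json = serde_json::to_string(&cursor).unwrap();
assert_eq!(json, "\"AAAAAAAAAAEAAAAAAAAAAg==\"");

let back: Cursor = serde_json::from_str(&json).unwrap();
assert_eq!((back.block_num, back.log_idx), (1, 2));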

/// Computes the block range based on the filter range and current block numbers.