Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
234 changes: 195 additions & 39 deletions crates/anvil-polkadot/src/api_server/filters.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,23 @@
use crate::api_server::error::{Result, ToRpcResponseResult};
use crate::{
api_server::{
error::{Error, Result, ToRpcResponseResult},
txpool_helpers::extract_tx_info,
},
substrate_node::service::TransactionPoolHandle,
};
use anvil_core::eth::subscription::SubscriptionId;
use anvil_rpc::response::ResponseResult;
use futures::{FutureExt, Stream, StreamExt};
use futures::{FutureExt, StreamExt};
use pallet_revive_eth_rpc::client::Client as EthRpcClient;
use polkadot_sdk::pallet_revive::evm::{BlockNumberOrTag, Filter, Log};
use std::{collections::HashMap, sync::Arc, task::Poll, time::Duration};
use polkadot_sdk::{
pallet_revive::evm::{BlockNumberOrTag, Filter, HashesOrTransactionInfos, Log},
sc_service::TransactionPool,
};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use subxt::utils::H256;
use tokio::{sync::Mutex, time::Instant};
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
Expand All @@ -13,6 +26,8 @@ use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
/// Filters that haven't been polled within this duration will be evicted.
pub const ACTIVE_FILTER_TIMEOUT_SECS: u64 = 60 * 5;

pub const LOG_TARGET: &str = "node::filter";

/// Maps filter IDs to tuples of filter and deadline.
type FilterMap = Arc<Mutex<HashMap<String, (EthFilter, Instant)>>>;

Expand Down Expand Up @@ -58,20 +73,24 @@ impl Filters {
let mut filters = self.active_filters.lock().await;
if let Some((filter, deadline)) = filters.get_mut(id) {
let response = match filter {
EthFilter::Blocks(block_filter) => {
let blocks = block_filter.drain_blocks().await;
Ok(blocks).to_rpc_result()
}
EthFilter::Logs(logs_filter) => {
let logs = logs_filter.drain_logs().await;
Ok(logs).to_rpc_result()
}
_ => filter
.next()
.await
.unwrap_or_else(|| ResponseResult::success(Vec::<()>::new())),
EthFilter::PendingTransactions(tx_filter) => {
let txs = tx_filter.drain_transactions().await;
Ok(txs).to_rpc_result()
}
};
*deadline = self.next_deadline();
return response;
}
}
warn!(target: "node::filter", "No filter found for {}", id);
warn!(target: LOG_TARGET, "No filter found for {}", id);
ResponseResult::success(Vec::<()>::new())
}

Expand All @@ -91,7 +110,7 @@ impl Filters {

/// Removes and returns the filter associated with the given identifier.
pub async fn uninstall_filter(&self, id: &str) -> Option<EthFilter> {
trace!(target: "node::filter", "Uninstalling filter id {}", id);
trace!(target: LOG_TARGET, "Uninstalling filter id {}", id);
self.active_filters.lock().await.remove(id).map(|(f, _)| f)
}

Expand All @@ -101,12 +120,12 @@ impl Filters {
/// stale filters that haven't been polled recently. Evicted filters are permanently
/// removed and cannot be recovered.
pub async fn evict(&self) {
trace!(target: "node::filter", "Evicting stale filters");
trace!(target: LOG_TARGET, "Evicting stale filters");
let now = Instant::now();
let mut active_filters = self.active_filters.lock().await;
active_filters.retain(|id, (_, deadline)| {
if now > *deadline {
trace!(target: "node::filter",?id, "Evicting stale filter");
trace!(target: LOG_TARGET,?id, "Evicting stale filter");
return false;
}
true
Expand Down Expand Up @@ -157,43 +176,52 @@ pub enum EthFilter {
/// Emits the hash (H256) of each new block as it's added to the chain.
/// Subscribers receive notifications through the broadcast channel. When polled,
/// returns all block hashes produced since the last poll.
Blocks(BlockNotifications),
Blocks(BlockFilter),
/// Log filter that tracks contract event logs.
///
/// Filters logs based on block range, addresses, and topics. Combines historic
/// logs (from the initial query range) with real-time logs from newly produced
/// blocks. The filter applies topic matching with OR logic between topic alternatives
/// and validates block ranges for incoming blocks.
Logs(Box<LogsFilter>),
Logs(LogsFilter),
/// Pending transactions filter that tracks new transactions.
///
/// Returns mined transactions since last poll + transactions that are
/// ready but have not been mined yet.
PendingTransactions(PendingTransactionsFilter),
}

impl Stream for EthFilter {
type Item = ResponseResult;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let pin = self.get_mut();
match pin {
Self::Blocks(block_notifications) => {
let mut new_blocks = Vec::new();
while let Poll::Ready(Some(result)) = block_notifications.poll_next_unpin(cx) {
match result {
Ok(block_hash) => new_blocks.push(block_hash),
Err(lagged) => {
// BroadcastStream handles lagging for us
// Just log and continue
warn!(target: "node::filter", "Block filter lagged, skipped messages {:?}", lagged);
continue;
}
}
/// Filter for tracking new block hashes.
pub struct BlockFilter {
block_notifications: BlockNotifications,
}

impl BlockFilter {
pub fn new(block_notifier: BlockNotifications) -> Self {
Self { block_notifications: block_notifier }
}

/// Drains all new block hashes since the last poll.
///
/// Returns all block hashes that were broadcast since the last call to this method.
/// Handles lagged notifications gracefully by logging and continuing.
async fn drain_blocks(&mut self) -> Vec<H256> {
let mut new_blocks = Vec::new();

while let Some(result) = self.block_notifications.next().now_or_never().flatten() {
match result {
Ok(block_hash) => new_blocks.push(block_hash),
Err(BroadcastStreamRecvError::Lagged(count)) => {
warn!(
target: LOG_TARGET,
"Block filter lagged, skipped {} block notifications",
count
);
}
Poll::Ready(Some(Ok(new_blocks).to_rpc_result()))
}
// handled directly in get_filter_changes
Self::Logs(_) => Poll::Pending,
}

new_blocks
}
}

Expand Down Expand Up @@ -258,7 +286,7 @@ impl LogsFilter {
Ok(block_hash) => block_hashes.push(block_hash),
Err(BroadcastStreamRecvError::Lagged(blocks)) => {
// Channel overflowed - some blocks were skipped
warn!(target: "node::filter", "Logs filter lagged, skipped {} block notifications", blocks);
warn!(target: LOG_TARGET, "Logs filter lagged, skipped {} block notifications", blocks);
// Continue draining what's left in the channel
continue;
}
Expand Down Expand Up @@ -316,3 +344,131 @@ impl LogsFilter {
true
}
}

/// Filter for pending transactions
///
/// Monitors the transaction pool and returns newly pending transaction hashes
/// when polled. Transactions that have been included in old blocks are automatically filtered out.
///
/// The filter maintains state of previously seen transactions to ensure each
/// transaction is reported only once, even if it remains in the pending pool
/// across multiple polls.
pub struct PendingTransactionsFilter {
/// Set of transaction hashes already reported to the client
already_seen: HashSet<H256>,
/// Stream of new block notifications for detecting mined transactions
block_notifications: BroadcastStream<H256>,
/// Reference to the transaction pool for querying ready transactions
tx_pool: Arc<TransactionPoolHandle>,
/// Ethereum RPC client for fetching block transaction data
eth_rpc_client: EthRpcClient,
}
impl PendingTransactionsFilter {
pub fn new(
block_notifier: BroadcastStream<H256>,
tx_pool: Arc<TransactionPoolHandle>,
eth_rpc_client: EthRpcClient,
) -> Self {
Self {
already_seen: tx_pool
.ready()
.filter_map(|tx| extract_tx_info(&tx.data).map(|(_, _, tx_info)| tx_info.hash))
.collect(),
block_notifications: block_notifier,
tx_pool,
eth_rpc_client,
}
}

/// Drains all new pending transaction hashes since the last poll.
///
/// This method:
/// 1. Queries the current ready transaction pool
/// 2. Drains block notifications to identify mined transactions
/// 3. Returns only new transactions (not previously seen and not mined)
///
/// The filter state is updated to remember all currently pending transactions,
/// ensuring they won't be reported again on subsequent polls.
async fn drain_transactions(&mut self) -> Vec<H256> {
// Get current ready transactions
let current_ready: HashSet<H256> = self
.tx_pool
.ready()
.filter_map(|tx| {
extract_tx_info(&tx.data).map(|(_, _, tx_info)| tx_info.hash).or_else(|| {
warn!(target: LOG_TARGET, "Failed to extract transaction info from ready pool");
None
})
})
.collect();

// Get transactions that have been included in blocks already
let mut included_transactions = HashSet::new();
while let Some(result) = self.block_notifications.next().now_or_never().flatten() {
match result {
Ok(block_hash) => match self.fetch_block_transactions(&block_hash).await {
Ok(tx_hashes) => included_transactions.extend(tx_hashes),
Err(e) => {
warn!(
target: LOG_TARGET,
"Failed to fetch transactions for block {:?}: {}",
block_hash, e
);
}
},
Err(BroadcastStreamRecvError::Lagged(blocks)) => {
// Channel overflowed - some blocks were skipped
warn!(target: LOG_TARGET, "Logs filter lagged, skipped {} block notifications", blocks);
// Continue draining what's left in the channel
continue;
}
}
}

// New from pool: transactions in ready pool we haven't seen before
let new_from_pool: HashSet<H256> =
current_ready.difference(&self.already_seen).copied().collect();
let excluded: HashSet<H256> = self.already_seen.union(&new_from_pool).copied().collect();
let new_from_blocks: HashSet<H256> =
included_transactions.difference(&excluded).copied().collect();
let new_pending: Vec<H256> = new_from_pool.union(&new_from_blocks).copied().collect();
// Remove mined transactions from already_seen
for tx_hash in &included_transactions {
self.already_seen.remove(tx_hash);
}

// Only track transactions that are still pending (not mined)
let still_pending: HashSet<H256> =
current_ready.difference(&included_transactions).copied().collect();
self.already_seen.extend(still_pending);
new_pending
}

/// Fetches all transaction hashes from a given block.
///
/// Takes a substrate block hash, fetches the block, converts it to an EVM block,
/// and extracts all transaction hashes regardless of whether they're returned
/// as hashes or full transaction objects.
async fn fetch_block_transactions(&self, substrate_block_hash: &H256) -> Result<Vec<H256>> {
let substrate_block =
self.eth_rpc_client.block_by_hash(substrate_block_hash).await?.ok_or(
Error::InternalError(format!(
"Could not find block with hash: {substrate_block_hash}"
)),
)?;
let block = self
.eth_rpc_client
.evm_block(substrate_block, false)
.await
.ok_or(Error::InternalError("Could not convert to an evm block".to_string()))?;
let tx_hashes = match block.transactions {
HashesOrTransactionInfos::Hashes(hashes) => hashes,
// Considering that we called evm_block with hydrated false we will
// never receive TransactionInfos but handled it anyways.
HashesOrTransactionInfos::TransactionInfos(infos) => {
infos.iter().map(|ti| ti.hash).collect()
}
};
Ok(tx_hashes)
}
}
25 changes: 20 additions & 5 deletions crates/anvil-polkadot/src/api_server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use crate::{
api_server::{
ApiRequest,
error::{Error, Result, ToRpcResponseResult},
filters::{BlockNotifications, EthFilter, Filters, LogsFilter, eviction_task},
filters::{
BlockFilter, BlockNotifications, EthFilter, Filters, LogsFilter,
PendingTransactionsFilter, eviction_task,
},
revive_conversions::{
AlloyU256, ReviveAddress, ReviveBlockId, ReviveBlockNumberOrTag, ReviveBytes,
ReviveFilter, ReviveTrace, ReviveTracerType, SubstrateU256,
Expand Down Expand Up @@ -425,7 +428,7 @@ impl ApiServer {
EthRequest::EthGetFilterChanges(id) => self.get_filter_changes(&id).await,
EthRequest::EthNewBlockFilter(_) => self.new_block_filter().await.to_rpc_result(),
EthRequest::EthNewPendingTransactionFilter(_) => {
Err::<(), _>(Error::RpcUnimplemented).to_rpc_result()
self.new_pending_transactions_filter().await.to_rpc_result()
}
EthRequest::EthUninstallFilter(id) => self.uninstall_filter(&id).await.to_rpc_result(),
_ => Err::<(), _>(Error::RpcUnimplemented).to_rpc_result(),
Expand Down Expand Up @@ -1376,7 +1379,9 @@ impl ApiServer {
/// Creates a filter to notify about new blocks
async fn new_block_filter(&self) -> Result<String> {
node_info!("eth_newBlockFilter");
let filter = EthFilter::Blocks(BlockNotifications::new(self.new_block_notifications()?));
let filter = EthFilter::Blocks(BlockFilter::new(BlockNotifications::new(
self.new_block_notifications()?,
)));
Ok(self.filters.add_filter(filter).await)
}

Expand All @@ -1392,16 +1397,26 @@ impl ApiServer {
self.filters.get_filter_changes(id).await
}

async fn new_pending_transactions_filter(&self) -> Result<String> {
node_info!("eth_newPendingTransactionFilter");
let filter = EthFilter::PendingTransactions(PendingTransactionsFilter::new(
BlockNotifications::new(self.new_block_notifications()?),
self.tx_pool.clone(),
self.eth_rpc_client.clone(),
));
Ok(self.filters.add_filter(filter).await)
}

async fn new_filter(&self, filter: evm::Filter) -> Result<String> {
node_info!("eth_newFilter");
let eth_filter = EthFilter::Logs(Box::new(
let eth_filter = EthFilter::Logs(
LogsFilter::new(
BlockNotifications::new(self.new_block_notifications()?),
self.eth_rpc_client.clone(),
filter,
)
.await?,
));
);
Ok(self.filters.add_filter(eth_filter).await)
}

Expand Down
Loading
Loading