Skip to content

Commit b6b6f0c

Browse files
Implement pending transactions filter (#428)
* Implement pending transactions filter --------- Signed-off-by: Alexandru Cihodaru <[email protected]>
1 parent ba8c689 commit b6b6f0c

File tree

4 files changed

+456
-45
lines changed

4 files changed

+456
-45
lines changed

crates/anvil-polkadot/src/api_server/filters.rs

Lines changed: 195 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,23 @@
1-
use crate::api_server::error::{Result, ToRpcResponseResult};
1+
use crate::{
2+
api_server::{
3+
error::{Error, Result, ToRpcResponseResult},
4+
txpool_helpers::extract_tx_info,
5+
},
6+
substrate_node::service::TransactionPoolHandle,
7+
};
28
use anvil_core::eth::subscription::SubscriptionId;
39
use anvil_rpc::response::ResponseResult;
4-
use futures::{FutureExt, Stream, StreamExt};
10+
use futures::{FutureExt, StreamExt};
511
use pallet_revive_eth_rpc::client::Client as EthRpcClient;
6-
use polkadot_sdk::pallet_revive::evm::{BlockNumberOrTag, Filter, Log};
7-
use std::{collections::HashMap, sync::Arc, task::Poll, time::Duration};
12+
use polkadot_sdk::{
13+
pallet_revive::evm::{BlockNumberOrTag, Filter, HashesOrTransactionInfos, Log},
14+
sc_service::TransactionPool,
15+
};
16+
use std::{
17+
collections::{HashMap, HashSet},
18+
sync::Arc,
19+
time::Duration,
20+
};
821
use subxt::utils::H256;
922
use tokio::{sync::Mutex, time::Instant};
1023
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
@@ -13,6 +26,8 @@ use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
1326
/// Filters that haven't been polled within this duration will be evicted.
1427
pub const ACTIVE_FILTER_TIMEOUT_SECS: u64 = 60 * 5;
1528

29+
pub const LOG_TARGET: &str = "node::filter";
30+
1631
/// Maps filter IDs to tuples of filter and deadline.
1732
type FilterMap = Arc<Mutex<HashMap<String, (EthFilter, Instant)>>>;
1833

@@ -58,20 +73,24 @@ impl Filters {
5873
let mut filters = self.active_filters.lock().await;
5974
if let Some((filter, deadline)) = filters.get_mut(id) {
6075
let response = match filter {
76+
EthFilter::Blocks(block_filter) => {
77+
let blocks = block_filter.drain_blocks().await;
78+
Ok(blocks).to_rpc_result()
79+
}
6180
EthFilter::Logs(logs_filter) => {
6281
let logs = logs_filter.drain_logs().await;
6382
Ok(logs).to_rpc_result()
6483
}
65-
_ => filter
66-
.next()
67-
.await
68-
.unwrap_or_else(|| ResponseResult::success(Vec::<()>::new())),
84+
EthFilter::PendingTransactions(tx_filter) => {
85+
let txs = tx_filter.drain_transactions().await;
86+
Ok(txs).to_rpc_result()
87+
}
6988
};
7089
*deadline = self.next_deadline();
7190
return response;
7291
}
7392
}
74-
warn!(target: "node::filter", "No filter found for {}", id);
93+
warn!(target: LOG_TARGET, "No filter found for {}", id);
7594
ResponseResult::success(Vec::<()>::new())
7695
}
7796

@@ -91,7 +110,7 @@ impl Filters {
91110

92111
/// Removes and returns the filter associated with the given identifier.
93112
pub async fn uninstall_filter(&self, id: &str) -> Option<EthFilter> {
94-
trace!(target: "node::filter", "Uninstalling filter id {}", id);
113+
trace!(target: LOG_TARGET, "Uninstalling filter id {}", id);
95114
self.active_filters.lock().await.remove(id).map(|(f, _)| f)
96115
}
97116

@@ -101,12 +120,12 @@ impl Filters {
101120
/// stale filters that haven't been polled recently. Evicted filters are permanently
102121
/// removed and cannot be recovered.
103122
pub async fn evict(&self) {
104-
trace!(target: "node::filter", "Evicting stale filters");
123+
trace!(target: LOG_TARGET, "Evicting stale filters");
105124
let now = Instant::now();
106125
let mut active_filters = self.active_filters.lock().await;
107126
active_filters.retain(|id, (_, deadline)| {
108127
if now > *deadline {
109-
trace!(target: "node::filter",?id, "Evicting stale filter");
128+
trace!(target: LOG_TARGET,?id, "Evicting stale filter");
110129
return false;
111130
}
112131
true
@@ -157,43 +176,52 @@ pub enum EthFilter {
157176
/// Emits the hash (H256) of each new block as it's added to the chain.
158177
/// Subscribers receive notifications through the broadcast channel. When polled,
159178
/// returns all block hashes produced since the last poll.
160-
Blocks(BlockNotifications),
179+
Blocks(BlockFilter),
161180
/// Log filter that tracks contract event logs.
162181
///
163182
/// Filters logs based on block range, addresses, and topics. Combines historic
164183
/// logs (from the initial query range) with real-time logs from newly produced
165184
/// blocks. The filter applies topic matching with OR logic between topic alternatives
166185
/// and validates block ranges for incoming blocks.
167-
Logs(Box<LogsFilter>),
186+
Logs(LogsFilter),
187+
/// Pending transactions filter that tracks new transactions.
188+
///
189+
/// Returns mined transactions since last poll + transactions that are
190+
/// ready but have not been mined yet.
191+
PendingTransactions(PendingTransactionsFilter),
168192
}
169193

170-
impl Stream for EthFilter {
171-
type Item = ResponseResult;
172-
173-
fn poll_next(
174-
self: std::pin::Pin<&mut Self>,
175-
cx: &mut std::task::Context<'_>,
176-
) -> std::task::Poll<Option<Self::Item>> {
177-
let pin = self.get_mut();
178-
match pin {
179-
Self::Blocks(block_notifications) => {
180-
let mut new_blocks = Vec::new();
181-
while let Poll::Ready(Some(result)) = block_notifications.poll_next_unpin(cx) {
182-
match result {
183-
Ok(block_hash) => new_blocks.push(block_hash),
184-
Err(lagged) => {
185-
// BroadcastStream handles lagging for us
186-
// Just log and continue
187-
warn!(target: "node::filter", "Block filter lagged, skipped messages {:?}", lagged);
188-
continue;
189-
}
190-
}
194+
/// Filter for tracking new block hashes.
195+
pub struct BlockFilter {
196+
block_notifications: BlockNotifications,
197+
}
198+
199+
impl BlockFilter {
200+
pub fn new(block_notifier: BlockNotifications) -> Self {
201+
Self { block_notifications: block_notifier }
202+
}
203+
204+
/// Drains all new block hashes since the last poll.
205+
///
206+
/// Returns all block hashes that were broadcast since the last call to this method.
207+
/// Handles lagged notifications gracefully by logging and continuing.
208+
async fn drain_blocks(&mut self) -> Vec<H256> {
209+
let mut new_blocks = Vec::new();
210+
211+
while let Some(result) = self.block_notifications.next().now_or_never().flatten() {
212+
match result {
213+
Ok(block_hash) => new_blocks.push(block_hash),
214+
Err(BroadcastStreamRecvError::Lagged(count)) => {
215+
warn!(
216+
target: LOG_TARGET,
217+
"Block filter lagged, skipped {} block notifications",
218+
count
219+
);
191220
}
192-
Poll::Ready(Some(Ok(new_blocks).to_rpc_result()))
193221
}
194-
// handled directly in get_filter_changes
195-
Self::Logs(_) => Poll::Pending,
196222
}
223+
224+
new_blocks
197225
}
198226
}
199227

@@ -258,7 +286,7 @@ impl LogsFilter {
258286
Ok(block_hash) => block_hashes.push(block_hash),
259287
Err(BroadcastStreamRecvError::Lagged(blocks)) => {
260288
// Channel overflowed - some blocks were skipped
261-
warn!(target: "node::filter", "Logs filter lagged, skipped {} block notifications", blocks);
289+
warn!(target: LOG_TARGET, "Logs filter lagged, skipped {} block notifications", blocks);
262290
// Continue draining what's left in the channel
263291
continue;
264292
}
@@ -316,3 +344,131 @@ impl LogsFilter {
316344
true
317345
}
318346
}
347+
348+
/// Filter for pending transactions
349+
///
350+
/// Monitors the transaction pool and returns newly pending transaction hashes
351+
/// when polled. Transactions that have been included in old blocks are automatically filtered out.
352+
///
353+
/// The filter maintains state of previously seen transactions to ensure each
354+
/// transaction is reported only once, even if it remains in the pending pool
355+
/// across multiple polls.
356+
pub struct PendingTransactionsFilter {
357+
/// Set of transaction hashes already reported to the client
358+
already_seen: HashSet<H256>,
359+
/// Stream of new block notifications for detecting mined transactions
360+
block_notifications: BroadcastStream<H256>,
361+
/// Reference to the transaction pool for querying ready transactions
362+
tx_pool: Arc<TransactionPoolHandle>,
363+
/// Ethereum RPC client for fetching block transaction data
364+
eth_rpc_client: EthRpcClient,
365+
}
366+
impl PendingTransactionsFilter {
367+
pub fn new(
368+
block_notifier: BroadcastStream<H256>,
369+
tx_pool: Arc<TransactionPoolHandle>,
370+
eth_rpc_client: EthRpcClient,
371+
) -> Self {
372+
Self {
373+
already_seen: tx_pool
374+
.ready()
375+
.filter_map(|tx| extract_tx_info(&tx.data).map(|(_, _, tx_info)| tx_info.hash))
376+
.collect(),
377+
block_notifications: block_notifier,
378+
tx_pool,
379+
eth_rpc_client,
380+
}
381+
}
382+
383+
/// Drains all new pending transaction hashes since the last poll.
384+
///
385+
/// This method:
386+
/// 1. Queries the current ready transaction pool
387+
/// 2. Drains block notifications to identify mined transactions
388+
/// 3. Returns only new transactions (not previously seen and not mined)
389+
///
390+
/// The filter state is updated to remember all currently pending transactions,
391+
/// ensuring they won't be reported again on subsequent polls.
392+
async fn drain_transactions(&mut self) -> Vec<H256> {
393+
// Get current ready transactions
394+
let current_ready: HashSet<H256> = self
395+
.tx_pool
396+
.ready()
397+
.filter_map(|tx| {
398+
extract_tx_info(&tx.data).map(|(_, _, tx_info)| tx_info.hash).or_else(|| {
399+
warn!(target: LOG_TARGET, "Failed to extract transaction info from ready pool");
400+
None
401+
})
402+
})
403+
.collect();
404+
405+
// Get transactions that have been included in blocks already
406+
let mut included_transactions = HashSet::new();
407+
while let Some(result) = self.block_notifications.next().now_or_never().flatten() {
408+
match result {
409+
Ok(block_hash) => match self.fetch_block_transactions(&block_hash).await {
410+
Ok(tx_hashes) => included_transactions.extend(tx_hashes),
411+
Err(e) => {
412+
warn!(
413+
target: LOG_TARGET,
414+
"Failed to fetch transactions for block {:?}: {}",
415+
block_hash, e
416+
);
417+
}
418+
},
419+
Err(BroadcastStreamRecvError::Lagged(blocks)) => {
420+
// Channel overflowed - some blocks were skipped
421+
warn!(target: LOG_TARGET, "Logs filter lagged, skipped {} block notifications", blocks);
422+
// Continue draining what's left in the channel
423+
continue;
424+
}
425+
}
426+
}
427+
428+
// New from pool: transactions in ready pool we haven't seen before
429+
let new_from_pool: HashSet<H256> =
430+
current_ready.difference(&self.already_seen).copied().collect();
431+
let excluded: HashSet<H256> = self.already_seen.union(&new_from_pool).copied().collect();
432+
let new_from_blocks: HashSet<H256> =
433+
included_transactions.difference(&excluded).copied().collect();
434+
let new_pending: Vec<H256> = new_from_pool.union(&new_from_blocks).copied().collect();
435+
// Remove mined transactions from already_seen
436+
for tx_hash in &included_transactions {
437+
self.already_seen.remove(tx_hash);
438+
}
439+
440+
// Only track transactions that are still pending (not mined)
441+
let still_pending: HashSet<H256> =
442+
current_ready.difference(&included_transactions).copied().collect();
443+
self.already_seen.extend(still_pending);
444+
new_pending
445+
}
446+
447+
/// Fetches all transaction hashes from a given block.
448+
///
449+
/// Takes a substrate block hash, fetches the block, converts it to an EVM block,
450+
/// and extracts all transaction hashes regardless of whether they're returned
451+
/// as hashes or full transaction objects.
452+
async fn fetch_block_transactions(&self, substrate_block_hash: &H256) -> Result<Vec<H256>> {
453+
let substrate_block =
454+
self.eth_rpc_client.block_by_hash(substrate_block_hash).await?.ok_or(
455+
Error::InternalError(format!(
456+
"Could not find block with hash: {substrate_block_hash}"
457+
)),
458+
)?;
459+
let block = self
460+
.eth_rpc_client
461+
.evm_block(substrate_block, false)
462+
.await
463+
.ok_or(Error::InternalError("Could not convert to an evm block".to_string()))?;
464+
let tx_hashes = match block.transactions {
465+
HashesOrTransactionInfos::Hashes(hashes) => hashes,
466+
// Considering that we called evm_block with hydrated false we will
467+
// never receive TransactionInfos but handled it anyways.
468+
HashesOrTransactionInfos::TransactionInfos(infos) => {
469+
infos.iter().map(|ti| ti.hash).collect()
470+
}
471+
};
472+
Ok(tx_hashes)
473+
}
474+
}

crates/anvil-polkadot/src/api_server/server.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ use crate::{
22
api_server::{
33
ApiRequest,
44
error::{Error, Result, ToRpcResponseResult},
5-
filters::{BlockNotifications, EthFilter, Filters, LogsFilter, eviction_task},
5+
filters::{
6+
BlockFilter, BlockNotifications, EthFilter, Filters, LogsFilter,
7+
PendingTransactionsFilter, eviction_task,
8+
},
69
revive_conversions::{
710
AlloyU256, ReviveAddress, ReviveBlockId, ReviveBlockNumberOrTag, ReviveBytes,
811
ReviveFilter, ReviveTrace, ReviveTracerType, SubstrateU256,
@@ -425,7 +428,7 @@ impl ApiServer {
425428
EthRequest::EthGetFilterChanges(id) => self.get_filter_changes(&id).await,
426429
EthRequest::EthNewBlockFilter(_) => self.new_block_filter().await.to_rpc_result(),
427430
EthRequest::EthNewPendingTransactionFilter(_) => {
428-
Err::<(), _>(Error::RpcUnimplemented).to_rpc_result()
431+
self.new_pending_transactions_filter().await.to_rpc_result()
429432
}
430433
EthRequest::EthUninstallFilter(id) => self.uninstall_filter(&id).await.to_rpc_result(),
431434
_ => Err::<(), _>(Error::RpcUnimplemented).to_rpc_result(),
@@ -1376,7 +1379,9 @@ impl ApiServer {
13761379
/// Creates a filter to notify about new blocks
13771380
async fn new_block_filter(&self) -> Result<String> {
13781381
node_info!("eth_newBlockFilter");
1379-
let filter = EthFilter::Blocks(BlockNotifications::new(self.new_block_notifications()?));
1382+
let filter = EthFilter::Blocks(BlockFilter::new(BlockNotifications::new(
1383+
self.new_block_notifications()?,
1384+
)));
13801385
Ok(self.filters.add_filter(filter).await)
13811386
}
13821387

@@ -1392,16 +1397,26 @@ impl ApiServer {
13921397
self.filters.get_filter_changes(id).await
13931398
}
13941399

1400+
async fn new_pending_transactions_filter(&self) -> Result<String> {
1401+
node_info!("eth_newPendingTransactionFilter");
1402+
let filter = EthFilter::PendingTransactions(PendingTransactionsFilter::new(
1403+
BlockNotifications::new(self.new_block_notifications()?),
1404+
self.tx_pool.clone(),
1405+
self.eth_rpc_client.clone(),
1406+
));
1407+
Ok(self.filters.add_filter(filter).await)
1408+
}
1409+
13951410
async fn new_filter(&self, filter: evm::Filter) -> Result<String> {
13961411
node_info!("eth_newFilter");
1397-
let eth_filter = EthFilter::Logs(Box::new(
1412+
let eth_filter = EthFilter::Logs(
13981413
LogsFilter::new(
13991414
BlockNotifications::new(self.new_block_notifications()?),
14001415
self.eth_rpc_client.clone(),
14011416
filter,
14021417
)
14031418
.await?,
1404-
));
1419+
);
14051420
Ok(self.filters.add_filter(eth_filter).await)
14061421
}
14071422

0 commit comments

Comments
 (0)