diff --git a/crates/anvil-polkadot/src/api_server/filters.rs b/crates/anvil-polkadot/src/api_server/filters.rs
index e3e71584bd8dd..90f00e6b4ba24 100644
--- a/crates/anvil-polkadot/src/api_server/filters.rs
+++ b/crates/anvil-polkadot/src/api_server/filters.rs
@@ -1,10 +1,23 @@
-use crate::api_server::error::{Result, ToRpcResponseResult};
+use crate::{
+    api_server::{
+        error::{Error, Result, ToRpcResponseResult},
+        txpool_helpers::extract_tx_info,
+    },
+    substrate_node::service::TransactionPoolHandle,
+};
 use anvil_core::eth::subscription::SubscriptionId;
 use anvil_rpc::response::ResponseResult;
-use futures::{FutureExt, Stream, StreamExt};
+use futures::{FutureExt, StreamExt};
 use pallet_revive_eth_rpc::client::Client as EthRpcClient;
-use polkadot_sdk::pallet_revive::evm::{BlockNumberOrTag, Filter, Log};
-use std::{collections::HashMap, sync::Arc, task::Poll, time::Duration};
+use polkadot_sdk::{
+    pallet_revive::evm::{BlockNumberOrTag, Filter, HashesOrTransactionInfos, Log},
+    sc_service::TransactionPool,
+};
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Arc,
+    time::Duration,
+};
 use subxt::utils::H256;
 use tokio::{sync::Mutex, time::Instant};
 use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
@@ -13,6 +26,8 @@ use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
 
 /// Filters that haven't been polled within this duration will be evicted.
 pub const ACTIVE_FILTER_TIMEOUT_SECS: u64 = 60 * 5;
 
+pub const LOG_TARGET: &str = "node::filter";
+
 /// Maps filter IDs to tuples of filter and deadline.
 type FilterMap = Arc<Mutex<HashMap<String, (EthFilter, Instant)>>>;
@@ -58,20 +73,24 @@ impl Filters {
             let mut filters = self.active_filters.lock().await;
             if let Some((filter, deadline)) = filters.get_mut(id) {
                 let response = match filter {
+                    EthFilter::Blocks(block_filter) => {
+                        let blocks = block_filter.drain_blocks().await;
+                        Ok(blocks).to_rpc_result()
+                    }
                     EthFilter::Logs(logs_filter) => {
                         let logs = logs_filter.drain_logs().await;
                         Ok(logs).to_rpc_result()
                     }
-                    _ => filter
-                        .next()
-                        .await
-                        .unwrap_or_else(|| ResponseResult::success(Vec::<()>::new())),
+                    EthFilter::PendingTransactions(tx_filter) => {
+                        let txs = tx_filter.drain_transactions().await;
+                        Ok(txs).to_rpc_result()
+                    }
                 };
                 *deadline = self.next_deadline();
                 return response;
             }
         }
-        warn!(target: "node::filter", "No filter found for {}", id);
+        warn!(target: LOG_TARGET, "No filter found for {}", id);
         ResponseResult::success(Vec::<()>::new())
     }
 
@@ -91,7 +110,7 @@ impl Filters {
 
     /// Removes and returns the filter associated with the given identifier.
     pub async fn uninstall_filter(&self, id: &str) -> Option<EthFilter> {
-        trace!(target: "node::filter", "Uninstalling filter id {}", id);
+        trace!(target: LOG_TARGET, "Uninstalling filter id {}", id);
         self.active_filters.lock().await.remove(id).map(|(f, _)| f)
     }
 
@@ -101,12 +120,12 @@ impl Filters {
     /// stale filters that haven't been polled recently. Evicted filters are permanently
     /// removed and cannot be recovered.
    pub async fn evict(&self) {
-        trace!(target: "node::filter", "Evicting stale filters");
+        trace!(target: LOG_TARGET, "Evicting stale filters");
         let now = Instant::now();
         let mut active_filters = self.active_filters.lock().await;
         active_filters.retain(|id, (_, deadline)| {
             if now > *deadline {
-                trace!(target: "node::filter",?id, "Evicting stale filter");
+                trace!(target: LOG_TARGET,?id, "Evicting stale filter");
                 return false;
             }
             true
@@ -157,43 +176,52 @@ pub enum EthFilter {
     /// Emits the hash (H256) of each new block as it's added to the chain.
     /// Subscribers receive notifications through the broadcast channel. When polled,
     /// returns all block hashes produced since the last poll.
-    Blocks(BlockNotifications),
+    Blocks(BlockFilter),
     /// Log filter that tracks contract event logs.
     ///
     /// Filters logs based on block range, addresses, and topics. Combines historic
     /// logs (from the initial query range) with real-time logs from newly produced
     /// blocks. The filter applies topic matching with OR logic between topic alternatives
     /// and validates block ranges for incoming blocks.
-    Logs(Box<LogsFilter>),
+    Logs(LogsFilter),
+    /// Pending transactions filter that tracks new transactions.
+    ///
+    /// Returns transactions mined since the last poll, plus transactions that
+    /// are ready but have not been mined yet.
+    PendingTransactions(PendingTransactionsFilter),
 }
 
-impl Stream for EthFilter {
-    type Item = ResponseResult;
-
-    fn poll_next(
-        self: std::pin::Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Option<Self::Item>> {
-        let pin = self.get_mut();
-        match pin {
-            Self::Blocks(block_notifications) => {
-                let mut new_blocks = Vec::new();
-                while let Poll::Ready(Some(result)) = block_notifications.poll_next_unpin(cx) {
-                    match result {
-                        Ok(block_hash) => new_blocks.push(block_hash),
-                        Err(lagged) => {
-                            // BroadcastStream handles lagging for us
-                            // Just log and continue
-                            warn!(target: "node::filter", "Block filter lagged, skipped messages {:?}", lagged);
-                            continue;
-                        }
-                    }
-                }
-                Poll::Ready(Some(Ok(new_blocks).to_rpc_result()))
-            }
-            // handled directly in get_filter_changes
-            Self::Logs(_) => Poll::Pending,
-        }
-    }
-}
+/// Filter for tracking new block hashes.
+pub struct BlockFilter {
+    block_notifications: BlockNotifications,
+}
+
+impl BlockFilter {
+    pub fn new(block_notifier: BlockNotifications) -> Self {
+        Self { block_notifications: block_notifier }
+    }
+
+    /// Drains all new block hashes since the last poll.
+    ///
+    /// Returns all block hashes that were broadcast since the last call to this method.
+    /// Handles lagged notifications gracefully by logging and continuing.
+    async fn drain_blocks(&mut self) -> Vec<H256> {
+        let mut new_blocks = Vec::new();
+
+        while let Some(result) = self.block_notifications.next().now_or_never().flatten() {
+            match result {
+                Ok(block_hash) => new_blocks.push(block_hash),
+                Err(BroadcastStreamRecvError::Lagged(count)) => {
+                    warn!(
+                        target: LOG_TARGET,
+                        "Block filter lagged, skipped {} block notifications",
+                        count
+                    );
+                }
+            }
+        }
+
+        new_blocks
+    }
+}
@@ -258,7 +286,7 @@ impl LogsFilter {
                 Ok(block_hash) => block_hashes.push(block_hash),
                 Err(BroadcastStreamRecvError::Lagged(blocks)) => {
                     // Channel overflowed - some blocks were skipped
-                    warn!(target: "node::filter", "Logs filter lagged, skipped {} block notifications", blocks);
+                    warn!(target: LOG_TARGET, "Logs filter lagged, skipped {} block notifications", blocks);
                     // Continue draining what's left in the channel
                     continue;
                 }
@@ -316,3 +344,131 @@ impl LogsFilter {
         true
     }
 }
+
+/// Filter for pending transactions.
+///
+/// Monitors the transaction pool and returns newly pending transaction hashes
+/// when polled. Transactions that were already included in blocks are automatically filtered out.
+///
+/// The filter maintains state of previously seen transactions to ensure each
+/// transaction is reported only once, even if it remains in the pending pool
+/// across multiple polls.
+pub struct PendingTransactionsFilter {
+    /// Set of transaction hashes already reported to the client
+    already_seen: HashSet<H256>,
+    /// Stream of new block notifications for detecting mined transactions
+    block_notifications: BroadcastStream<H256>,
+    /// Reference to the transaction pool for querying ready transactions
+    tx_pool: Arc<TransactionPoolHandle>,
+    /// Ethereum RPC client for fetching block transaction data
+    eth_rpc_client: EthRpcClient,
+}
+impl PendingTransactionsFilter {
+    pub fn new(
+        block_notifier: BroadcastStream<H256>,
+        tx_pool: Arc<TransactionPoolHandle>,
+        eth_rpc_client: EthRpcClient,
+    ) -> Self {
+        Self {
+            already_seen: tx_pool
+                .ready()
+                .filter_map(|tx| extract_tx_info(&tx.data).map(|(_, _, tx_info)| tx_info.hash))
+                .collect(),
+            block_notifications: block_notifier,
+            tx_pool,
+            eth_rpc_client,
+        }
+    }
+
+    /// Drains all new pending transaction hashes since the last poll.
+    ///
+    /// This method:
+    /// 1. Queries the current ready transaction pool
+    /// 2. Drains block notifications to identify mined transactions
+    /// 3. Returns only new transactions (not previously seen and not mined)
+    ///
+    /// The filter state is updated to remember all currently pending transactions,
+    /// ensuring they won't be reported again on subsequent polls.
+    async fn drain_transactions(&mut self) -> Vec<H256> {
+        // Get current ready transactions
+        let current_ready: HashSet<H256> = self
+            .tx_pool
+            .ready()
+            .filter_map(|tx| {
+                extract_tx_info(&tx.data).map(|(_, _, tx_info)| tx_info.hash).or_else(|| {
+                    warn!(target: LOG_TARGET, "Failed to extract transaction info from ready pool");
+                    None
+                })
+            })
+            .collect();
+
+        // Get transactions that have been included in blocks already
+        let mut included_transactions = HashSet::new();
+        while let Some(result) = self.block_notifications.next().now_or_never().flatten() {
+            match result {
+                Ok(block_hash) => match self.fetch_block_transactions(&block_hash).await {
+                    Ok(tx_hashes) => included_transactions.extend(tx_hashes),
+                    Err(e) => {
+                        warn!(
+                            target: LOG_TARGET,
+                            "Failed to fetch transactions for block {:?}: {}",
+                            block_hash, e
+                        );
+                    }
+                },
+                Err(BroadcastStreamRecvError::Lagged(blocks)) => {
+                    // Channel overflowed - some blocks were skipped
+                    warn!(target: LOG_TARGET, "Pending transactions filter lagged, skipped {} block notifications", blocks);
+                    // Continue draining what's left in the channel
+                    continue;
+                }
+            }
+        }
+
+        // New from pool: transactions in ready pool we haven't seen before
+        let new_from_pool: HashSet<H256> =
+            current_ready.difference(&self.already_seen).copied().collect();
+        let excluded: HashSet<H256> = self.already_seen.union(&new_from_pool).copied().collect();
+        let new_from_blocks: HashSet<H256> =
+            included_transactions.difference(&excluded).copied().collect();
+        let new_pending: Vec<H256> = new_from_pool.union(&new_from_blocks).copied().collect();
+        // Remove mined transactions from already_seen
+        for tx_hash in &included_transactions {
+            self.already_seen.remove(tx_hash);
+        }
+
+        // Only track transactions that are still pending (not mined)
+        let still_pending: HashSet<H256> =
+            current_ready.difference(&included_transactions).copied().collect();
+        self.already_seen.extend(still_pending);
+        new_pending
+    }
+
+    /// Fetches all transaction hashes from a given block.
+    ///
+    /// Takes a substrate block hash, fetches the block, converts it to an EVM block,
+    /// and extracts all transaction hashes regardless of whether they're returned
+    /// as hashes or full transaction objects.
+    async fn fetch_block_transactions(&self, substrate_block_hash: &H256) -> Result<Vec<H256>> {
+        let substrate_block =
+            self.eth_rpc_client.block_by_hash(substrate_block_hash).await?.ok_or(
+                Error::InternalError(format!(
+                    "Could not find block with hash: {substrate_block_hash}"
+                )),
+            )?;
+        let block = self
+            .eth_rpc_client
+            .evm_block(substrate_block, false)
+            .await
+            .ok_or(Error::InternalError("Could not convert to an evm block".to_string()))?;
+        let tx_hashes = match block.transactions {
+            HashesOrTransactionInfos::Hashes(hashes) => hashes,
+            // Considering that we call evm_block with hydrated = false we should
+            // never receive TransactionInfos, but handle it anyway.
+            HashesOrTransactionInfos::TransactionInfos(infos) => {
+                infos.iter().map(|ti| ti.hash).collect()
+            }
+        };
+        Ok(tx_hashes)
+    }
+}
diff --git a/crates/anvil-polkadot/src/api_server/server.rs b/crates/anvil-polkadot/src/api_server/server.rs
index 6076bc7415850..34da423efbfc6 100644
--- a/crates/anvil-polkadot/src/api_server/server.rs
+++ b/crates/anvil-polkadot/src/api_server/server.rs
@@ -2,7 +2,10 @@ use crate::{
     api_server::{
         ApiRequest,
         error::{Error, Result, ToRpcResponseResult},
-        filters::{BlockNotifications, EthFilter, Filters, LogsFilter, eviction_task},
+        filters::{
+            BlockFilter, BlockNotifications, EthFilter, Filters, LogsFilter,
+            PendingTransactionsFilter, eviction_task,
+        },
         revive_conversions::{
             AlloyU256, ReviveAddress, ReviveBlockId, ReviveBlockNumberOrTag, ReviveBytes,
             ReviveFilter, ReviveTrace, ReviveTracerType, SubstrateU256,
@@ -425,7 +428,7 @@ impl ApiServer {
             EthRequest::EthGetFilterChanges(id) => self.get_filter_changes(&id).await,
             EthRequest::EthNewBlockFilter(_) => self.new_block_filter().await.to_rpc_result(),
             EthRequest::EthNewPendingTransactionFilter(_) => {
-                Err::<(), _>(Error::RpcUnimplemented).to_rpc_result()
+                self.new_pending_transactions_filter().await.to_rpc_result()
             }
             EthRequest::EthUninstallFilter(id) => self.uninstall_filter(&id).await.to_rpc_result(),
             _ => Err::<(), _>(Error::RpcUnimplemented).to_rpc_result(),
@@ -1376,7 +1379,9 @@ impl ApiServer {
     /// Creates a filter to notify about new blocks
     async fn new_block_filter(&self) -> Result<String> {
         node_info!("eth_newBlockFilter");
-        let filter = EthFilter::Blocks(BlockNotifications::new(self.new_block_notifications()?));
+        let filter = EthFilter::Blocks(BlockFilter::new(BlockNotifications::new(
+            self.new_block_notifications()?,
+        )));
         Ok(self.filters.add_filter(filter).await)
     }
 
@@ -1392,16 +1397,26 @@ impl ApiServer {
         self.filters.get_filter_changes(id).await
     }
 
+    async fn new_pending_transactions_filter(&self) -> Result<String> {
+        node_info!("eth_newPendingTransactionFilter");
+        let filter = EthFilter::PendingTransactions(PendingTransactionsFilter::new(
+            BlockNotifications::new(self.new_block_notifications()?),
+            self.tx_pool.clone(),
+            self.eth_rpc_client.clone(),
+        ));
+        Ok(self.filters.add_filter(filter).await)
+    }
+
     async fn new_filter(&self, filter: evm::Filter) -> Result<String> {
         node_info!("eth_newFilter");
-        let eth_filter = EthFilter::Logs(Box::new(
+        let eth_filter = EthFilter::Logs(
             LogsFilter::new(
                 BlockNotifications::new(self.new_block_notifications()?),
                 self.eth_rpc_client.clone(),
                 filter,
             )
             .await?,
-        ));
+        );
 
         Ok(self.filters.add_filter(eth_filter).await)
diff --git a/crates/anvil-polkadot/tests/it/filters.rs b/crates/anvil-polkadot/tests/it/filters.rs
index 1a442740e36c1..1514d984c5e30 100644
--- a/crates/anvil-polkadot/tests/it/filters.rs
+++ b/crates/anvil-polkadot/tests/it/filters.rs
@@ -955,3 +955,243 @@ async fn test_get_filter_logs_returns_all_matching_logs() {
     assert_eq!(all_logs.len(), 5);
 }
+
+// Pending transactions filter
+#[tokio::test(flavor = "multi_thread")]
+async fn test_pending_tx_filter_basic() {
+    let anvil_node_config = AnvilNodeConfig::test_config();
+    let substrate_node_config = SubstrateNodeConfig::new(&anvil_node_config);
+    let mut node = TestNode::new(anvil_node_config.clone(), substrate_node_config).await.unwrap();
+    unwrap_response::<()>(node.eth_rpc(EthRequest::SetAutomine(true)).await.unwrap()).unwrap();
+
+    let alith = Account::from(subxt_signer::eth::dev::alith());
+    let baltathar = Account::from(subxt_signer::eth::dev::baltathar());
+    let transfer_amount = U256::from_str_radix("100000000000", 10).unwrap();
+    let transaction = TransactionRequest::default()
+        .value(transfer_amount)
+        .from(Address::from(ReviveAddress::new(alith.address())))
+        .to(Address::from(ReviveAddress::new(baltathar.address())));
+
+    // Create a pending transaction filter
+    let filter_id = unwrap_response::<String>(
+        node.eth_rpc(EthRequest::EthNewPendingTransactionFilter(())).await.unwrap(),
+    )
+    .unwrap();
+
+    // Send a transaction (should enter pending pool)
+    let tx_hash = node.send_transaction(transaction.clone()).await.unwrap();
+
+    // Poll filter - should return the pending transaction
+    let pending_txs = unwrap_response::<Vec<H256>>(
+        node.eth_rpc(EthRequest::EthGetFilterChanges(filter_id.clone())).await.unwrap(),
+    )
+    .unwrap();
+
+    assert_eq!(pending_txs.len(), 1);
+    assert_eq!(pending_txs[0], tx_hash);
+
+    // Poll again - should return empty (no new pending transactions)
+    let pending_txs = unwrap_response::<Vec<H256>>(
+        node.eth_rpc(EthRequest::EthGetFilterChanges(filter_id.clone())).await.unwrap(),
+    )
+    .unwrap();
+
+    assert!(pending_txs.is_empty());
+
+    let tx_hash = node.send_transaction(transaction.nonce(1)).await.unwrap();
+    let pending_txs = unwrap_response::<Vec<H256>>(
+        node.eth_rpc(EthRequest::EthGetFilterChanges(filter_id)).await.unwrap(),
+    )
+    .unwrap();
+    assert_eq!(pending_txs[0], tx_hash);
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_pending_tx_filter_mixed_pending_and_mined() {
+    let anvil_node_config = AnvilNodeConfig::test_config();
+    let substrate_node_config = SubstrateNodeConfig::new(&anvil_node_config);
+    let mut node = TestNode::new(anvil_node_config.clone(), substrate_node_config).await.unwrap();
+
+    let alith = Account::from(subxt_signer::eth::dev::alith());
+    let baltathar = Account::from(subxt_signer::eth::dev::baltathar());
+    let transfer_amount = U256::from_str_radix("100000000000", 10).unwrap();
+    let transaction = TransactionRequest::default()
+        .value(transfer_amount)
+        .from(Address::from(ReviveAddress::new(alith.address())))
+        .to(Address::from(ReviveAddress::new(baltathar.address())));
+
+    // Create a pending transaction filter
+    let filter_id = unwrap_response::<String>(
+        node.eth_rpc(EthRequest::EthNewPendingTransactionFilter(())).await.unwrap(),
+    )
+    .unwrap();
+
+    let tx_hash1 = node.send_transaction(transaction.clone()).await.unwrap();
+    let tx_hash2 = node.send_transaction(transaction.clone().nonce(1)).await.unwrap();
+    // Mine block (includes tx1 and tx2)
+    unwrap_response::<()>(node.eth_rpc(EthRequest::Mine(None, None)).await.unwrap()).unwrap();
+    let tx_hash3 = node.send_transaction(transaction.clone().nonce(2)).await.unwrap();
+    let tx_hash4 = node.send_transaction(transaction.clone().nonce(3)).await.unwrap();
+
+    let pending_txs = unwrap_response::<Vec<H256>>(
+        node.eth_rpc(EthRequest::EthGetFilterChanges(filter_id)).await.unwrap(),
+    )
+    .unwrap();
+
+    assert_eq!(pending_txs.len(), 4);
+    assert!(pending_txs.contains(&tx_hash3));
+    assert!(pending_txs.contains(&tx_hash4));
+    assert!(pending_txs.contains(&tx_hash1));
+    assert!(pending_txs.contains(&tx_hash2));
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_pending_tx_filter_multiple_filters_independent() {
+    let anvil_node_config = AnvilNodeConfig::test_config();
+    let substrate_node_config = SubstrateNodeConfig::new(&anvil_node_config);
+    let mut node = TestNode::new(anvil_node_config.clone(), substrate_node_config).await.unwrap();
+
+    let alith = Account::from(subxt_signer::eth::dev::alith());
+    let baltathar = Account::from(subxt_signer::eth::dev::baltathar());
+    let transfer_amount = U256::from_str_radix("100000000000", 10).unwrap();
+    let transaction = TransactionRequest::default()
+        .value(transfer_amount)
+        .from(Address::from(ReviveAddress::new(alith.address())))
+        .to(Address::from(ReviveAddress::new(baltathar.address())));
+
+    // Create first filter
+    let filter1 = unwrap_response::<String>(
+        node.eth_rpc(EthRequest::EthNewPendingTransactionFilter(())).await.unwrap(),
+    )
+    .unwrap();
+
+    // Send transaction
+    let tx_hash1 = node.send_transaction(transaction.clone()).await.unwrap();
+
+    // Create second filter (after tx1)
+    let filter2 = unwrap_response::<String>(
+        node.eth_rpc(EthRequest::EthNewPendingTransactionFilter(())).await.unwrap(),
+    )
+    .unwrap();
+
+    // Send another transaction
+    let tx_hash2 = node.send_transaction(transaction.clone().nonce(1)).await.unwrap();
+
+    // Poll filter1 - should see both tx1 and tx2
+    let pending_txs1 = unwrap_response::<Vec<H256>>(
+        node.eth_rpc(EthRequest::EthGetFilterChanges(filter1.clone())).await.unwrap(),
+    )
+    .unwrap();
+
+    // Poll filter2 - should see tx1 (existing) and tx2 (new)
+    let pending_txs2 = unwrap_response::<Vec<H256>>(
+        node.eth_rpc(EthRequest::EthGetFilterChanges(filter2.clone())).await.unwrap(),
+    )
+    .unwrap();
+
+    assert_eq!(pending_txs1.len(), 2);
+    assert!(pending_txs1.contains(&tx_hash1));
+    assert!(pending_txs1.contains(&tx_hash2));
+
+    assert_eq!(pending_txs2.len(), 1);
+    assert!(pending_txs2.contains(&tx_hash2));
+    assert!(!pending_txs2.contains(&tx_hash1));
+
+    // Poll both again - should be empty
+    let pending_txs1 = unwrap_response::<Vec<H256>>(
+        node.eth_rpc(EthRequest::EthGetFilterChanges(filter1)).await.unwrap(),
+    )
+    .unwrap();
+
+    let pending_txs2 = unwrap_response::<Vec<H256>>(
+        node.eth_rpc(EthRequest::EthGetFilterChanges(filter2)).await.unwrap(),
+    )
+    .unwrap();
+
+    assert!(pending_txs1.is_empty());
+    assert!(pending_txs2.is_empty());
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_pending_tx_filter_multiple_blocks_mined() {
+    let anvil_node_config = AnvilNodeConfig::test_config();
+    let substrate_node_config = SubstrateNodeConfig::new(&anvil_node_config);
+    let mut node = TestNode::new(anvil_node_config.clone(), substrate_node_config).await.unwrap();
+
+    let alith = Account::from(subxt_signer::eth::dev::alith());
+    let baltathar = Account::from(subxt_signer::eth::dev::baltathar());
+    let transfer_amount = U256::from_str_radix("100000000000", 10).unwrap();
+    let transaction = TransactionRequest::default()
+        .value(transfer_amount)
+        .from(Address::from(ReviveAddress::new(alith.address())))
+        .to(Address::from(ReviveAddress::new(baltathar.address())));
+
+    // Create filter
+    let filter_id = unwrap_response::<String>(
+        node.eth_rpc(EthRequest::EthNewPendingTransactionFilter(())).await.unwrap(),
+    )
+    .unwrap();
+
+    // Send transactions
+    let tx_hash1 = node.send_transaction(transaction.clone()).await.unwrap();
+    let tx_hash2 = node.send_transaction(transaction.clone().nonce(1)).await.unwrap();
+
+    // Mine first block
+    unwrap_response::<()>(node.eth_rpc(EthRequest::Mine(None, None)).await.unwrap()).unwrap();
+
+    // Send more transactions
+    let tx_hash3 = node.send_transaction(transaction.clone().nonce(2)).await.unwrap();
+    let tx_hash4 = node.send_transaction(transaction.clone().nonce(3)).await.unwrap();
+    unwrap_response::<()>(node.eth_rpc(EthRequest::Mine(Some(U256::from(4)), None)).await.unwrap())
+        .unwrap();
+    let pending_txs = unwrap_response::<Vec<H256>>(
+        node.eth_rpc(EthRequest::EthGetFilterChanges(filter_id)).await.unwrap(),
+    )
+    .unwrap();
+
+    assert_eq!(pending_txs.len(), 4);
+    assert!(pending_txs.contains(&tx_hash1));
+    assert!(pending_txs.contains(&tx_hash2));
+    assert!(pending_txs.contains(&tx_hash3));
+    assert!(pending_txs.contains(&tx_hash4));
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_pending_tx_filter_future_to_ready_transition() {
+    let anvil_node_config = AnvilNodeConfig::test_config();
+    let substrate_node_config = SubstrateNodeConfig::new(&anvil_node_config);
+    let mut node = TestNode::new(anvil_node_config.clone(), substrate_node_config).await.unwrap();
+
+    let alith = Account::from(subxt_signer::eth::dev::alith());
+    let baltathar = Account::from(subxt_signer::eth::dev::baltathar());
+    let transfer_amount = U256::from_str_radix("100000000000", 10).unwrap();
+    let transaction = TransactionRequest::default()
+        .value(transfer_amount)
+        .from(Address::from(ReviveAddress::new(alith.address())))
+        .to(Address::from(ReviveAddress::new(baltathar.address())));
+    // Create filter
+    let filter_id = unwrap_response::<String>(
+        node.eth_rpc(EthRequest::EthNewPendingTransactionFilter(())).await.unwrap(),
+    )
+    .unwrap();
+
+    let future_hash = node.send_transaction(transaction.clone().nonce(5)).await.unwrap();
+    // Poll - should be empty (tx is in future pool, not ready pool)
+    let pending_txs = unwrap_response::<Vec<H256>>(
+        node.eth_rpc(EthRequest::EthGetFilterChanges(filter_id.clone())).await.unwrap(),
+    )
+    .unwrap();
+    assert!(pending_txs.is_empty());
+
+    for nonce in 0..5 {
+        node.send_transaction(transaction.clone().nonce(nonce)).await.unwrap();
+    }
+    let pending_txs = unwrap_response::<Vec<H256>>(
+        node.eth_rpc(EthRequest::EthGetFilterChanges(filter_id)).await.unwrap(),
+    )
+    .unwrap();
+
+    // Should contain the future tx + the 5 gap-filling txs
+    assert_eq!(pending_txs.len(), 6);
+    assert!(pending_txs.contains(&future_hash));
+}
diff --git a/crates/anvil-polkadot/tests/it/standard_rpc.rs b/crates/anvil-polkadot/tests/it/standard_rpc.rs
index 0a3dfb633619c..75a7bbd5df04e 100644
--- a/crates/anvil-polkadot/tests/it/standard_rpc.rs
+++ b/crates/anvil-polkadot/tests/it/standard_rpc.rs
@@ -667,7 +667,7 @@ async fn test_fee_history() {
     let base_fees =
         [1_000_000, 999981, 999962, 999944, 999925, 999906, 999888, 999869, 999851, 999832, 999832]
             .into_iter()
-            .map(|i| pallet_revive::U256::from(i))
+            .map(pallet_revive::U256::from)
             .collect::<Vec<_>>();
     assert_eq!(base_fees, fee_history.base_fee_per_gas);
 }
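Illustrative note, not part of the patch: the bookkeeping in drain_transactions above reduces to plain set arithmetic over transaction hashes. The sketch below models a single poll with nothing beyond std; the helper name drain_round and the u64 stand-ins for H256 hashes are hypothetical and exist only for this example.

use std::collections::HashSet;

/// One poll of the pending-transaction filter, mirroring `drain_transactions`:
/// `already_seen` is the filter state, `current_ready` is what the pool reports
/// now, and `included` are hashes found in blocks mined since the last poll.
/// Returns the hashes to report plus the next `already_seen` state.
fn drain_round(
    already_seen: &HashSet<u64>,
    current_ready: &HashSet<u64>,
    included: &HashSet<u64>,
) -> (Vec<u64>, HashSet<u64>) {
    // Ready transactions that have not been reported yet.
    let new_from_pool: HashSet<u64> = current_ready.difference(already_seen).copied().collect();
    // Mined transactions that were never reported (e.g. mined between two polls).
    let excluded: HashSet<u64> = already_seen.union(&new_from_pool).copied().collect();
    let new_from_blocks: HashSet<u64> = included.difference(&excluded).copied().collect();
    // Everything reported in this round.
    let new_pending: Vec<u64> = new_from_pool.union(&new_from_blocks).copied().collect();
    // Next state: forget mined hashes, keep everything that is still pending.
    let mut next_seen: HashSet<u64> = already_seen.difference(included).copied().collect();
    next_seen.extend(current_ready.difference(included).copied());
    (new_pending, next_seen)
}

fn main() {
    // Poll 1: tx 1 and 2 are ready, nothing mined yet -> both are reported.
    let (report, seen) = drain_round(&HashSet::new(), &HashSet::from([1, 2]), &HashSet::new());
    assert_eq!(report.len(), 2);
    // Poll 2: tx 1 and 2 were mined, tx 3 was mined without ever being seen as
    // ready, tx 4 is newly ready -> only 3 and 4 are reported.
    let (report, _seen) = drain_round(&seen, &HashSet::from([4]), &HashSet::from([1, 2, 3]));
    assert_eq!(report.into_iter().collect::<HashSet<_>>(), HashSet::from([3, 4]));
}

The union with new_from_blocks is what lets the filter report transactions that were mined between two polls without ever being observed in the ready pool, which is the behaviour the mixed pending/mined test above relies on.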