diff --git a/src/backend.rs b/src/backend.rs index 6a46f5f..5306320 100644 --- a/src/backend.rs +++ b/src/backend.rs @@ -13,6 +13,7 @@ use alloy_rpc_types::BlockId; use eyre::WrapErr; use futures::{ channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}, + pin_mut, stream::Stream, task::{Context, Poll}, Future, FutureExt, @@ -32,10 +33,12 @@ use std::{ path::Path, pin::Pin, sync::{ + atomic::{AtomicU8, Ordering}, mpsc::{channel as oneshot_channel, Sender as OneshotSender}, Arc, }, }; +use tokio::select; /// Logged when an error is indicative that the user is trying to fork from a non-archive node. pub const NON_ARCHIVE_NODE_WARNING: &str = "\ @@ -64,6 +67,14 @@ type AddressData = AddressHashMap; type StorageData = AddressHashMap; type BlockHashData = HashMap; +/// States for tracking which account endpoints should be used when account info +const ACCOUNT_FETCH_UNCHECKED: u8 = 0; +/// Endpoints supports the non standard eth_getAccountInfo which is more efficient than sending 3 +/// separate requests +const ACCOUNT_FETCH_SUPPORTS_ACC_INFO: u8 = 1; +/// Use regular individual getCode, getNonce, getBalance calls +const ACCOUNT_FETCH_SEPARATE_REQUESTS: u8 = 2; + struct AnyRequestFuture { sender: OneshotSender>, future: Pin> + Send>>, @@ -163,6 +174,8 @@ pub struct BackendHandler

{ /// The block to fetch data from. // This is an `Option` so that we can have less code churn in the functions below block_id: Option, + /// The mode for fetching account data + account_fetch_mode: Arc, } impl

BackendHandler

@@ -185,6 +198,7 @@ where queued_requests: Default::default(), incoming: rx, block_id, + account_fetch_mode: Arc::new(AtomicU8::new(ACCOUNT_FETCH_UNCHECKED)), } } @@ -281,16 +295,100 @@ where /// returns the future that fetches the account data fn get_account_req(&self, address: Address) -> ProviderRequest { trace!(target: "backendhandler", "preparing account request, address={:?}", address); + let provider = self.provider.clone(); let block_id = self.block_id.unwrap_or_default(); - let fut = Box::pin(async move { - let balance = provider.get_balance(address).block_id(block_id).into_future(); - let nonce = provider.get_transaction_count(address).block_id(block_id).into_future(); - let code = provider.get_code_at(address).block_id(block_id).into_future(); - let resp = tokio::try_join!(balance, nonce, code).map_err(Into::into); - (resp, address) - }); - ProviderRequest::Account(fut) + let mode = Arc::clone(&self.account_fetch_mode); + let fut = async move { + // depending on the tracked mode we can dispatch requests. + let initial_mode = mode.load(Ordering::Relaxed); + match initial_mode { + ACCOUNT_FETCH_UNCHECKED => { + // single request for accountinfo object + let acc_info_fut = + provider.get_account_info(address).block_id(block_id).into_future(); + + // tri request for account info + let balance_fut = + provider.get_balance(address).block_id(block_id).into_future(); + let nonce_fut = + provider.get_transaction_count(address).block_id(block_id).into_future(); + let code_fut = provider.get_code_at(address).block_id(block_id).into_future(); + let triple_fut = futures::future::try_join3(balance_fut, nonce_fut, code_fut); + pin_mut!(acc_info_fut, triple_fut); + + select! { + acc_info = &mut acc_info_fut => { + match acc_info { + Ok(info) => { + trace!(target: "backendhandler", "endpoint supports eth_getAccountInfo"); + mode.store(ACCOUNT_FETCH_SUPPORTS_ACC_INFO, Ordering::Relaxed); + Ok((info.balance, info.nonce, info.code)) + } + Err(err) => { + trace!(target: "backendhandler", ?err, "failed initial eth_getAccountInfo call"); + mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed); + Ok(triple_fut.await?) + } + } + } + triple = &mut triple_fut => { + match triple { + Ok((balance, nonce, code)) => { + mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed); + Ok((balance, nonce, code)) + } + Err(err) => Err(err.into()) + } + } + } + } + + ACCOUNT_FETCH_SUPPORTS_ACC_INFO => { + let mut res = provider + .get_account_info(address) + .block_id(block_id) + .into_future() + .await + .map(|info| (info.balance, info.nonce, info.code)); + + // it's possible that the configured endpoint load balances requests to multiple + // instances and not all support that endpoint so we should reset here + if res.is_err() { + mode.store(ACCOUNT_FETCH_SEPARATE_REQUESTS, Ordering::Relaxed); + + let balance_fut = + provider.get_balance(address).block_id(block_id).into_future(); + let nonce_fut = provider + .get_transaction_count(address) + .block_id(block_id) + .into_future(); + let code_fut = + provider.get_code_at(address).block_id(block_id).into_future(); + res = futures::future::try_join3(balance_fut, nonce_fut, code_fut).await; + } + + Ok(res?) + } + + ACCOUNT_FETCH_SEPARATE_REQUESTS => { + let balance_fut = + provider.get_balance(address).block_id(block_id).into_future(); + let nonce_fut = + provider.get_transaction_count(address).block_id(block_id).into_future(); + let code_fut = provider.get_code_at(address).block_id(block_id).into_future(); + + Ok(futures::future::try_join3(balance_fut, nonce_fut, code_fut).await?) + } + + _ => unreachable!("Invalid account fetch mode"), + } + }; + + ProviderRequest::Account(Box::pin(async move { + let result = fut.await; + (result, address) + })) } /// process a request for an account