diff --git a/Cargo.toml b/Cargo.toml index 19b2c45..c6af45b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,6 @@ integration = [] [dependencies] init4-bin-base = { version = "0.3.4", features = ["perms"] } -signet-bundle = { git = "https://github.com/init4tech/signet-sdk", rev = "b8251ff0fec7cb14ca87e6f95c14f56bc2593049" } signet-constants = { git = "https://github.com/init4tech/signet-sdk", rev = "b8251ff0fec7cb14ca87e6f95c14f56bc2593049" } signet-sim = { git = "https://github.com/init4tech/signet-sdk", rev = "b8251ff0fec7cb14ca87e6f95c14f56bc2593049" } signet-tx-cache = { git = "https://github.com/init4tech/signet-sdk", rev = "b8251ff0fec7cb14ca87e6f95c14f56bc2593049" } @@ -56,6 +55,5 @@ serde_json = "1.0" tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] } oauth2 = "5" -chrono = "0.4.41" tokio-stream = "0.1.17" url = "2.5.4" diff --git a/bin/builder.rs b/bin/builder.rs index 884a731..7ee26fa 100644 --- a/bin/builder.rs +++ b/bin/builder.rs @@ -1,18 +1,14 @@ use builder::{ config::BuilderConfig, service::serve_builder, - tasks::{ - block::sim::Simulator, - cache::{BundlePoller, TxPoller}, - metrics::MetricsTask, - submit::SubmitTask, - }, + tasks::{block::sim::Simulator, metrics::MetricsTask, submit::SubmitTask}, +}; +use init4_bin_base::{ + deps::tracing::{info, info_span}, + utils::from_env::FromEnv, }; -use init4_bin_base::{deps::tracing, utils::from_env::FromEnv}; -use signet_sim::SimCache; use signet_types::constants::SignetSystemConstants; use tokio::select; -use tracing::info_span; // Note: Must be set to `multi_thread` to support async tasks. // See: https://docs.rs/tokio/latest/tokio/attr.main.html @@ -21,40 +17,39 @@ async fn main() -> eyre::Result<()> { let _guard = init4_bin_base::init4(); let init_span_guard = info_span!("builder initialization"); + // Pull the configuration from the environment let config = BuilderConfig::from_env()?.clone(); let constants = SignetSystemConstants::pecorino(); - let token = config.oauth_token(); + // Spawn the EnvTask + let env_task = config.env_task(); + let (block_env, env_jh) = env_task.spawn(); + + // Spawn the cache system + let cache_system = config.spawn_cache_system(block_env.clone()); + + // Prep providers and contracts let (host_provider, quincey) = tokio::try_join!(config.connect_host_provider(), config.connect_quincey())?; let ru_provider = config.connect_ru_provider(); - let zenith = config.connect_zenith(host_provider.clone()); + // Set up the metrics task let metrics = MetricsTask { host_provider }; let (tx_channel, metrics_jh) = metrics.spawn(); + // Make a Tx submission task let submit = SubmitTask { zenith, quincey, config: config.clone(), outbound_tx_channel: tx_channel }; - let tx_poller = TxPoller::new(&config); - let (tx_receiver, tx_poller_jh) = tx_poller.spawn(); - - let bundle_poller = BundlePoller::new(&config, token); - let (bundle_receiver, bundle_poller_jh) = bundle_poller.spawn(); - + // Set up tx submission let (submit_channel, submit_jh) = submit.spawn(); - let sim_items = SimCache::new(); - let slot_calculator = config.slot_calculator; - - let sim = Simulator::new(&config, ru_provider.clone(), slot_calculator); - - let (basefee_jh, sim_cache_jh) = - sim.spawn_cache_tasks(tx_receiver, bundle_receiver, sim_items.clone()); - - let build_jh = sim.spawn_simulator_task(constants, sim_items.clone(), submit_channel); + // Set up the simulator + let sim = Simulator::new(&config, ru_provider.clone(), block_env); + let build_jh = sim.spawn_simulator_task(constants, cache_system.sim_cache, submit_channel); + // Start the healthcheck server let server = serve_builder(([0, 0, 0, 0], config.builder_port)); // We have finished initializing the builder, so we can drop the init span @@ -62,33 +57,34 @@ async fn main() -> eyre::Result<()> { drop(init_span_guard); select! { - _ = tx_poller_jh => { - tracing::info!("tx_poller finished"); + + _ = env_jh => { + info!("env task finished"); }, - _ = bundle_poller_jh => { - tracing::info!("bundle_poller finished"); + _ = cache_system.cache_task => { + info!("cache task finished"); + }, + _ = cache_system.tx_poller => { + info!("tx_poller finished"); + }, + _ = cache_system.bundle_poller => { + info!("bundle_poller finished"); }, - _ = sim_cache_jh => { - tracing::info!("sim cache task finished"); - } - _ = basefee_jh => { - tracing::info!("basefee task finished"); - } _ = submit_jh => { - tracing::info!("submit finished"); + info!("submit finished"); }, _ = metrics_jh => { - tracing::info!("metrics finished"); + info!("metrics finished"); }, _ = build_jh => { - tracing::info!("build finished"); + info!("build finished"); } _ = server => { - tracing::info!("server finished"); + info!("server finished"); } } - tracing::info!("shutting down"); + info!("shutting down"); Ok(()) } diff --git a/src/config.rs b/src/config.rs index 88e441c..00cb55a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,11 @@ -use crate::quincey::Quincey; +use crate::{ + quincey::Quincey, + tasks::{ + block::cfg::SignetCfgEnv, + cache::{BundlePoller, CacheSystem, CacheTask, TxPoller}, + env::EnvTask, + }, +}; use alloy::{ network::{Ethereum, EthereumWallet}, primitives::Address, @@ -21,6 +28,8 @@ use init4_bin_base::{ }; use signet_zenith::Zenith; use std::borrow::Cow; +use tokio::sync::watch; +use trevm::revm::context::BlockEnv; /// Type alias for the provider used to simulate against rollup state. pub type RuProvider = RootProvider; @@ -132,7 +141,7 @@ pub struct BuilderConfig { )] pub tx_pool_cache_duration: u64, - /// Oauth2 configuration for the builder to connect to ini4 services. + /// Oauth2 configuration for the builder to connect to init4 services. pub oauth: OAuthConfig, /// The max number of simultaneous block simulations to run. @@ -166,8 +175,13 @@ impl BuilderConfig { /// Connect to the Rollup rpc provider. pub fn connect_ru_provider(&self) -> RootProvider { - let url = url::Url::parse(&self.ru_rpc_url).expect("failed to parse URL"); - RootProvider::::new_http(url) + static ONCE: std::sync::OnceLock> = std::sync::OnceLock::new(); + + ONCE.get_or_init(|| { + let url = url::Url::parse(&self.ru_rpc_url).expect("failed to parse URL"); + RootProvider::new_http(url) + }) + .clone() } /// Connect to the Host rpc provider. @@ -222,4 +236,36 @@ impl BuilderConfig { Ok(Quincey::new_remote(client, url, token)) } + + /// Create an [`EnvTask`] using this config. + pub fn env_task(&self) -> EnvTask { + let provider = self.connect_ru_provider(); + EnvTask::new(self.clone(), provider) + } + + /// Spawn a new [`CacheSystem`] using this config. This contains the + /// joinhandles for [`TxPoller`] and [`BundlePoller`] and [`CacheTask`], as + /// well as the [`SimCache`] and the block env watcher. + /// + /// [`SimCache`]: signet_sim::SimCache + pub fn spawn_cache_system(&self, block_env: watch::Receiver>) -> CacheSystem { + // Tx Poller pulls transactions from the cache + let tx_poller = TxPoller::new(self); + let (tx_receiver, tx_poller) = tx_poller.spawn(); + + // Bundle Poller pulls bundles from the cache + let bundle_poller = BundlePoller::new(self, self.oauth_token()); + let (bundle_receiver, bundle_poller) = bundle_poller.spawn(); + + // Set up the cache task + let cache_task = CacheTask::new(block_env.clone(), bundle_receiver, tx_receiver); + let (sim_cache, cache_task) = cache_task.spawn(); + + CacheSystem { cache_task, tx_poller, bundle_poller, sim_cache } + } + + /// Create a [`SignetCfgEnv`] using this config. + pub const fn cfg_env(&self) -> SignetCfgEnv { + SignetCfgEnv { chain_id: self.ru_chain_id } + } } diff --git a/src/tasks/block/cfg.rs b/src/tasks/block/cfg.rs index 07d6536..3570a42 100644 --- a/src/tasks/block/cfg.rs +++ b/src/tasks/block/cfg.rs @@ -1,21 +1,14 @@ //! This file implements the [`trevm::Cfg`] and [`trevm::Block`] traits for Pecorino blocks. -use alloy::primitives::{Address, B256, FixedBytes, U256}; -use trevm::{ - Block, - revm::{ - context::{BlockEnv, CfgEnv}, - context_interface::block::BlobExcessGasAndPrice, - primitives::hardfork::SpecId, - }, -}; - -use crate::config::BuilderConfig; +use trevm::revm::{context::CfgEnv, primitives::hardfork::SpecId}; /// PecorinoCfg holds network-level configuration values. #[derive(Debug, Clone, Copy)] -pub struct PecorinoCfg {} +pub struct SignetCfgEnv { + /// The chain ID. + pub chain_id: u64, +} -impl trevm::Cfg for PecorinoCfg { +impl trevm::Cfg for SignetCfgEnv { /// Fills the configuration environment with Pecorino-specific values. /// /// # Arguments @@ -24,76 +17,7 @@ impl trevm::Cfg for PecorinoCfg { fn fill_cfg_env(&self, cfg_env: &mut CfgEnv) { let CfgEnv { chain_id, spec, .. } = cfg_env; - *chain_id = signet_constants::pecorino::RU_CHAIN_ID; + *chain_id = self.chain_id; *spec = SpecId::default(); } } - -/// PecorinoBlockEnv holds block-level configurations for Pecorino blocks. -#[derive(Debug, Clone, Copy)] -pub struct PecorinoBlockEnv { - /// The block number for this block. - pub number: u64, - /// The address the block reward should be sent to. - pub beneficiary: Address, - /// Timestamp for the block. - pub timestamp: u64, - /// The gas limit for this block environment. - pub gas_limit: u64, - /// The basefee to use for calculating gas usage. - pub basefee: u64, - /// The prevrandao to use for this block. - pub prevrandao: Option>, -} - -/// Implements [`trevm::Block`] for the Pecorino block. -impl Block for PecorinoBlockEnv { - /// Fills the block environment with the Pecorino specific values - fn fill_block_env(&self, block_env: &mut BlockEnv) { - // Destructure the fields off of the block_env and modify them - let BlockEnv { - number, - beneficiary, - timestamp, - gas_limit, - basefee, - difficulty, - prevrandao, - blob_excess_gas_and_price, - } = block_env; - *number = self.number; - *beneficiary = self.beneficiary; - *timestamp = self.timestamp; - *gas_limit = self.gas_limit; - *basefee = self.basefee; - *prevrandao = self.prevrandao; - - // NB: The following fields are set to sane defaults because they - // are not supported by the rollup - *difficulty = U256::ZERO; - *blob_excess_gas_and_price = - Some(BlobExcessGasAndPrice { excess_blob_gas: 0, blob_gasprice: 0 }); - } -} - -impl PecorinoBlockEnv { - /// Returns a new PecorinoBlockEnv with the specified values. - /// - /// # Arguments - /// - /// - config: The BuilderConfig for the builder. - /// - number: The block number of this block, usually the latest block number plus 1, - /// unless simulating blocks in the past. - /// - timestamp: The timestamp of the block, typically set to the deadline of the - /// block building task. - pub fn new(config: BuilderConfig, number: u64, timestamp: u64, basefee: u64) -> Self { - PecorinoBlockEnv { - number, - beneficiary: config.builder_rewards_address, - timestamp, - gas_limit: config.rollup_block_gas_limit, - basefee, - prevrandao: Some(B256::random()), - } - } -} diff --git a/src/tasks/block/sim.rs b/src/tasks/block/sim.rs index 909db6b..dad6482 100644 --- a/src/tasks/block/sim.rs +++ b/src/tasks/block/sim.rs @@ -1,43 +1,30 @@ //! `block.rs` contains the Simulator and everything that wires it into an //! actor that handles the simulation of a stream of bundles and transactions //! and turns them into valid Pecorino blocks for network submission. -use super::cfg::PecorinoBlockEnv; -use crate::{ - config::{BuilderConfig, RuProvider}, - tasks::{block::cfg::PecorinoCfg, cache::Bundle}, -}; -use alloy::{ - consensus::TxEnvelope, - eips::{BlockId, BlockNumberOrTag::Latest}, - network::Ethereum, - providers::Provider, -}; -use chrono::{DateTime, Utc}; -use eyre::{Context, bail}; +use crate::config::{BuilderConfig, RuProvider}; +use alloy::{eips::BlockId, network::Ethereum, providers::Provider}; use init4_bin_base::{ - deps::tracing::{debug, error, info, warn}, + deps::tracing::{debug, error}, utils::calc::SlotCalculator, }; use signet_sim::{BlockBuild, BuiltBlock, SimCache}; use signet_types::constants::SignetSystemConstants; -use std::{ +use std::time::{Duration, Instant}; +use tokio::{ sync::{ - Arc, - atomic::{AtomicU64, Ordering}, + mpsc::{self}, + watch, }, - time::{Duration, Instant, SystemTime, UNIX_EPOCH}, -}; -use tokio::{ - select, - sync::mpsc::{self}, task::JoinHandle, - time::sleep, }; use trevm::revm::{ + context::BlockEnv, database::{AlloyDB, WrapDatabaseAsync}, inspector::NoOpInspector, }; +type AlloyDatabaseProvider = WrapDatabaseAsync>; + /// `Simulator` is responsible for periodically building blocks and submitting them for /// signing and inclusion in the blockchain. It wraps a rollup provider and a slot /// calculator with a builder configuration. @@ -47,11 +34,10 @@ pub struct Simulator { pub config: BuilderConfig, /// A provider that cannot sign transactions, used for interacting with the rollup. pub ru_provider: RuProvider, - /// The slot calculator for determining when to wake up and build blocks. - pub slot_calculator: SlotCalculator, -} -type AlloyDatabaseProvider = WrapDatabaseAsync>; + /// The block configuration environment on which to simulate + pub block_env: watch::Receiver>, +} impl Simulator { /// Creates a new `Simulator` instance. @@ -60,7 +46,6 @@ impl Simulator { /// /// - `config`: The configuration for the builder. /// - `ru_provider`: A provider for interacting with the rollup. - /// - `slot_calculator`: A slot calculator for managing block timing. /// /// # Returns /// @@ -68,9 +53,14 @@ impl Simulator { pub fn new( config: &BuilderConfig, ru_provider: RuProvider, - slot_calculator: SlotCalculator, + block_env: watch::Receiver>, ) -> Self { - Self { config: config.clone(), ru_provider, slot_calculator } + Self { config: config.clone(), ru_provider, block_env } + } + + /// Get the slot calculator. + pub const fn slot_calculator(&self) -> &SlotCalculator { + &self.config.slot_calculator } /// Handles building a single block. @@ -89,14 +79,14 @@ impl Simulator { constants: SignetSystemConstants, sim_items: SimCache, finish_by: Instant, - block: PecorinoBlockEnv, + block: BlockEnv, ) -> eyre::Result { let db = self.create_db().await.unwrap(); let block_build: BlockBuild<_, NoOpInspector> = BlockBuild::new( db, constants, - PecorinoCfg {}, + self.config.cfg_env(), block, finish_by, self.config.concurrency_limit, @@ -110,85 +100,6 @@ impl Simulator { Ok(block) } - /// Spawns two tasks: one to handle incoming transactions and bundles, - /// adding them to the simulation cache, and one to track the latest basefee. - /// - /// # Arguments - /// - /// - `tx_receiver`: A channel receiver for incoming transactions. - /// - `bundle_receiver`: A channel receiver for incoming bundles. - /// - `cache`: The simulation cache to store the received items. - /// - /// # Returns - /// - /// A `JoinHandle` for the basefee updater and a `JoinHandle` for the - /// cache handler. - pub fn spawn_cache_tasks( - &self, - tx_receiver: mpsc::UnboundedReceiver, - bundle_receiver: mpsc::UnboundedReceiver, - cache: SimCache, - ) -> (JoinHandle<()>, JoinHandle<()>) { - debug!("starting up cache handler"); - - let basefee_price = Arc::new(AtomicU64::new(0_u64)); - let basefee_reader = Arc::clone(&basefee_price); - let fut = self.basefee_updater_fut(basefee_price); - - // Update the basefee on a per-block cadence - let basefee_jh = tokio::spawn(fut); - - // Update the sim cache whenever a transaction or bundle is received with respect to the basefee - let cache_jh = tokio::spawn(async move { - cache_updater(tx_receiver, bundle_receiver, cache, basefee_reader).await - }); - - (basefee_jh, cache_jh) - } - - /// Periodically updates the shared basefee by querying the latest block. - /// - /// This function calculates the remaining time until the next slot, - /// sleeps until that time, and then retrieves the latest basefee from the rollup provider. - /// The updated basefee is stored in the provided `AtomicU64`. - /// - /// This function runs continuously. - /// - /// # Arguments - /// - /// - `price`: A shared `Arc` used to store the updated basefee value. - fn basefee_updater_fut(&self, price: Arc) -> impl Future + use<> { - let slot_calculator = self.slot_calculator; - let ru_provider = self.ru_provider.clone(); - - async move { - debug!("starting basefee updater"); - loop { - // calculate start of next slot plus a small buffer - let time_remaining = slot_calculator.slot_duration() - - slot_calculator.current_timepoint_within_slot() - + 1; - debug!(time_remaining = ?time_remaining, "basefee updater sleeping until next slot"); - - // wait until that point in time - sleep(Duration::from_secs(time_remaining)).await; - - // update the basefee with that price - let resp = ru_provider.get_block_by_number(Latest).await.inspect_err(|e| { - error!(error = %e, "RPC error during basefee update"); - }); - - if let Ok(Some(block)) = resp { - let basefee = block.header.base_fee_per_gas.unwrap_or(0); - price.store(basefee, Ordering::Relaxed); - debug!(basefee = basefee, "basefee updated"); - } else { - warn!("get basefee failed - an error likely occurred"); - } - } - } - } - /// Spawns the simulator task, which handles the setup and sets the deadline /// for the each round of simulation. /// @@ -227,7 +138,7 @@ impl Simulator { /// - `cache`: The simulation cache containing transactions and bundles. /// - `submit_sender`: A channel sender used to submit built blocks. async fn run_simulator( - self, + mut self, constants: SignetSystemConstants, cache: SimCache, submit_sender: mpsc::UnboundedSender, @@ -236,14 +147,16 @@ impl Simulator { let sim_cache = cache.clone(); let finish_by = self.calculate_deadline(); - let block_env = match self.next_block_env(finish_by).await { - Ok(block) => block, - Err(err) => { - error!(err = %err, "failed to configure next block"); - break; - } - }; - info!(block_env = ?block_env, "created block"); + // Wait for the block environment to be set + if self.block_env.changed().await.is_err() { + error!("block_env channel closed"); + return; + } + + // If no env, skip this run + let Some(block_env) = self.block_env.borrow_and_update().clone() else { return }; + + debug!(block_env = ?block_env, "building on block"); match self.handle_build(constants, sim_cache, finish_by, block_env).await { Ok(block) => { @@ -265,13 +178,19 @@ impl Simulator { /// An `Instant` representing the simulation deadline, as calculated by determining /// the time left in the current slot and adding that to the current timestamp in UNIX seconds. pub fn calculate_deadline(&self) -> Instant { - // Calculate the current timestamp in seconds since the UNIX epoch - let now = SystemTime::now(); - let unix_seconds = now.duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs(); - // Calculate the time remaining in the current slot - let remaining = self.slot_calculator.calculate_timepoint_within_slot(unix_seconds); - // Deadline is equal to the start of the next slot plus the time remaining in this slot - Instant::now() + Duration::from_secs(remaining) + // Get the current timepoint within the slot. + let timepoint = self.slot_calculator().current_timepoint_within_slot(); + + // We have the timepoint in seconds into the slot. To find out what's + // remaining, we need to subtract it from the slot duration + let remaining = self.slot_calculator().slot_duration() - timepoint; + + // We add a 1500 ms buffer to account for sequencer stopping signing. + + let candidate = + Instant::now() + Duration::from_secs(remaining) - Duration::from_millis(1500); + + candidate.max(Instant::now()) } /// Creates an `AlloyDB` instance from the rollup provider. @@ -300,106 +219,4 @@ impl Simulator { let wrapped_db: AlloyDatabaseProvider = WrapDatabaseAsync::new(alloy_db).unwrap(); Some(wrapped_db) } - - /// Prepares the next block environment. - /// - /// Prepares the next block environment to load into the simulator by fetching the latest block number, - /// assigning the correct next block number, checking the basefee, and setting the timestamp, - /// reward address, and gas configuration for the block environment based on builder configuration. - /// - /// # Arguments - /// - /// - finish_by: The deadline at which block simulation will end. - async fn next_block_env(&self, finish_by: Instant) -> eyre::Result { - let remaining = finish_by.duration_since(Instant::now()); - let finish_time = SystemTime::now() + remaining; - let deadline: DateTime = finish_time.into(); - debug!(deadline = %deadline, "preparing block env"); - - // Fetch the latest block number and increment it by 1 - let latest_block_number = match self.ru_provider.get_block_number().await { - Ok(num) => num, - Err(err) => { - error!(%err, "RPC error during block build"); - bail!(err) - } - }; - debug!(next_block_num = latest_block_number + 1, "preparing block env"); - - // Fetch the basefee from previous block to calculate gas for this block - let basefee = match self.get_basefee().await? { - Some(basefee) => basefee, - None => { - warn!("get basefee failed - RPC error likely occurred"); - todo!() - } - }; - debug!(basefee = basefee, "setting basefee"); - - // Craft the Block environment to pass to the simulator - let block_env = PecorinoBlockEnv::new( - self.config.clone(), - latest_block_number + 1, - deadline.timestamp() as u64, - basefee, - ); - debug!(block_env = ?block_env, "prepared block env"); - - Ok(block_env) - } - - /// Returns the basefee of the latest block. - /// - /// # Returns - /// - /// The basefee of the previous (latest) block if the request was successful, - /// or a sane default if the RPC failed. - async fn get_basefee(&self) -> eyre::Result> { - let Some(block) = - self.ru_provider.get_block_by_number(Latest).await.wrap_err("basefee error")? - else { - return Ok(None); - }; - - debug!(basefee = ?block.header.base_fee_per_gas, "basefee found"); - Ok(block.header.base_fee_per_gas) - } -} - -/// Continuously updates the simulation cache with incoming transactions and bundles. -/// -/// This function listens for new transactions and bundles on their respective -/// channels and adds them to the simulation cache using the latest observed basefee. -/// -/// # Arguments -/// -/// - `tx_receiver`: A receiver channel for incoming Ethereum transactions. -/// - `bundle_receiver`: A receiver channel for incoming transaction bundles. -/// - `cache`: The simulation cache used to store transactions and bundles. -/// - `price_reader`: An `Arc` providing the latest basefee for simulation pricing. -async fn cache_updater( - mut tx_receiver: mpsc::UnboundedReceiver< - alloy::consensus::EthereumTxEnvelope, - >, - mut bundle_receiver: mpsc::UnboundedReceiver, - cache: SimCache, - price_reader: Arc, -) -> ! { - loop { - let p = price_reader.load(Ordering::Relaxed); - select! { - maybe_tx = tx_receiver.recv() => { - if let Some(tx) = maybe_tx { - debug!(tx = ?tx.hash(), "received transaction"); - cache.add_item(tx, p); - } - } - maybe_bundle = bundle_receiver.recv() => { - if let Some(bundle) = maybe_bundle { - debug!(bundle = ?bundle.id, "received bundle"); - cache.add_item(bundle.bundle, p); - } - } - } - } } diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index b8232b9..e07c151 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -6,30 +6,13 @@ use init4_bin_base::{ }; use oauth2::TokenResponse; use reqwest::{Client, Url}; -use serde::{Deserialize, Serialize}; -use signet_bundle::SignetEthBundle; +use signet_tx_cache::types::{TxCacheBundle, TxCacheBundlesResponse}; use tokio::{ sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, task::JoinHandle, time::{self, Duration}, }; -/// Holds a bundle from the cache with a unique ID and a Zenith bundle. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct Bundle { - /// Cache identifier for the bundle. - pub id: String, - /// The corresponding Signet bundle. - pub bundle: SignetEthBundle, -} - -/// Response from the tx-pool containing a list of bundles. -#[derive(Debug, Clone, Serialize, Deserialize)] -struct TxPoolBundleResponse { - /// Bundle responses are available on the bundles property. - pub bundles: Vec, -} - /// The BundlePoller polls the tx-pool for bundles. #[derive(Debug)] pub struct BundlePoller { @@ -60,7 +43,7 @@ impl BundlePoller { } /// Fetches bundles from the transaction cache and returns them. - pub async fn check_bundle_cache(&mut self) -> eyre::Result> { + pub async fn check_bundle_cache(&mut self) -> eyre::Result> { let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?; let Some(token) = self.token.read() else { warn!("No token available, skipping bundle fetch"); @@ -75,7 +58,7 @@ impl BundlePoller { .error_for_status()? .json() .await - .map(|resp: TxPoolBundleResponse| resp.bundles) + .map(|resp: TxCacheBundlesResponse| resp.bundles) .map_err(Into::into) } @@ -84,7 +67,7 @@ impl BundlePoller { Duration::from_millis(self.poll_interval_ms) } - async fn task_future(mut self, outbound: UnboundedSender) { + async fn task_future(mut self, outbound: UnboundedSender) { loop { let span = debug_span!("BundlePoller::loop", url = %self.config.tx_pool_url); @@ -119,7 +102,7 @@ impl BundlePoller { } /// Spawns a task that sends bundles it finds to its channel sender. - pub fn spawn(self) -> (UnboundedReceiver, JoinHandle<()>) { + pub fn spawn(self) -> (UnboundedReceiver, JoinHandle<()>) { let (outbound, inbound) = unbounded_channel(); let jh = tokio::spawn(self.task_future(outbound)); diff --git a/src/tasks/cache/mod.rs b/src/tasks/cache/mod.rs index e1f0165..541e884 100644 --- a/src/tasks/cache/mod.rs +++ b/src/tasks/cache/mod.rs @@ -5,4 +5,23 @@ mod tx; pub use tx::TxPoller; mod bundle; -pub use bundle::{Bundle, BundlePoller}; +pub use bundle::BundlePoller; + +use signet_sim::SimCache; +use tokio::task::JoinHandle; + +/// Cache tasks for the block builder. +#[derive(Debug)] +pub struct CacheSystem { + /// The cache task. + pub cache_task: JoinHandle<()>, + + /// The transaction poller task. + pub tx_poller: JoinHandle<()>, + + /// The bundle poller task. + pub bundle_poller: JoinHandle<()>, + + /// The sim cache. + pub sim_cache: SimCache, +} diff --git a/src/tasks/cache/task.rs b/src/tasks/cache/task.rs index 2e88a7b..5c4fea5 100644 --- a/src/tasks/cache/task.rs +++ b/src/tasks/cache/task.rs @@ -15,9 +15,6 @@ use trevm::revm::context::BlockEnv; /// the environment changes. #[derive(Debug)] pub struct CacheTask { - /// The shared sim cache to populate. - cache: SimCache, - /// The channel to receive the block environment. env: watch::Receiver>, @@ -28,7 +25,16 @@ pub struct CacheTask { } impl CacheTask { - async fn task_future(mut self) { + /// Create a new cache task with the given cache and channels. + pub const fn new( + env: watch::Receiver>, + bundles: mpsc::UnboundedReceiver, + txns: mpsc::UnboundedReceiver, + ) -> Self { + Self { env, bundles, txns } + } + + async fn task_future(mut self, cache: SimCache) { loop { let mut basefee = 0; tokio::select! { @@ -41,24 +47,26 @@ impl CacheTask { if let Some(env) = self.env.borrow_and_update().as_ref() { basefee = env.basefee; info!(basefee, number = env.number, timestamp = env.timestamp, "block env changed, clearing cache"); - self.cache.clean( + cache.clean( env.number, env.timestamp ); } } Some(bundle) = self.bundles.recv() => { - self.cache.add_item(bundle.bundle, basefee); + cache.add_item(bundle.bundle, basefee); } Some(txn) = self.txns.recv() => { - self.cache.add_item(txn, basefee); + cache.add_item(txn, basefee); } } } } /// Spawn the cache task. - pub fn spawn(self) -> JoinHandle<()> { - let fut = self.task_future(); - tokio::spawn(fut) + pub fn spawn(self) -> (SimCache, JoinHandle<()>) { + let sim_cache = SimCache::default(); + let c = sim_cache.clone(); + let fut = self.task_future(sim_cache); + (c, tokio::spawn(fut)) } } diff --git a/src/tasks/env.rs b/src/tasks/env.rs index 75cf4d4..448d2c8 100644 --- a/src/tasks/env.rs +++ b/src/tasks/env.rs @@ -7,7 +7,7 @@ use alloy::{ }; use init4_bin_base::deps::tracing::{self, Instrument, debug, error, info_span}; use std::time::Duration; -use tokio::sync::watch; +use tokio::{sync::watch, task::JoinHandle}; use tokio_stream::StreamExt; use trevm::revm::{context::BlockEnv, context_interface::block::BlobExcessGasAndPrice}; @@ -25,7 +25,7 @@ impl EnvTask { } /// Construct a BlockEnv by making calls to the provider. - pub fn construct_block_env(&self, previous: &Header) -> BlockEnv { + fn construct_block_env(&self, previous: &Header) -> BlockEnv { BlockEnv { number: previous.number + 1, beneficiary: self.config.builder_rewards_address, @@ -45,7 +45,7 @@ impl EnvTask { } /// Construct the BlockEnv and send it to the sender. - pub async fn task_fut(self, sender: watch::Sender>) { + async fn task_fut(self, sender: watch::Sender>) { let span = info_span!("EnvTask::task_fut::init"); let mut poller = match self.provider.watch_blocks().instrument(span.clone()).await { Ok(poller) => poller, @@ -96,22 +96,24 @@ impl EnvTask { } }; span.record("number", previous.number); + debug!("retrieved latest block"); let env = self.construct_block_env(&previous); debug!(?env, "constructed block env"); if sender.send(Some(env)).is_err() { // The receiver has been dropped, so we can stop the task. + debug!("receiver dropped, stopping task"); break; } } } /// Spawn the task and return a watch::Receiver for the BlockEnv. - pub fn spawn(self) -> watch::Receiver> { + pub fn spawn(self) -> (watch::Receiver>, JoinHandle<()>) { let (sender, receiver) = watch::channel(None); let fut = self.task_fut(sender); - tokio::spawn(fut); + let jh = tokio::spawn(fut); - receiver + (receiver, jh) } } diff --git a/src/tasks/metrics.rs b/src/tasks/metrics.rs index c940717..8d099ec 100644 --- a/src/tasks/metrics.rs +++ b/src/tasks/metrics.rs @@ -18,6 +18,11 @@ pub struct MetricsTask { } impl MetricsTask { + /// Create a new MetricsTask with the given provider + pub const fn new(host_provider: HostProvider) -> Self { + Self { host_provider } + } + /// Given a transaction hash, record metrics on the result of the /// transaction mining pub fn log_tx(&self, tx_hash: TxHash) -> impl Future + use<> { diff --git a/src/test_utils.rs b/src/test_utils.rs index 63e2ee6..e7d81cd 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -1,11 +1,10 @@ //! Test utilities for testing builder tasks -use crate::{config::BuilderConfig, tasks::block::cfg::PecorinoBlockEnv}; +use crate::config::BuilderConfig; use alloy::{ consensus::{SignableTransaction, TxEip1559, TxEnvelope}, - primitives::{Address, FixedBytes, TxKind, U256}, + primitives::{Address, B256, TxKind, U256}, signers::{SignerSync, local::PrivateKeySigner}, }; -use chrono::{DateTime, Utc}; use eyre::Result; use init4_bin_base::{ deps::tracing_subscriber::{ @@ -14,10 +13,8 @@ use init4_bin_base::{ perms::OAuthConfig, utils::calc::SlotCalculator, }; -use std::{ - str::FromStr, - time::{Instant, SystemTime}, -}; +use std::str::FromStr; +use trevm::revm::{context::BlockEnv, context_interface::block::BlobExcessGasAndPrice}; /// Sets up a block builder with test values pub fn setup_test_config() -> Result { @@ -90,18 +87,19 @@ pub fn test_block_env( config: BuilderConfig, number: u64, basefee: u64, - finish_by: Instant, -) -> PecorinoBlockEnv { - let remaining = finish_by.duration_since(Instant::now()); - let finish_time = SystemTime::now() + remaining; - let deadline: DateTime = finish_time.into(); - - PecorinoBlockEnv { + timestamp: u64, +) -> BlockEnv { + BlockEnv { number, - beneficiary: Address::repeat_byte(0), - timestamp: deadline.timestamp() as u64, + beneficiary: Address::repeat_byte(1), + timestamp, gas_limit: config.rollup_block_gas_limit, basefee, - prevrandao: Some(FixedBytes::random()), + difficulty: U256::ZERO, + prevrandao: Some(B256::random()), + blob_excess_gas_and_price: Some(BlobExcessGasAndPrice { + excess_blob_gas: 0, + blob_gasprice: 0, + }), } } diff --git a/tests/block_builder_test.rs b/tests/block_builder_test.rs index 93d2038..a88869b 100644 --- a/tests/block_builder_test.rs +++ b/tests/block_builder_test.rs @@ -1,134 +1,130 @@ //! Tests for the block building task. -#[cfg(test)] -mod tests { - use alloy::{ - network::Ethereum, - node_bindings::Anvil, - primitives::U256, - providers::{Provider, RootProvider}, - signers::local::PrivateKeySigner, - }; - use builder::{ - tasks::block::sim::Simulator, - test_utils::{new_signed_tx, setup_logging, setup_test_config, test_block_env}, - }; - use init4_bin_base::utils::calc::SlotCalculator; - use signet_sim::{SimCache, SimItem}; - use signet_types::constants::SignetSystemConstants; - use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; - use tokio::{sync::mpsc::unbounded_channel, time::timeout}; - - /// Tests the `handle_build` method of the `Simulator`. - /// - /// This test sets up a simulated environment using Anvil, creates a block builder, - /// and verifies that the block builder can successfully build a block containing - /// transactions from multiple senders. - #[cfg(feature = "integration")] - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn test_handle_build() { - setup_logging(); - - // Make a test config - let config = setup_test_config().unwrap(); - let constants = SignetSystemConstants::pecorino(); - - // Create an anvil instance for testing - let anvil_instance = Anvil::new().chain_id(signet_constants::pecorino::RU_CHAIN_ID).spawn(); - - // Create a wallet - let keys = anvil_instance.keys(); - let test_key_0 = PrivateKeySigner::from_signing_key(keys[0].clone().into()); - let test_key_1 = PrivateKeySigner::from_signing_key(keys[1].clone().into()); - - // Create a rollup provider - let ru_provider = RootProvider::::new_http(anvil_instance.endpoint_url()); - - // Create a block builder with a slot calculator for testing - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Clock may have gone backwards") - .as_secs(); - - let slot_calculator = SlotCalculator::new(now, 0, 12); - let block_builder = Simulator::new(&config, ru_provider.clone(), slot_calculator); - - // Setup a sim cache - let sim_items = SimCache::new(); - - // Add two transactions from two senders to the sim cache - let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap(); - sim_items.add_item(SimItem::Tx(tx_1), 0); - - let tx_2 = new_signed_tx(&test_key_1, 0, U256::from(2_f64), 10_000).unwrap(); - sim_items.add_item(SimItem::Tx(tx_2), 0); - - // Setup the block env - let finish_by = Instant::now() + Duration::from_secs(2); - let block_number = ru_provider.get_block_number().await.unwrap(); - let block_env = test_block_env(config, block_number, 7, finish_by); - - // Spawn the block builder task - let got = block_builder.handle_build(constants, sim_items, finish_by, block_env).await; - - // Assert on the built block - assert!(got.is_ok()); - assert!(got.unwrap().tx_count() == 2); - } - - /// Tests the full block builder loop, including transaction ingestion and block simulation. - /// - /// This test sets up a simulated environment using Anvil, creates a block builder, - /// and verifies that the builder can process incoming transactions and produce a block - /// within a specified timeout. - #[ignore = "integration test"] - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn test_spawn() { - setup_logging(); - - // Make a test config - let config = setup_test_config().unwrap(); - let constants = SignetSystemConstants::pecorino(); - - // Create an anvil instance for testing - let anvil_instance = Anvil::new().chain_id(signet_constants::pecorino::RU_CHAIN_ID).spawn(); - - // Create a wallet - let keys = anvil_instance.keys(); - let test_key_0 = PrivateKeySigner::from_signing_key(keys[0].clone().into()); - let test_key_1 = PrivateKeySigner::from_signing_key(keys[1].clone().into()); - - // Plumb inputs for the test setup - let (tx_sender, tx_receiver) = unbounded_channel(); - let (_, bundle_receiver) = unbounded_channel(); - let (block_sender, mut block_receiver) = unbounded_channel(); - - // Create a rollup provider - let ru_provider = RootProvider::::new_http(anvil_instance.endpoint_url()); - - let sim = Simulator::new(&config, ru_provider.clone(), config.slot_calculator); - - // Create a shared sim cache - let sim_cache = SimCache::new(); - - // Create a sim cache and start filling it with items - sim.spawn_cache_tasks(tx_receiver, bundle_receiver, sim_cache.clone()); - - // Finally, Kick off the block builder task. - sim.spawn_simulator_task(constants, sim_cache.clone(), block_sender); - - // Feed in transactions to the tx_sender and wait for the block to be simulated - let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap(); - let tx_2 = new_signed_tx(&test_key_1, 0, U256::from(2_f64), 10_000).unwrap(); - tx_sender.send(tx_1).unwrap(); - tx_sender.send(tx_2).unwrap(); - - // Wait for a block with timeout - let result = timeout(Duration::from_secs(5), block_receiver.recv()).await; - assert!(result.is_ok(), "Did not receive block within 5 seconds"); - - // Assert on the block - let block = result.unwrap(); - assert!(block.is_some(), "Block channel closed without receiving a block"); - assert!(block.unwrap().tx_count() == 2); // TODO: Why is this failing? I'm seeing EVM errors but haven't tracked them down yet. - } + +use alloy::{ + network::Ethereum, + node_bindings::Anvil, + primitives::U256, + providers::{Provider, RootProvider}, + signers::local::PrivateKeySigner, +}; +use builder::{ + tasks::{block::sim::Simulator, cache::CacheTask}, + test_utils::{new_signed_tx, setup_logging, setup_test_config, test_block_env}, +}; +use signet_sim::{SimCache, SimItem}; +use signet_types::constants::SignetSystemConstants; +use std::time::{Duration, Instant}; +use tokio::{sync::mpsc::unbounded_channel, time::timeout}; + +/// Tests the `handle_build` method of the `Simulator`. +/// +/// This test sets up a simulated environment using Anvil, creates a block builder, +/// and verifies that the block builder can successfully build a block containing +/// transactions from multiple senders. +#[ignore = "integration test"] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_handle_build() { + use alloy::eips::BlockId; + + setup_logging(); + + // Make a test config + let config = setup_test_config().unwrap(); + let constants = SignetSystemConstants::pecorino(); + + // Create an anvil instance for testing + let anvil_instance = Anvil::new().chain_id(signet_constants::pecorino::RU_CHAIN_ID).spawn(); + + // Create a wallet + let keys = anvil_instance.keys(); + let test_key_0 = PrivateKeySigner::from_signing_key(keys[0].clone().into()); + let test_key_1 = PrivateKeySigner::from_signing_key(keys[1].clone().into()); + + // Create a rollup provider + let ru_provider = RootProvider::::new_http(anvil_instance.endpoint_url()); + + let block_env = config.env_task().spawn().0; + + let block_builder = Simulator::new(&config, ru_provider.clone(), block_env); + + // Setup a sim cache + let sim_items = SimCache::new(); + + // Add two transactions from two senders to the sim cache + let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap(); + sim_items.add_item(SimItem::Tx(tx_1), 0); + + let tx_2 = new_signed_tx(&test_key_1, 0, U256::from(2_f64), 10_000).unwrap(); + sim_items.add_item(SimItem::Tx(tx_2), 0); + + // Setup the block env + let finish_by = Instant::now() + Duration::from_secs(2); + let header = ru_provider.get_block(BlockId::latest()).await.unwrap().unwrap().header.inner; + let number = header.number + 1; + let timestamp = header.timestamp + config.slot_calculator.slot_duration(); + let block_env = test_block_env(config, number, 7, timestamp); + + // Spawn the block builder task + let got = block_builder.handle_build(constants, sim_items, finish_by, block_env).await; + + // Assert on the built block + assert!(got.is_ok()); + assert!(got.unwrap().tx_count() == 2); +} + +/// Tests the full block builder loop, including transaction ingestion and block simulation. +/// +/// This test sets up a simulated environment using Anvil, creates a block builder, +/// and verifies that the builder can process incoming transactions and produce a block +/// within a specified timeout. +#[ignore = "integration test"] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_spawn() { + setup_logging(); + + // Make a test config + let config = setup_test_config().unwrap(); + let constants = SignetSystemConstants::pecorino(); + + // Create an anvil instance for testing + let anvil_instance = Anvil::new().chain_id(signet_constants::pecorino::RU_CHAIN_ID).spawn(); + + // Create a wallet + let keys = anvil_instance.keys(); + let test_key_0 = PrivateKeySigner::from_signing_key(keys[0].clone().into()); + let test_key_1 = PrivateKeySigner::from_signing_key(keys[1].clone().into()); + + // Plumb inputs for the test setup + let (tx_sender, tx_receiver) = unbounded_channel(); + let (_, bundle_receiver) = unbounded_channel(); + let (block_sender, mut block_receiver) = unbounded_channel(); + + let env_task = config.env_task(); + let (block_env, _env_jh) = env_task.spawn(); + + let cache_task = CacheTask::new(block_env.clone(), bundle_receiver, tx_receiver); + let (sim_cache, _cache_jh) = cache_task.spawn(); + + // Create a rollup provider + let ru_provider = RootProvider::::new_http(anvil_instance.endpoint_url()); + + let sim = Simulator::new(&config, ru_provider.clone(), block_env); + + // Finally, Kick off the block builder task. + sim.spawn_simulator_task(constants, sim_cache.clone(), block_sender); + + // Feed in transactions to the tx_sender and wait for the block to be simulated + let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap(); + let tx_2 = new_signed_tx(&test_key_1, 0, U256::from(2_f64), 10_000).unwrap(); + tx_sender.send(tx_1).unwrap(); + tx_sender.send(tx_2).unwrap(); + + // Wait for a block with timeout + let result = timeout(Duration::from_secs(5), block_receiver.recv()).await; + assert!(result.is_ok(), "Did not receive block within 5 seconds"); + + // Assert on the block + let block = result.unwrap(); + assert!(block.is_some(), "Block channel closed without receiving a block"); + assert!(block.unwrap().tx_count() == 2); // TODO: Why is this failing? I'm seeing EVM errors but haven't tracked them down yet. } diff --git a/tests/bundle_poller_test.rs b/tests/bundle_poller_test.rs index ae051d0..2e7a811 100644 --- a/tests/bundle_poller_test.rs +++ b/tests/bundle_poller_test.rs @@ -1,17 +1,15 @@ -mod tests { - use builder::test_utils; - use eyre::Result; +use builder::test_utils; +use eyre::Result; - #[ignore = "integration test"] - #[tokio::test] - async fn test_bundle_poller_roundtrip() -> Result<()> { - let config = test_utils::setup_test_config().unwrap(); - let token = config.oauth_token(); +#[ignore = "integration test"] +#[tokio::test] +async fn test_bundle_poller_roundtrip() -> Result<()> { + let config = test_utils::setup_test_config().unwrap(); + let token = config.oauth_token(); - let mut bundle_poller = builder::tasks::cache::BundlePoller::new(&config, token); + let mut bundle_poller = builder::tasks::cache::BundlePoller::new(&config, token); - let _ = bundle_poller.check_bundle_cache().await?; + let _ = bundle_poller.check_bundle_cache().await?; - Ok(()) - } + Ok(()) } diff --git a/tests/cache.rs b/tests/cache.rs new file mode 100644 index 0000000..bd80f19 --- /dev/null +++ b/tests/cache.rs @@ -0,0 +1,20 @@ +use builder::test_utils::{setup_logging, setup_test_config}; +use init4_bin_base::deps::tracing::warn; +use std::time::Duration; + +#[ignore = "integration test. This test will take >12 seconds to run, and requires Authz configuration env vars."] +#[tokio::test] +async fn test_bundle_poller_roundtrip() -> eyre::Result<()> { + setup_logging(); + + let config = setup_test_config().unwrap(); + + let (block_env, _jh) = config.env_task().spawn(); + let cache = config.spawn_cache_system(block_env); + + tokio::time::sleep(Duration::from_secs(12)).await; + + warn!(txns = ?cache.sim_cache.read_best(5)); + + Ok(()) +} diff --git a/tests/env.rs b/tests/env.rs new file mode 100644 index 0000000..adebe43 --- /dev/null +++ b/tests/env.rs @@ -0,0 +1,15 @@ +use builder::test_utils::{setup_logging, setup_test_config}; + +#[ignore = "integration test. This test will take between 0 and 12 seconds to run."] +#[tokio::test] +async fn test_bundle_poller_roundtrip() { + setup_logging(); + + let config = setup_test_config().unwrap(); + let env_task = config.env_task(); + let (mut env_watcher, _jh) = env_task.spawn(); + + env_watcher.changed().await.unwrap(); + let env = env_watcher.borrow_and_update(); + assert!(env.as_ref().is_some(), "Env should be Some"); +} diff --git a/tests/tx_poller_test.rs b/tests/tx_poller_test.rs index c0f5ffe..e48338f 100644 --- a/tests/tx_poller_test.rs +++ b/tests/tx_poller_test.rs @@ -1,50 +1,48 @@ -mod tests { - use alloy::{primitives::U256, signers::local::PrivateKeySigner}; - use builder::{ - config::BuilderConfig, - tasks::cache::TxPoller, - test_utils::{new_signed_tx, setup_logging, setup_test_config}, - }; - // Import the refactored function - use eyre::{Ok, Result}; +use alloy::{primitives::U256, signers::local::PrivateKeySigner}; +use builder::{ + config::BuilderConfig, + tasks::cache::TxPoller, + test_utils::{new_signed_tx, setup_logging, setup_test_config}, +}; +// Import the refactored function +use eyre::{Ok, Result}; - #[ignore = "integration test"] - #[tokio::test] - async fn test_tx_roundtrip() -> Result<()> { - setup_logging(); +#[ignore = "integration test"] +#[tokio::test] +async fn test_tx_roundtrip() -> Result<()> { + setup_logging(); - // Create a new test environment - let config = setup_test_config()?; + // Create a new test environment + let config = setup_test_config()?; - // Post a transaction to the cache - post_tx(&config).await?; + // Post a transaction to the cache + post_tx(&config).await?; - // Create a new poller - let mut poller = TxPoller::new(&config); + // Create a new poller + let mut poller = TxPoller::new(&config); - // Fetch transactions the pool - let transactions = poller.check_tx_cache().await?; + // Fetch transactions the pool + let transactions = poller.check_tx_cache().await?; - // Ensure at least one transaction exists - assert!(!transactions.is_empty()); + // Ensure at least one transaction exists + assert!(!transactions.is_empty()); - Ok(()) - } - - async fn post_tx(config: &BuilderConfig) -> Result<()> { - let client = reqwest::Client::new(); + Ok(()) +} - let wallet = PrivateKeySigner::random(); - let tx_envelope = new_signed_tx(&wallet, 1, U256::from(1), 10_000)?; +async fn post_tx(config: &BuilderConfig) -> Result<()> { + let client = reqwest::Client::new(); - let url = format!("{}/transactions", config.tx_pool_url); - let response = client.post(&url).json(&tx_envelope).send().await?; + let wallet = PrivateKeySigner::random(); + let tx_envelope = new_signed_tx(&wallet, 1, U256::from(1), 10_000)?; - if !response.status().is_success() { - let error_text = response.text().await?; - eyre::bail!("Failed to post transaction: {}", error_text); - } + let url = format!("{}/transactions", config.tx_pool_url); + let response = client.post(&url).json(&tx_envelope).send().await?; - Ok(()) + if !response.status().is_success() { + let error_text = response.text().await?; + eyre::bail!("Failed to post transaction: {}", error_text); } + + Ok(()) }