Skip to content

refactor: integrate env and cache tasks #92

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 26, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -27,7 +27,6 @@ integration = []
[dependencies]
init4-bin-base = { version = "0.3.4", features = ["perms"] }

signet-bundle = { git = "https://github.com/init4tech/signet-sdk", rev = "b8251ff0fec7cb14ca87e6f95c14f56bc2593049" }
signet-constants = { git = "https://github.com/init4tech/signet-sdk", rev = "b8251ff0fec7cb14ca87e6f95c14f56bc2593049" }
signet-sim = { git = "https://github.com/init4tech/signet-sdk", rev = "b8251ff0fec7cb14ca87e6f95c14f56bc2593049" }
signet-tx-cache = { git = "https://github.com/init4tech/signet-sdk", rev = "b8251ff0fec7cb14ca87e6f95c14f56bc2593049" }
@@ -56,6 +55,5 @@ serde_json = "1.0"
tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] }

oauth2 = "5"
chrono = "0.4.41"
tokio-stream = "0.1.17"
url = "2.5.4"
78 changes: 37 additions & 41 deletions bin/builder.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
use builder::{
config::BuilderConfig,
service::serve_builder,
tasks::{
block::sim::Simulator,
cache::{BundlePoller, TxPoller},
metrics::MetricsTask,
submit::SubmitTask,
},
tasks::{block::sim::Simulator, metrics::MetricsTask, submit::SubmitTask},
};
use init4_bin_base::{
deps::tracing::{info, info_span},
utils::from_env::FromEnv,
};
use init4_bin_base::{deps::tracing, utils::from_env::FromEnv};
use signet_sim::SimCache;
use signet_types::constants::SignetSystemConstants;
use tokio::select;
use tracing::info_span;

// Note: Must be set to `multi_thread` to support async tasks.
// See: https://docs.rs/tokio/latest/tokio/attr.main.html
@@ -21,74 +17,74 @@ async fn main() -> eyre::Result<()> {
let _guard = init4_bin_base::init4();
let init_span_guard = info_span!("builder initialization");

// Pull the configuration from the environment
let config = BuilderConfig::from_env()?.clone();
let constants = SignetSystemConstants::pecorino();
let token = config.oauth_token();

// Spawn the EnvTask
let env_task = config.env_task();
let (block_env, env_jh) = env_task.spawn();

// Spawn the cache system
let cache_system = config.spawn_cache_system(block_env.clone());

// Prep providers and contracts
let (host_provider, quincey) =
tokio::try_join!(config.connect_host_provider(), config.connect_quincey())?;
let ru_provider = config.connect_ru_provider();

let zenith = config.connect_zenith(host_provider.clone());

// Set up the metrics task
let metrics = MetricsTask { host_provider };
let (tx_channel, metrics_jh) = metrics.spawn();

// Make a Tx submission task
let submit =
SubmitTask { zenith, quincey, config: config.clone(), outbound_tx_channel: tx_channel };

let tx_poller = TxPoller::new(&config);
let (tx_receiver, tx_poller_jh) = tx_poller.spawn();

let bundle_poller = BundlePoller::new(&config, token);
let (bundle_receiver, bundle_poller_jh) = bundle_poller.spawn();

// Set up tx submission
let (submit_channel, submit_jh) = submit.spawn();

let sim_items = SimCache::new();
let slot_calculator = config.slot_calculator;

let sim = Simulator::new(&config, ru_provider.clone(), slot_calculator);

let (basefee_jh, sim_cache_jh) =
sim.spawn_cache_tasks(tx_receiver, bundle_receiver, sim_items.clone());

let build_jh = sim.spawn_simulator_task(constants, sim_items.clone(), submit_channel);
// Set up the simulator
let sim = Simulator::new(&config, ru_provider.clone(), block_env);
let build_jh = sim.spawn_simulator_task(constants, cache_system.sim_cache, submit_channel);

// Start the healthcheck server
let server = serve_builder(([0, 0, 0, 0], config.builder_port));

// We have finished initializing the builder, so we can drop the init span
// guard.
drop(init_span_guard);

select! {
_ = tx_poller_jh => {
tracing::info!("tx_poller finished");

_ = env_jh => {
info!("env task finished");
},
_ = bundle_poller_jh => {
tracing::info!("bundle_poller finished");
_ = cache_system.cache_task => {
info!("cache task finished");
},
_ = cache_system.tx_poller => {
info!("tx_poller finished");
},
_ = cache_system.bundle_poller => {
info!("bundle_poller finished");
},
_ = sim_cache_jh => {
tracing::info!("sim cache task finished");
}
_ = basefee_jh => {
tracing::info!("basefee task finished");
}
_ = submit_jh => {
tracing::info!("submit finished");
info!("submit finished");
},
_ = metrics_jh => {
tracing::info!("metrics finished");
info!("metrics finished");
},
_ = build_jh => {
tracing::info!("build finished");
info!("build finished");
}
_ = server => {
tracing::info!("server finished");
info!("server finished");
}
}

tracing::info!("shutting down");
info!("shutting down");

Ok(())
}
54 changes: 50 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
use crate::quincey::Quincey;
use crate::{
quincey::Quincey,
tasks::{
block::cfg::SignetCfgEnv,
cache::{BundlePoller, CacheSystem, CacheTask, TxPoller},
env::EnvTask,
},
};
use alloy::{
network::{Ethereum, EthereumWallet},
primitives::Address,
@@ -21,6 +28,8 @@ use init4_bin_base::{
};
use signet_zenith::Zenith;
use std::borrow::Cow;
use tokio::sync::watch;
use trevm::revm::context::BlockEnv;

/// Type alias for the provider used to simulate against rollup state.
pub type RuProvider = RootProvider<Ethereum>;
@@ -132,7 +141,7 @@ pub struct BuilderConfig {
)]
pub tx_pool_cache_duration: u64,

/// Oauth2 configuration for the builder to connect to ini4 services.
/// Oauth2 configuration for the builder to connect to init4 services.
pub oauth: OAuthConfig,

/// The max number of simultaneous block simulations to run.
@@ -166,8 +175,13 @@ impl BuilderConfig {

/// Connect to the Rollup rpc provider.
pub fn connect_ru_provider(&self) -> RootProvider<Ethereum> {
let url = url::Url::parse(&self.ru_rpc_url).expect("failed to parse URL");
RootProvider::<Ethereum>::new_http(url)
static ONCE: std::sync::OnceLock<RootProvider<Ethereum>> = std::sync::OnceLock::new();

ONCE.get_or_init(|| {
let url = url::Url::parse(&self.ru_rpc_url).expect("failed to parse URL");
RootProvider::new_http(url)
})
.clone()
}

/// Connect to the Host rpc provider.
@@ -222,4 +236,36 @@ impl BuilderConfig {

Ok(Quincey::new_remote(client, url, token))
}

/// Create an [`EnvTask`] using this config.
pub fn env_task(&self) -> EnvTask {
let provider = self.connect_ru_provider();
EnvTask::new(self.clone(), provider)
}

/// Spawn a new [`CacheSystem`] using this config. This contains the
/// joinhandles for [`TxPoller`] and [`BundlePoller`] and [`CacheTask`], as
/// well as the [`SimCache`] and the block env watcher.
///
/// [`SimCache`]: signet_sim::SimCache
pub fn spawn_cache_system(&self, block_env: watch::Receiver<Option<BlockEnv>>) -> CacheSystem {
// Tx Poller pulls transactions from the cache
let tx_poller = TxPoller::new(self);
let (tx_receiver, tx_poller) = tx_poller.spawn();

// Bundle Poller pulls bundles from the cache
let bundle_poller = BundlePoller::new(self, self.oauth_token());
let (bundle_receiver, bundle_poller) = bundle_poller.spawn();

// Set up the cache task
let cache_task = CacheTask::new(block_env.clone(), bundle_receiver, tx_receiver);
let (sim_cache, cache_task) = cache_task.spawn();

CacheSystem { cache_task, tx_poller, bundle_poller, sim_cache }
}

/// Create a [`SignetCfgEnv`] using this config.
pub const fn cfg_env(&self) -> SignetCfgEnv {
SignetCfgEnv { chain_id: self.ru_chain_id }
}
}
90 changes: 7 additions & 83 deletions src/tasks/block/cfg.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
//! This file implements the [`trevm::Cfg`] and [`trevm::Block`] traits for Pecorino blocks.
use alloy::primitives::{Address, B256, FixedBytes, U256};
use trevm::{
Block,
revm::{
context::{BlockEnv, CfgEnv},
context_interface::block::BlobExcessGasAndPrice,
primitives::hardfork::SpecId,
},
};

use crate::config::BuilderConfig;
use trevm::revm::{context::CfgEnv, primitives::hardfork::SpecId};

/// PecorinoCfg holds network-level configuration values.
#[derive(Debug, Clone, Copy)]
pub struct PecorinoCfg {}
pub struct SignetCfgEnv {
/// The chain ID.
pub chain_id: u64,
}

impl trevm::Cfg for PecorinoCfg {
impl trevm::Cfg for SignetCfgEnv {
/// Fills the configuration environment with Pecorino-specific values.
///
/// # Arguments
@@ -24,76 +17,7 @@ impl trevm::Cfg for PecorinoCfg {
fn fill_cfg_env(&self, cfg_env: &mut CfgEnv) {
let CfgEnv { chain_id, spec, .. } = cfg_env;

*chain_id = signet_constants::pecorino::RU_CHAIN_ID;
*chain_id = self.chain_id;
*spec = SpecId::default();
}
}

/// PecorinoBlockEnv holds block-level configurations for Pecorino blocks.
#[derive(Debug, Clone, Copy)]
pub struct PecorinoBlockEnv {
/// The block number for this block.
pub number: u64,
/// The address the block reward should be sent to.
pub beneficiary: Address,
/// Timestamp for the block.
pub timestamp: u64,
/// The gas limit for this block environment.
pub gas_limit: u64,
/// The basefee to use for calculating gas usage.
pub basefee: u64,
/// The prevrandao to use for this block.
pub prevrandao: Option<FixedBytes<32>>,
}

/// Implements [`trevm::Block`] for the Pecorino block.
impl Block for PecorinoBlockEnv {
/// Fills the block environment with the Pecorino specific values
fn fill_block_env(&self, block_env: &mut BlockEnv) {
// Destructure the fields off of the block_env and modify them
let BlockEnv {
number,
beneficiary,
timestamp,
gas_limit,
basefee,
difficulty,
prevrandao,
blob_excess_gas_and_price,
} = block_env;
*number = self.number;
*beneficiary = self.beneficiary;
*timestamp = self.timestamp;
*gas_limit = self.gas_limit;
*basefee = self.basefee;
*prevrandao = self.prevrandao;

// NB: The following fields are set to sane defaults because they
// are not supported by the rollup
*difficulty = U256::ZERO;
*blob_excess_gas_and_price =
Some(BlobExcessGasAndPrice { excess_blob_gas: 0, blob_gasprice: 0 });
}
}

impl PecorinoBlockEnv {
/// Returns a new PecorinoBlockEnv with the specified values.
///
/// # Arguments
///
/// - config: The BuilderConfig for the builder.
/// - number: The block number of this block, usually the latest block number plus 1,
/// unless simulating blocks in the past.
/// - timestamp: The timestamp of the block, typically set to the deadline of the
/// block building task.
pub fn new(config: BuilderConfig, number: u64, timestamp: u64, basefee: u64) -> Self {
PecorinoBlockEnv {
number,
beneficiary: config.builder_rewards_address,
timestamp,
gas_limit: config.rollup_block_gas_limit,
basefee,
prevrandao: Some(B256::random()),
}
}
}
275 changes: 46 additions & 229 deletions src/tasks/block/sim.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,30 @@
//! `block.rs` contains the Simulator and everything that wires it into an
//! actor that handles the simulation of a stream of bundles and transactions
//! and turns them into valid Pecorino blocks for network submission.
use super::cfg::PecorinoBlockEnv;
use crate::{
config::{BuilderConfig, RuProvider},
tasks::{block::cfg::PecorinoCfg, cache::Bundle},
};
use alloy::{
consensus::TxEnvelope,
eips::{BlockId, BlockNumberOrTag::Latest},
network::Ethereum,
providers::Provider,
};
use chrono::{DateTime, Utc};
use eyre::{Context, bail};
use crate::config::{BuilderConfig, RuProvider};
use alloy::{eips::BlockId, network::Ethereum, providers::Provider};
use init4_bin_base::{
deps::tracing::{debug, error, info, warn},
deps::tracing::{debug, error},
utils::calc::SlotCalculator,
};
use signet_sim::{BlockBuild, BuiltBlock, SimCache};
use signet_types::constants::SignetSystemConstants;
use std::{
use std::time::{Duration, Instant};
use tokio::{
sync::{
Arc,
atomic::{AtomicU64, Ordering},
mpsc::{self},
watch,
},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use tokio::{
select,
sync::mpsc::{self},
task::JoinHandle,
time::sleep,
};
use trevm::revm::{
context::BlockEnv,
database::{AlloyDB, WrapDatabaseAsync},
inspector::NoOpInspector,
};

type AlloyDatabaseProvider = WrapDatabaseAsync<AlloyDB<Ethereum, RuProvider>>;

/// `Simulator` is responsible for periodically building blocks and submitting them for
/// signing and inclusion in the blockchain. It wraps a rollup provider and a slot
/// calculator with a builder configuration.
@@ -47,11 +34,10 @@ pub struct Simulator {
pub config: BuilderConfig,
/// A provider that cannot sign transactions, used for interacting with the rollup.
pub ru_provider: RuProvider,
/// The slot calculator for determining when to wake up and build blocks.
pub slot_calculator: SlotCalculator,
}

type AlloyDatabaseProvider = WrapDatabaseAsync<AlloyDB<Ethereum, RuProvider>>;
/// The block configuration environment on which to simulate
pub block_env: watch::Receiver<Option<BlockEnv>>,
}

impl Simulator {
/// Creates a new `Simulator` instance.
@@ -60,17 +46,21 @@ impl Simulator {
///
/// - `config`: The configuration for the builder.
/// - `ru_provider`: A provider for interacting with the rollup.
/// - `slot_calculator`: A slot calculator for managing block timing.
///
/// # Returns
///
/// A new `Simulator` instance.
pub fn new(
config: &BuilderConfig,
ru_provider: RuProvider,
slot_calculator: SlotCalculator,
block_env: watch::Receiver<Option<BlockEnv>>,
) -> Self {
Self { config: config.clone(), ru_provider, slot_calculator }
Self { config: config.clone(), ru_provider, block_env }
}

/// Get the slot calculator.
pub const fn slot_calculator(&self) -> &SlotCalculator {
&self.config.slot_calculator
}

/// Handles building a single block.
@@ -89,14 +79,14 @@ impl Simulator {
constants: SignetSystemConstants,
sim_items: SimCache,
finish_by: Instant,
block: PecorinoBlockEnv,
block: BlockEnv,
) -> eyre::Result<BuiltBlock> {
let db = self.create_db().await.unwrap();

let block_build: BlockBuild<_, NoOpInspector> = BlockBuild::new(
db,
constants,
PecorinoCfg {},
self.config.cfg_env(),
block,
finish_by,
self.config.concurrency_limit,
@@ -110,85 +100,6 @@ impl Simulator {
Ok(block)
}

/// Spawns two tasks: one to handle incoming transactions and bundles,
/// adding them to the simulation cache, and one to track the latest basefee.
///
/// # Arguments
///
/// - `tx_receiver`: A channel receiver for incoming transactions.
/// - `bundle_receiver`: A channel receiver for incoming bundles.
/// - `cache`: The simulation cache to store the received items.
///
/// # Returns
///
/// A `JoinHandle` for the basefee updater and a `JoinHandle` for the
/// cache handler.
pub fn spawn_cache_tasks(
&self,
tx_receiver: mpsc::UnboundedReceiver<TxEnvelope>,
bundle_receiver: mpsc::UnboundedReceiver<Bundle>,
cache: SimCache,
) -> (JoinHandle<()>, JoinHandle<()>) {
debug!("starting up cache handler");

let basefee_price = Arc::new(AtomicU64::new(0_u64));
let basefee_reader = Arc::clone(&basefee_price);
let fut = self.basefee_updater_fut(basefee_price);

// Update the basefee on a per-block cadence
let basefee_jh = tokio::spawn(fut);

// Update the sim cache whenever a transaction or bundle is received with respect to the basefee
let cache_jh = tokio::spawn(async move {
cache_updater(tx_receiver, bundle_receiver, cache, basefee_reader).await
});

(basefee_jh, cache_jh)
}

/// Periodically updates the shared basefee by querying the latest block.
///
/// This function calculates the remaining time until the next slot,
/// sleeps until that time, and then retrieves the latest basefee from the rollup provider.
/// The updated basefee is stored in the provided `AtomicU64`.
///
/// This function runs continuously.
///
/// # Arguments
///
/// - `price`: A shared `Arc<AtomicU64>` used to store the updated basefee value.
fn basefee_updater_fut(&self, price: Arc<AtomicU64>) -> impl Future<Output = ()> + use<> {
let slot_calculator = self.slot_calculator;
let ru_provider = self.ru_provider.clone();

async move {
debug!("starting basefee updater");
loop {
// calculate start of next slot plus a small buffer
let time_remaining = slot_calculator.slot_duration()
- slot_calculator.current_timepoint_within_slot()
+ 1;
debug!(time_remaining = ?time_remaining, "basefee updater sleeping until next slot");

// wait until that point in time
sleep(Duration::from_secs(time_remaining)).await;

// update the basefee with that price
let resp = ru_provider.get_block_by_number(Latest).await.inspect_err(|e| {
error!(error = %e, "RPC error during basefee update");
});

if let Ok(Some(block)) = resp {
let basefee = block.header.base_fee_per_gas.unwrap_or(0);
price.store(basefee, Ordering::Relaxed);
debug!(basefee = basefee, "basefee updated");
} else {
warn!("get basefee failed - an error likely occurred");
}
}
}
}

/// Spawns the simulator task, which handles the setup and sets the deadline
/// for the each round of simulation.
///
@@ -227,7 +138,7 @@ impl Simulator {
/// - `cache`: The simulation cache containing transactions and bundles.
/// - `submit_sender`: A channel sender used to submit built blocks.
async fn run_simulator(
self,
mut self,
constants: SignetSystemConstants,
cache: SimCache,
submit_sender: mpsc::UnboundedSender<BuiltBlock>,
@@ -236,14 +147,16 @@ impl Simulator {
let sim_cache = cache.clone();
let finish_by = self.calculate_deadline();

let block_env = match self.next_block_env(finish_by).await {
Ok(block) => block,
Err(err) => {
error!(err = %err, "failed to configure next block");
break;
}
};
info!(block_env = ?block_env, "created block");
// Wait for the block environment to be set
if self.block_env.changed().await.is_err() {
error!("block_env channel closed");
return;
}

// If no env, skip this run
let Some(block_env) = self.block_env.borrow_and_update().clone() else { return };

debug!(block_env = ?block_env, "building on block");

match self.handle_build(constants, sim_cache, finish_by, block_env).await {
Ok(block) => {
@@ -265,13 +178,19 @@ impl Simulator {
/// An `Instant` representing the simulation deadline, as calculated by determining
/// the time left in the current slot and adding that to the current timestamp in UNIX seconds.
pub fn calculate_deadline(&self) -> Instant {
// Calculate the current timestamp in seconds since the UNIX epoch
let now = SystemTime::now();
let unix_seconds = now.duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs();
// Calculate the time remaining in the current slot
let remaining = self.slot_calculator.calculate_timepoint_within_slot(unix_seconds);
// Deadline is equal to the start of the next slot plus the time remaining in this slot
Instant::now() + Duration::from_secs(remaining)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems to have been giving incorrect output. we need to sub remaining from duration, then add that to now

// Get the current timepoint within the slot.
let timepoint = self.slot_calculator().current_timepoint_within_slot();

// We have the timepoint in seconds into the slot. To find out what's
// remaining, we need to subtract it from the slot duration
let remaining = self.slot_calculator().slot_duration() - timepoint;

// We add a 1500 ms buffer to account for sequencer stopping signing.

let candidate =
Instant::now() + Duration::from_secs(remaining) - Duration::from_millis(1500);

candidate.max(Instant::now())
}

/// Creates an `AlloyDB` instance from the rollup provider.
@@ -300,106 +219,4 @@ impl Simulator {
let wrapped_db: AlloyDatabaseProvider = WrapDatabaseAsync::new(alloy_db).unwrap();
Some(wrapped_db)
}

/// Prepares the next block environment.
///
/// Prepares the next block environment to load into the simulator by fetching the latest block number,
/// assigning the correct next block number, checking the basefee, and setting the timestamp,
/// reward address, and gas configuration for the block environment based on builder configuration.
///
/// # Arguments
///
/// - finish_by: The deadline at which block simulation will end.
async fn next_block_env(&self, finish_by: Instant) -> eyre::Result<PecorinoBlockEnv> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replaced by EnvTask

let remaining = finish_by.duration_since(Instant::now());
let finish_time = SystemTime::now() + remaining;
let deadline: DateTime<Utc> = finish_time.into();
debug!(deadline = %deadline, "preparing block env");

// Fetch the latest block number and increment it by 1
let latest_block_number = match self.ru_provider.get_block_number().await {
Ok(num) => num,
Err(err) => {
error!(%err, "RPC error during block build");
bail!(err)
}
};
debug!(next_block_num = latest_block_number + 1, "preparing block env");

// Fetch the basefee from previous block to calculate gas for this block
let basefee = match self.get_basefee().await? {
Some(basefee) => basefee,
None => {
warn!("get basefee failed - RPC error likely occurred");
todo!()
}
};
debug!(basefee = basefee, "setting basefee");

// Craft the Block environment to pass to the simulator
let block_env = PecorinoBlockEnv::new(
self.config.clone(),
latest_block_number + 1,
deadline.timestamp() as u64,
basefee,
);
debug!(block_env = ?block_env, "prepared block env");

Ok(block_env)
}

/// Returns the basefee of the latest block.
///
/// # Returns
///
/// The basefee of the previous (latest) block if the request was successful,
/// or a sane default if the RPC failed.
async fn get_basefee(&self) -> eyre::Result<Option<u64>> {
let Some(block) =
self.ru_provider.get_block_by_number(Latest).await.wrap_err("basefee error")?
else {
return Ok(None);
};

debug!(basefee = ?block.header.base_fee_per_gas, "basefee found");
Ok(block.header.base_fee_per_gas)
}
}

/// Continuously updates the simulation cache with incoming transactions and bundles.
///
/// This function listens for new transactions and bundles on their respective
/// channels and adds them to the simulation cache using the latest observed basefee.
///
/// # Arguments
///
/// - `tx_receiver`: A receiver channel for incoming Ethereum transactions.
/// - `bundle_receiver`: A receiver channel for incoming transaction bundles.
/// - `cache`: The simulation cache used to store transactions and bundles.
/// - `price_reader`: An `Arc<AtomicU64>` providing the latest basefee for simulation pricing.
async fn cache_updater(
mut tx_receiver: mpsc::UnboundedReceiver<
alloy::consensus::EthereumTxEnvelope<alloy::consensus::TxEip4844Variant>,
>,
mut bundle_receiver: mpsc::UnboundedReceiver<Bundle>,
cache: SimCache,
price_reader: Arc<AtomicU64>,
) -> ! {
loop {
let p = price_reader.load(Ordering::Relaxed);
select! {
maybe_tx = tx_receiver.recv() => {
if let Some(tx) = maybe_tx {
debug!(tx = ?tx.hash(), "received transaction");
cache.add_item(tx, p);
}
}
maybe_bundle = bundle_receiver.recv() => {
if let Some(bundle) = maybe_bundle {
debug!(bundle = ?bundle.id, "received bundle");
cache.add_item(bundle.bundle, p);
}
}
}
}
}
27 changes: 5 additions & 22 deletions src/tasks/cache/bundle.rs
Original file line number Diff line number Diff line change
@@ -6,30 +6,13 @@ use init4_bin_base::{
};
use oauth2::TokenResponse;
use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
use signet_bundle::SignetEthBundle;
use signet_tx_cache::types::{TxCacheBundle, TxCacheBundlesResponse};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
task::JoinHandle,
time::{self, Duration},
};

/// Holds a bundle from the cache with a unique ID and a Zenith bundle.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replaced with sdk types

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Bundle {
/// Cache identifier for the bundle.
pub id: String,
/// The corresponding Signet bundle.
pub bundle: SignetEthBundle,
}

/// Response from the tx-pool containing a list of bundles.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct TxPoolBundleResponse {
/// Bundle responses are available on the bundles property.
pub bundles: Vec<Bundle>,
}

/// The BundlePoller polls the tx-pool for bundles.
#[derive(Debug)]
pub struct BundlePoller {
@@ -60,7 +43,7 @@ impl BundlePoller {
}

/// Fetches bundles from the transaction cache and returns them.
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<Bundle>> {
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<TxCacheBundle>> {
let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?;
let Some(token) = self.token.read() else {
warn!("No token available, skipping bundle fetch");
@@ -75,7 +58,7 @@ impl BundlePoller {
.error_for_status()?
.json()
.await
.map(|resp: TxPoolBundleResponse| resp.bundles)
.map(|resp: TxCacheBundlesResponse| resp.bundles)
.map_err(Into::into)
}

@@ -84,7 +67,7 @@ impl BundlePoller {
Duration::from_millis(self.poll_interval_ms)
}

async fn task_future(mut self, outbound: UnboundedSender<Bundle>) {
async fn task_future(mut self, outbound: UnboundedSender<TxCacheBundle>) {
loop {
let span = debug_span!("BundlePoller::loop", url = %self.config.tx_pool_url);

@@ -119,7 +102,7 @@ impl BundlePoller {
}

/// Spawns a task that sends bundles it finds to its channel sender.
pub fn spawn(self) -> (UnboundedReceiver<Bundle>, JoinHandle<()>) {
pub fn spawn(self) -> (UnboundedReceiver<TxCacheBundle>, JoinHandle<()>) {
let (outbound, inbound) = unbounded_channel();

let jh = tokio::spawn(self.task_future(outbound));
21 changes: 20 additions & 1 deletion src/tasks/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -5,4 +5,23 @@ mod tx;
pub use tx::TxPoller;

mod bundle;
pub use bundle::{Bundle, BundlePoller};
pub use bundle::BundlePoller;

use signet_sim::SimCache;
use tokio::task::JoinHandle;

/// Cache tasks for the block builder.
#[derive(Debug)]
pub struct CacheSystem {
/// The cache task.
pub cache_task: JoinHandle<()>,

/// The transaction poller task.
pub tx_poller: JoinHandle<()>,

/// The bundle poller task.
pub bundle_poller: JoinHandle<()>,

/// The sim cache.
pub sim_cache: SimCache,
}
28 changes: 18 additions & 10 deletions src/tasks/cache/task.rs
Original file line number Diff line number Diff line change
@@ -15,9 +15,6 @@ use trevm::revm::context::BlockEnv;
/// the environment changes.
#[derive(Debug)]
pub struct CacheTask {
/// The shared sim cache to populate.
cache: SimCache,

/// The channel to receive the block environment.
env: watch::Receiver<Option<BlockEnv>>,

@@ -28,7 +25,16 @@ pub struct CacheTask {
}

impl CacheTask {
async fn task_future(mut self) {
/// Create a new cache task with the given cache and channels.
pub const fn new(
env: watch::Receiver<Option<BlockEnv>>,
bundles: mpsc::UnboundedReceiver<TxCacheBundle>,
txns: mpsc::UnboundedReceiver<TxEnvelope>,
) -> Self {
Self { env, bundles, txns }
}

async fn task_future(mut self, cache: SimCache) {
loop {
let mut basefee = 0;
tokio::select! {
@@ -41,24 +47,26 @@ impl CacheTask {
if let Some(env) = self.env.borrow_and_update().as_ref() {
basefee = env.basefee;
info!(basefee, number = env.number, timestamp = env.timestamp, "block env changed, clearing cache");
self.cache.clean(
cache.clean(
env.number, env.timestamp
);
}
}
Some(bundle) = self.bundles.recv() => {
self.cache.add_item(bundle.bundle, basefee);
cache.add_item(bundle.bundle, basefee);
}
Some(txn) = self.txns.recv() => {
self.cache.add_item(txn, basefee);
cache.add_item(txn, basefee);
}
}
}
}

/// Spawn the cache task.
pub fn spawn(self) -> JoinHandle<()> {
let fut = self.task_future();
tokio::spawn(fut)
pub fn spawn(self) -> (SimCache, JoinHandle<()>) {
let sim_cache = SimCache::default();
let c = sim_cache.clone();
let fut = self.task_future(sim_cache);
(c, tokio::spawn(fut))
}
}
14 changes: 8 additions & 6 deletions src/tasks/env.rs
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ use alloy::{
};
use init4_bin_base::deps::tracing::{self, Instrument, debug, error, info_span};
use std::time::Duration;
use tokio::sync::watch;
use tokio::{sync::watch, task::JoinHandle};
use tokio_stream::StreamExt;
use trevm::revm::{context::BlockEnv, context_interface::block::BlobExcessGasAndPrice};

@@ -25,7 +25,7 @@ impl EnvTask {
}

/// Construct a BlockEnv by making calls to the provider.
pub fn construct_block_env(&self, previous: &Header) -> BlockEnv {
fn construct_block_env(&self, previous: &Header) -> BlockEnv {
BlockEnv {
number: previous.number + 1,
beneficiary: self.config.builder_rewards_address,
@@ -45,7 +45,7 @@ impl EnvTask {
}

/// Construct the BlockEnv and send it to the sender.
pub async fn task_fut(self, sender: watch::Sender<Option<BlockEnv>>) {
async fn task_fut(self, sender: watch::Sender<Option<BlockEnv>>) {
let span = info_span!("EnvTask::task_fut::init");
let mut poller = match self.provider.watch_blocks().instrument(span.clone()).await {
Ok(poller) => poller,
@@ -96,22 +96,24 @@ impl EnvTask {
}
};
span.record("number", previous.number);
debug!("retrieved latest block");

let env = self.construct_block_env(&previous);
debug!(?env, "constructed block env");
if sender.send(Some(env)).is_err() {
// The receiver has been dropped, so we can stop the task.
debug!("receiver dropped, stopping task");
break;
}
}
}

/// Spawn the task and return a watch::Receiver for the BlockEnv.
pub fn spawn(self) -> watch::Receiver<Option<BlockEnv>> {
pub fn spawn(self) -> (watch::Receiver<Option<BlockEnv>>, JoinHandle<()>) {
let (sender, receiver) = watch::channel(None);
let fut = self.task_fut(sender);
tokio::spawn(fut);
let jh = tokio::spawn(fut);

receiver
(receiver, jh)
}
}
5 changes: 5 additions & 0 deletions src/tasks/metrics.rs
Original file line number Diff line number Diff line change
@@ -18,6 +18,11 @@ pub struct MetricsTask {
}

impl MetricsTask {
/// Create a new MetricsTask with the given provider
pub const fn new(host_provider: HostProvider) -> Self {
Self { host_provider }
}

/// Given a transaction hash, record metrics on the result of the
/// transaction mining
pub fn log_tx(&self, tx_hash: TxHash) -> impl Future<Output = ()> + use<> {
32 changes: 15 additions & 17 deletions src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
//! Test utilities for testing builder tasks
use crate::{config::BuilderConfig, tasks::block::cfg::PecorinoBlockEnv};
use crate::config::BuilderConfig;
use alloy::{
consensus::{SignableTransaction, TxEip1559, TxEnvelope},
primitives::{Address, FixedBytes, TxKind, U256},
primitives::{Address, B256, TxKind, U256},
signers::{SignerSync, local::PrivateKeySigner},
};
use chrono::{DateTime, Utc};
use eyre::Result;
use init4_bin_base::{
deps::tracing_subscriber::{
@@ -14,10 +13,8 @@ use init4_bin_base::{
perms::OAuthConfig,
utils::calc::SlotCalculator,
};
use std::{
str::FromStr,
time::{Instant, SystemTime},
};
use std::str::FromStr;
use trevm::revm::{context::BlockEnv, context_interface::block::BlobExcessGasAndPrice};

/// Sets up a block builder with test values
pub fn setup_test_config() -> Result<BuilderConfig> {
@@ -90,18 +87,19 @@ pub fn test_block_env(
config: BuilderConfig,
number: u64,
basefee: u64,
finish_by: Instant,
) -> PecorinoBlockEnv {
let remaining = finish_by.duration_since(Instant::now());
let finish_time = SystemTime::now() + remaining;
let deadline: DateTime<Utc> = finish_time.into();

PecorinoBlockEnv {
timestamp: u64,
) -> BlockEnv {
BlockEnv {
number,
beneficiary: Address::repeat_byte(0),
timestamp: deadline.timestamp() as u64,
beneficiary: Address::repeat_byte(1),
timestamp,
gas_limit: config.rollup_block_gas_limit,
basefee,
prevrandao: Some(FixedBytes::random()),
difficulty: U256::ZERO,
prevrandao: Some(B256::random()),
blob_excess_gas_and_price: Some(BlobExcessGasAndPrice {
excess_blob_gas: 0,
blob_gasprice: 0,
}),
}
}
260 changes: 128 additions & 132 deletions tests/block_builder_test.rs
Original file line number Diff line number Diff line change
@@ -1,134 +1,130 @@
//! Tests for the block building task.
#[cfg(test)]
mod tests {
use alloy::{
network::Ethereum,
node_bindings::Anvil,
primitives::U256,
providers::{Provider, RootProvider},
signers::local::PrivateKeySigner,
};
use builder::{
tasks::block::sim::Simulator,
test_utils::{new_signed_tx, setup_logging, setup_test_config, test_block_env},
};
use init4_bin_base::utils::calc::SlotCalculator;
use signet_sim::{SimCache, SimItem};
use signet_types::constants::SignetSystemConstants;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::{sync::mpsc::unbounded_channel, time::timeout};

/// Tests the `handle_build` method of the `Simulator`.
///
/// This test sets up a simulated environment using Anvil, creates a block builder,
/// and verifies that the block builder can successfully build a block containing
/// transactions from multiple senders.
#[cfg(feature = "integration")]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_handle_build() {
setup_logging();

// Make a test config
let config = setup_test_config().unwrap();
let constants = SignetSystemConstants::pecorino();

// Create an anvil instance for testing
let anvil_instance = Anvil::new().chain_id(signet_constants::pecorino::RU_CHAIN_ID).spawn();

// Create a wallet
let keys = anvil_instance.keys();
let test_key_0 = PrivateKeySigner::from_signing_key(keys[0].clone().into());
let test_key_1 = PrivateKeySigner::from_signing_key(keys[1].clone().into());

// Create a rollup provider
let ru_provider = RootProvider::<Ethereum>::new_http(anvil_instance.endpoint_url());

// Create a block builder with a slot calculator for testing
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Clock may have gone backwards")
.as_secs();

let slot_calculator = SlotCalculator::new(now, 0, 12);
let block_builder = Simulator::new(&config, ru_provider.clone(), slot_calculator);

// Setup a sim cache
let sim_items = SimCache::new();

// Add two transactions from two senders to the sim cache
let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap();
sim_items.add_item(SimItem::Tx(tx_1), 0);

let tx_2 = new_signed_tx(&test_key_1, 0, U256::from(2_f64), 10_000).unwrap();
sim_items.add_item(SimItem::Tx(tx_2), 0);

// Setup the block env
let finish_by = Instant::now() + Duration::from_secs(2);
let block_number = ru_provider.get_block_number().await.unwrap();
let block_env = test_block_env(config, block_number, 7, finish_by);

// Spawn the block builder task
let got = block_builder.handle_build(constants, sim_items, finish_by, block_env).await;

// Assert on the built block
assert!(got.is_ok());
assert!(got.unwrap().tx_count() == 2);
}

/// Tests the full block builder loop, including transaction ingestion and block simulation.
///
/// This test sets up a simulated environment using Anvil, creates a block builder,
/// and verifies that the builder can process incoming transactions and produce a block
/// within a specified timeout.
#[ignore = "integration test"]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_spawn() {
setup_logging();

// Make a test config
let config = setup_test_config().unwrap();
let constants = SignetSystemConstants::pecorino();

// Create an anvil instance for testing
let anvil_instance = Anvil::new().chain_id(signet_constants::pecorino::RU_CHAIN_ID).spawn();

// Create a wallet
let keys = anvil_instance.keys();
let test_key_0 = PrivateKeySigner::from_signing_key(keys[0].clone().into());
let test_key_1 = PrivateKeySigner::from_signing_key(keys[1].clone().into());

// Plumb inputs for the test setup
let (tx_sender, tx_receiver) = unbounded_channel();
let (_, bundle_receiver) = unbounded_channel();
let (block_sender, mut block_receiver) = unbounded_channel();

// Create a rollup provider
let ru_provider = RootProvider::<Ethereum>::new_http(anvil_instance.endpoint_url());

let sim = Simulator::new(&config, ru_provider.clone(), config.slot_calculator);

// Create a shared sim cache
let sim_cache = SimCache::new();

// Create a sim cache and start filling it with items
sim.spawn_cache_tasks(tx_receiver, bundle_receiver, sim_cache.clone());

// Finally, Kick off the block builder task.
sim.spawn_simulator_task(constants, sim_cache.clone(), block_sender);

// Feed in transactions to the tx_sender and wait for the block to be simulated
let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap();
let tx_2 = new_signed_tx(&test_key_1, 0, U256::from(2_f64), 10_000).unwrap();
tx_sender.send(tx_1).unwrap();
tx_sender.send(tx_2).unwrap();

// Wait for a block with timeout
let result = timeout(Duration::from_secs(5), block_receiver.recv()).await;
assert!(result.is_ok(), "Did not receive block within 5 seconds");

// Assert on the block
let block = result.unwrap();
assert!(block.is_some(), "Block channel closed without receiving a block");
assert!(block.unwrap().tx_count() == 2); // TODO: Why is this failing? I'm seeing EVM errors but haven't tracked them down yet.
}
use alloy::{
network::Ethereum,
node_bindings::Anvil,
primitives::U256,
providers::{Provider, RootProvider},
signers::local::PrivateKeySigner,
};
use builder::{
tasks::{block::sim::Simulator, cache::CacheTask},
test_utils::{new_signed_tx, setup_logging, setup_test_config, test_block_env},
};
use signet_sim::{SimCache, SimItem};
use signet_types::constants::SignetSystemConstants;
use std::time::{Duration, Instant};
use tokio::{sync::mpsc::unbounded_channel, time::timeout};

/// Tests the `handle_build` method of the `Simulator`.
///
/// This test sets up a simulated environment using Anvil, creates a block builder,
/// and verifies that the block builder can successfully build a block containing
/// transactions from multiple senders.
#[ignore = "integration test"]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_handle_build() {
use alloy::eips::BlockId;

setup_logging();

// Make a test config
let config = setup_test_config().unwrap();
let constants = SignetSystemConstants::pecorino();

// Create an anvil instance for testing
let anvil_instance = Anvil::new().chain_id(signet_constants::pecorino::RU_CHAIN_ID).spawn();

// Create a wallet
let keys = anvil_instance.keys();
let test_key_0 = PrivateKeySigner::from_signing_key(keys[0].clone().into());
let test_key_1 = PrivateKeySigner::from_signing_key(keys[1].clone().into());

// Create a rollup provider
let ru_provider = RootProvider::<Ethereum>::new_http(anvil_instance.endpoint_url());

let block_env = config.env_task().spawn().0;

let block_builder = Simulator::new(&config, ru_provider.clone(), block_env);

// Setup a sim cache
let sim_items = SimCache::new();

// Add two transactions from two senders to the sim cache
let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap();
sim_items.add_item(SimItem::Tx(tx_1), 0);

let tx_2 = new_signed_tx(&test_key_1, 0, U256::from(2_f64), 10_000).unwrap();
sim_items.add_item(SimItem::Tx(tx_2), 0);

// Setup the block env
let finish_by = Instant::now() + Duration::from_secs(2);
let header = ru_provider.get_block(BlockId::latest()).await.unwrap().unwrap().header.inner;
let number = header.number + 1;
let timestamp = header.timestamp + config.slot_calculator.slot_duration();
let block_env = test_block_env(config, number, 7, timestamp);

// Spawn the block builder task
let got = block_builder.handle_build(constants, sim_items, finish_by, block_env).await;

// Assert on the built block
assert!(got.is_ok());
assert!(got.unwrap().tx_count() == 2);
}

/// Tests the full block builder loop, including transaction ingestion and block simulation.
///
/// This test sets up a simulated environment using Anvil, creates a block builder,
/// and verifies that the builder can process incoming transactions and produce a block
/// within a specified timeout.
#[ignore = "integration test"]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_spawn() {
setup_logging();

// Make a test config
let config = setup_test_config().unwrap();
let constants = SignetSystemConstants::pecorino();

// Create an anvil instance for testing
let anvil_instance = Anvil::new().chain_id(signet_constants::pecorino::RU_CHAIN_ID).spawn();

// Create a wallet
let keys = anvil_instance.keys();
let test_key_0 = PrivateKeySigner::from_signing_key(keys[0].clone().into());
let test_key_1 = PrivateKeySigner::from_signing_key(keys[1].clone().into());

// Plumb inputs for the test setup
let (tx_sender, tx_receiver) = unbounded_channel();
let (_, bundle_receiver) = unbounded_channel();
let (block_sender, mut block_receiver) = unbounded_channel();

let env_task = config.env_task();
let (block_env, _env_jh) = env_task.spawn();

let cache_task = CacheTask::new(block_env.clone(), bundle_receiver, tx_receiver);
let (sim_cache, _cache_jh) = cache_task.spawn();

// Create a rollup provider
let ru_provider = RootProvider::<Ethereum>::new_http(anvil_instance.endpoint_url());

let sim = Simulator::new(&config, ru_provider.clone(), block_env);

// Finally, Kick off the block builder task.
sim.spawn_simulator_task(constants, sim_cache.clone(), block_sender);

// Feed in transactions to the tx_sender and wait for the block to be simulated
let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap();
let tx_2 = new_signed_tx(&test_key_1, 0, U256::from(2_f64), 10_000).unwrap();
tx_sender.send(tx_1).unwrap();
tx_sender.send(tx_2).unwrap();

// Wait for a block with timeout
let result = timeout(Duration::from_secs(5), block_receiver.recv()).await;
assert!(result.is_ok(), "Did not receive block within 5 seconds");

// Assert on the block
let block = result.unwrap();
assert!(block.is_some(), "Block channel closed without receiving a block");
assert!(block.unwrap().tx_count() == 2); // TODO: Why is this failing? I'm seeing EVM errors but haven't tracked them down yet.
}
22 changes: 10 additions & 12 deletions tests/bundle_poller_test.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
mod tests {
use builder::test_utils;
use eyre::Result;
use builder::test_utils;
use eyre::Result;

#[ignore = "integration test"]
#[tokio::test]
async fn test_bundle_poller_roundtrip() -> Result<()> {
let config = test_utils::setup_test_config().unwrap();
let token = config.oauth_token();
#[ignore = "integration test"]
#[tokio::test]
async fn test_bundle_poller_roundtrip() -> Result<()> {
let config = test_utils::setup_test_config().unwrap();
let token = config.oauth_token();

let mut bundle_poller = builder::tasks::cache::BundlePoller::new(&config, token);
let mut bundle_poller = builder::tasks::cache::BundlePoller::new(&config, token);

let _ = bundle_poller.check_bundle_cache().await?;
let _ = bundle_poller.check_bundle_cache().await?;

Ok(())
}
Ok(())
}
20 changes: 20 additions & 0 deletions tests/cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use builder::test_utils::{setup_logging, setup_test_config};
use init4_bin_base::deps::tracing::warn;
use std::time::Duration;

#[ignore = "integration test. This test will take >12 seconds to run, and requires Authz configuration env vars."]
#[tokio::test]
async fn test_bundle_poller_roundtrip() -> eyre::Result<()> {
setup_logging();

let config = setup_test_config().unwrap();

let (block_env, _jh) = config.env_task().spawn();
let cache = config.spawn_cache_system(block_env);

tokio::time::sleep(Duration::from_secs(12)).await;

warn!(txns = ?cache.sim_cache.read_best(5));

Ok(())
}
15 changes: 15 additions & 0 deletions tests/env.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use builder::test_utils::{setup_logging, setup_test_config};

#[ignore = "integration test. This test will take between 0 and 12 seconds to run."]
#[tokio::test]
async fn test_bundle_poller_roundtrip() {
setup_logging();

let config = setup_test_config().unwrap();
let env_task = config.env_task();
let (mut env_watcher, _jh) = env_task.spawn();

env_watcher.changed().await.unwrap();
let env = env_watcher.borrow_and_update();
assert!(env.as_ref().is_some(), "Env should be Some");
}
72 changes: 35 additions & 37 deletions tests/tx_poller_test.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,48 @@
mod tests {
use alloy::{primitives::U256, signers::local::PrivateKeySigner};
use builder::{
config::BuilderConfig,
tasks::cache::TxPoller,
test_utils::{new_signed_tx, setup_logging, setup_test_config},
};
// Import the refactored function
use eyre::{Ok, Result};
use alloy::{primitives::U256, signers::local::PrivateKeySigner};
use builder::{
config::BuilderConfig,
tasks::cache::TxPoller,
test_utils::{new_signed_tx, setup_logging, setup_test_config},
};
// Import the refactored function
use eyre::{Ok, Result};

#[ignore = "integration test"]
#[tokio::test]
async fn test_tx_roundtrip() -> Result<()> {
setup_logging();
#[ignore = "integration test"]
#[tokio::test]
async fn test_tx_roundtrip() -> Result<()> {
setup_logging();

// Create a new test environment
let config = setup_test_config()?;
// Create a new test environment
let config = setup_test_config()?;

// Post a transaction to the cache
post_tx(&config).await?;
// Post a transaction to the cache
post_tx(&config).await?;

// Create a new poller
let mut poller = TxPoller::new(&config);
// Create a new poller
let mut poller = TxPoller::new(&config);

// Fetch transactions the pool
let transactions = poller.check_tx_cache().await?;
// Fetch transactions the pool
let transactions = poller.check_tx_cache().await?;

// Ensure at least one transaction exists
assert!(!transactions.is_empty());
// Ensure at least one transaction exists
assert!(!transactions.is_empty());

Ok(())
}

async fn post_tx(config: &BuilderConfig) -> Result<()> {
let client = reqwest::Client::new();
Ok(())
}

let wallet = PrivateKeySigner::random();
let tx_envelope = new_signed_tx(&wallet, 1, U256::from(1), 10_000)?;
async fn post_tx(config: &BuilderConfig) -> Result<()> {
let client = reqwest::Client::new();

let url = format!("{}/transactions", config.tx_pool_url);
let response = client.post(&url).json(&tx_envelope).send().await?;
let wallet = PrivateKeySigner::random();
let tx_envelope = new_signed_tx(&wallet, 1, U256::from(1), 10_000)?;

if !response.status().is_success() {
let error_text = response.text().await?;
eyre::bail!("Failed to post transaction: {}", error_text);
}
let url = format!("{}/transactions", config.tx_pool_url);
let response = client.post(&url).json(&tx_envelope).send().await?;

Ok(())
if !response.status().is_success() {
let error_text = response.text().await?;
eyre::bail!("Failed to post transaction: {}", error_text);
}

Ok(())
}