Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 6bf80dd

Browse files
committedMay 20, 2025··
refactor: integrate env and cache tasks
1 parent 888c1db commit 6bf80dd

File tree

16 files changed

+419
-595
lines changed

16 files changed

+419
-595
lines changed
 

‎Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ integration = []
2727
[dependencies]
2828
init4-bin-base = { version = "0.3.4", features = ["perms"] }
2929

30-
signet-bundle = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
3130
signet-constants = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
3231
signet-sim = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
3332
signet-tx-cache = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
@@ -56,7 +55,6 @@ serde_json = "1.0"
5655
tokio = { version = "1.36.0", features = ["full", "macros", "rt-multi-thread"] }
5756

5857
oauth2 = "5"
59-
chrono = "0.4.41"
6058
tokio-stream = "0.1.17"
6159
url = "2.5.4"
6260

‎bin/builder.rs

Lines changed: 37 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,14 @@
11
use builder::{
22
config::BuilderConfig,
33
service::serve_builder,
4-
tasks::{
5-
block::sim::Simulator,
6-
cache::{BundlePoller, TxPoller},
7-
metrics::MetricsTask,
8-
submit::SubmitTask,
9-
},
4+
tasks::{block::sim::Simulator, metrics::MetricsTask, submit::SubmitTask},
5+
};
6+
use init4_bin_base::{
7+
deps::tracing::{info, info_span},
8+
utils::from_env::FromEnv,
109
};
11-
use init4_bin_base::{deps::tracing, utils::from_env::FromEnv};
12-
use signet_sim::SimCache;
1310
use signet_types::constants::SignetSystemConstants;
1411
use tokio::select;
15-
use tracing::info_span;
1612

1713
// Note: Must be set to `multi_thread` to support async tasks.
1814
// See: https://docs.rs/tokio/latest/tokio/attr.main.html
@@ -21,74 +17,74 @@ async fn main() -> eyre::Result<()> {
2117
let _guard = init4_bin_base::init4();
2218
let init_span_guard = info_span!("builder initialization");
2319

20+
// Pull the configuration from the environment
2421
let config = BuilderConfig::from_env()?.clone();
2522
let constants = SignetSystemConstants::pecorino();
26-
let token = config.oauth_token();
2723

24+
// Spawn the EnvTask
25+
let env_task = config.env_task();
26+
let (block_env, env_jh) = env_task.spawn();
27+
28+
// Spawn the cache system
29+
let cache_system = config.spawn_cache_system(block_env.clone());
30+
31+
// Prep providers and contracts
2832
let (host_provider, quincey) =
2933
tokio::try_join!(config.connect_host_provider(), config.connect_quincey())?;
3034
let ru_provider = config.connect_ru_provider();
31-
3235
let zenith = config.connect_zenith(host_provider.clone());
3336

37+
// Set up the metrics task
3438
let metrics = MetricsTask { host_provider };
3539
let (tx_channel, metrics_jh) = metrics.spawn();
3640

41+
// Make a Tx submission task
3742
let submit =
3843
SubmitTask { zenith, quincey, config: config.clone(), outbound_tx_channel: tx_channel };
3944

40-
let tx_poller = TxPoller::new(&config);
41-
let (tx_receiver, tx_poller_jh) = tx_poller.spawn();
42-
43-
let bundle_poller = BundlePoller::new(&config, token);
44-
let (bundle_receiver, bundle_poller_jh) = bundle_poller.spawn();
45-
45+
// Set up tx submission
4646
let (submit_channel, submit_jh) = submit.spawn();
4747

48-
let sim_items = SimCache::new();
49-
let slot_calculator = config.slot_calculator;
50-
51-
let sim = Simulator::new(&config, ru_provider.clone(), slot_calculator);
52-
53-
let (basefee_jh, sim_cache_jh) =
54-
sim.spawn_cache_tasks(tx_receiver, bundle_receiver, sim_items.clone());
55-
56-
let build_jh = sim.spawn_simulator_task(constants, sim_items.clone(), submit_channel);
48+
// Set up the simulator
49+
let sim = Simulator::new(&config, ru_provider.clone(), block_env);
50+
let build_jh = sim.spawn_simulator_task(constants, cache_system.sim_cache, submit_channel);
5751

52+
// Start the healthcheck server
5853
let server = serve_builder(([0, 0, 0, 0], config.builder_port));
5954

6055
// We have finished initializing the builder, so we can drop the init span
6156
// guard.
6257
drop(init_span_guard);
6358

6459
select! {
65-
_ = tx_poller_jh => {
66-
tracing::info!("tx_poller finished");
60+
61+
_ = env_jh => {
62+
info!("env task finished");
6763
},
68-
_ = bundle_poller_jh => {
69-
tracing::info!("bundle_poller finished");
64+
_ = cache_system.cache_task => {
65+
info!("cache task finished");
66+
},
67+
_ = cache_system.tx_poller => {
68+
info!("tx_poller finished");
69+
},
70+
_ = cache_system.bundle_poller => {
71+
info!("bundle_poller finished");
7072
},
71-
_ = sim_cache_jh => {
72-
tracing::info!("sim cache task finished");
73-
}
74-
_ = basefee_jh => {
75-
tracing::info!("basefee task finished");
76-
}
7773
_ = submit_jh => {
78-
tracing::info!("submit finished");
74+
info!("submit finished");
7975
},
8076
_ = metrics_jh => {
81-
tracing::info!("metrics finished");
77+
info!("metrics finished");
8278
},
8379
_ = build_jh => {
84-
tracing::info!("build finished");
80+
info!("build finished");
8581
}
8682
_ = server => {
87-
tracing::info!("server finished");
83+
info!("server finished");
8884
}
8985
}
9086

91-
tracing::info!("shutting down");
87+
info!("shutting down");
9288

9389
Ok(())
9490
}

‎src/config.rs

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1-
use crate::quincey::Quincey;
1+
use crate::{
2+
quincey::Quincey,
3+
tasks::{
4+
block::cfg::SignetCfgEnv,
5+
cache::{BundlePoller, CacheSystem, CacheTask, TxPoller},
6+
env::EnvTask,
7+
},
8+
};
29
use alloy::{
310
network::{Ethereum, EthereumWallet},
411
primitives::Address,
@@ -21,6 +28,8 @@ use init4_bin_base::{
2128
};
2229
use signet_zenith::Zenith;
2330
use std::borrow::Cow;
31+
use tokio::sync::watch;
32+
use trevm::revm::context::BlockEnv;
2433

2534
/// Type alias for the provider used to simulate against rollup state.
2635
pub type RuProvider = RootProvider<Ethereum>;
@@ -166,8 +175,13 @@ impl BuilderConfig {
166175

167176
/// Connect to the Rollup rpc provider.
168177
pub fn connect_ru_provider(&self) -> RootProvider<Ethereum> {
169-
let url = url::Url::parse(&self.ru_rpc_url).expect("failed to parse URL");
170-
RootProvider::<Ethereum>::new_http(url)
178+
static ONCE: std::sync::OnceLock<RootProvider<Ethereum>> = std::sync::OnceLock::new();
179+
180+
ONCE.get_or_init(|| {
181+
let url = url::Url::parse(&self.ru_rpc_url).expect("failed to parse URL");
182+
RootProvider::new_http(url)
183+
})
184+
.clone()
171185
}
172186

173187
/// Connect to the Host rpc provider.
@@ -222,4 +236,36 @@ impl BuilderConfig {
222236

223237
Ok(Quincey::new_remote(client, url, token))
224238
}
239+
240+
/// Create an [`EnvTask`] using this config.
241+
pub fn env_task(&self) -> EnvTask {
242+
let provider = self.connect_ru_provider();
243+
EnvTask::new(self.clone(), provider)
244+
}
245+
246+
/// Spawn a new [`CacheSystem`] using this config. This contains the
247+
/// joinhandles for [`TxPoller`] and [`BundlePoller`] and [`CacheTask`], as
248+
/// well as the [`SimCache`] and the block env watcher.
249+
///
250+
/// [`SimCache`]: signet_sim::SimCache
251+
pub fn spawn_cache_system(&self, block_env: watch::Receiver<Option<BlockEnv>>) -> CacheSystem {
252+
// Tx Poller pulls transactions from the cache
253+
let tx_poller = TxPoller::new(self);
254+
let (tx_receiver, tx_poller) = tx_poller.spawn();
255+
256+
// Bundle Poller pulls bundles from the cache
257+
let bundle_poller = BundlePoller::new(self, self.oauth_token());
258+
let (bundle_receiver, bundle_poller) = bundle_poller.spawn();
259+
260+
// Set up the cache task
261+
let cache_task = CacheTask::new(block_env.clone(), bundle_receiver, tx_receiver);
262+
let (sim_cache, cache_task) = cache_task.spawn();
263+
264+
CacheSystem { cache_task, tx_poller, bundle_poller, sim_cache }
265+
}
266+
267+
/// Create a [`SignetCfgEnv`] using this config.
268+
pub const fn cfg_env(&self) -> SignetCfgEnv {
269+
SignetCfgEnv { chain_id: self.ru_chain_id }
270+
}
225271
}

‎src/tasks/block/cfg.rs

Lines changed: 7 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,14 @@
11
//! This file implements the [`trevm::Cfg`] and [`trevm::Block`] traits for Pecorino blocks.
2-
use alloy::primitives::{Address, B256, FixedBytes, U256};
3-
use trevm::{
4-
Block,
5-
revm::{
6-
context::{BlockEnv, CfgEnv},
7-
context_interface::block::BlobExcessGasAndPrice,
8-
primitives::hardfork::SpecId,
9-
},
10-
};
11-
12-
use crate::config::BuilderConfig;
2+
use trevm::revm::{context::CfgEnv, primitives::hardfork::SpecId};
133

144
/// PecorinoCfg holds network-level configuration values.
155
#[derive(Debug, Clone, Copy)]
16-
pub struct PecorinoCfg {}
6+
pub struct SignetCfgEnv {
7+
/// The chain ID.
8+
pub chain_id: u64,
9+
}
1710

18-
impl trevm::Cfg for PecorinoCfg {
11+
impl trevm::Cfg for SignetCfgEnv {
1912
/// Fills the configuration environment with Pecorino-specific values.
2013
///
2114
/// # Arguments
@@ -24,76 +17,7 @@ impl trevm::Cfg for PecorinoCfg {
2417
fn fill_cfg_env(&self, cfg_env: &mut CfgEnv) {
2518
let CfgEnv { chain_id, spec, .. } = cfg_env;
2619

27-
*chain_id = signet_constants::pecorino::RU_CHAIN_ID;
20+
*chain_id = self.chain_id;
2821
*spec = SpecId::default();
2922
}
3023
}
31-
32-
/// PecorinoBlockEnv holds block-level configurations for Pecorino blocks.
33-
#[derive(Debug, Clone, Copy)]
34-
pub struct PecorinoBlockEnv {
35-
/// The block number for this block.
36-
pub number: u64,
37-
/// The address the block reward should be sent to.
38-
pub beneficiary: Address,
39-
/// Timestamp for the block.
40-
pub timestamp: u64,
41-
/// The gas limit for this block environment.
42-
pub gas_limit: u64,
43-
/// The basefee to use for calculating gas usage.
44-
pub basefee: u64,
45-
/// The prevrandao to use for this block.
46-
pub prevrandao: Option<FixedBytes<32>>,
47-
}
48-
49-
/// Implements [`trevm::Block`] for the Pecorino block.
50-
impl Block for PecorinoBlockEnv {
51-
/// Fills the block environment with the Pecorino specific values
52-
fn fill_block_env(&self, block_env: &mut BlockEnv) {
53-
// Destructure the fields off of the block_env and modify them
54-
let BlockEnv {
55-
number,
56-
beneficiary,
57-
timestamp,
58-
gas_limit,
59-
basefee,
60-
difficulty,
61-
prevrandao,
62-
blob_excess_gas_and_price,
63-
} = block_env;
64-
*number = self.number;
65-
*beneficiary = self.beneficiary;
66-
*timestamp = self.timestamp;
67-
*gas_limit = self.gas_limit;
68-
*basefee = self.basefee;
69-
*prevrandao = self.prevrandao;
70-
71-
// NB: The following fields are set to sane defaults because they
72-
// are not supported by the rollup
73-
*difficulty = U256::ZERO;
74-
*blob_excess_gas_and_price =
75-
Some(BlobExcessGasAndPrice { excess_blob_gas: 0, blob_gasprice: 0 });
76-
}
77-
}
78-
79-
impl PecorinoBlockEnv {
80-
/// Returns a new PecorinoBlockEnv with the specified values.
81-
///
82-
/// # Arguments
83-
///
84-
/// - config: The BuilderConfig for the builder.
85-
/// - number: The block number of this block, usually the latest block number plus 1,
86-
/// unless simulating blocks in the past.
87-
/// - timestamp: The timestamp of the block, typically set to the deadline of the
88-
/// block building task.
89-
pub fn new(config: BuilderConfig, number: u64, timestamp: u64, basefee: u64) -> Self {
90-
PecorinoBlockEnv {
91-
number,
92-
beneficiary: config.builder_rewards_address,
93-
timestamp,
94-
gas_limit: config.rollup_block_gas_limit,
95-
basefee,
96-
prevrandao: Some(B256::random()),
97-
}
98-
}
99-
}

‎src/tasks/block/sim.rs

Lines changed: 46 additions & 229 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,30 @@
11
//! `block.rs` contains the Simulator and everything that wires it into an
22
//! actor that handles the simulation of a stream of bundles and transactions
33
//! and turns them into valid Pecorino blocks for network submission.
4-
use super::cfg::PecorinoBlockEnv;
5-
use crate::{
6-
config::{BuilderConfig, RuProvider},
7-
tasks::{block::cfg::PecorinoCfg, cache::Bundle},
8-
};
9-
use alloy::{
10-
consensus::TxEnvelope,
11-
eips::{BlockId, BlockNumberOrTag::Latest},
12-
network::Ethereum,
13-
providers::Provider,
14-
};
15-
use chrono::{DateTime, Utc};
16-
use eyre::{Context, bail};
4+
use crate::config::{BuilderConfig, RuProvider};
5+
use alloy::{eips::BlockId, network::Ethereum, providers::Provider};
176
use init4_bin_base::{
18-
deps::tracing::{debug, error, info, warn},
7+
deps::tracing::{debug, error},
198
utils::calc::SlotCalculator,
209
};
2110
use signet_sim::{BlockBuild, BuiltBlock, SimCache};
2211
use signet_types::constants::SignetSystemConstants;
23-
use std::{
12+
use std::time::{Duration, Instant};
13+
use tokio::{
2414
sync::{
25-
Arc,
26-
atomic::{AtomicU64, Ordering},
15+
mpsc::{self},
16+
watch,
2717
},
28-
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
29-
};
30-
use tokio::{
31-
select,
32-
sync::mpsc::{self},
3318
task::JoinHandle,
34-
time::sleep,
3519
};
3620
use trevm::revm::{
21+
context::BlockEnv,
3722
database::{AlloyDB, WrapDatabaseAsync},
3823
inspector::NoOpInspector,
3924
};
4025

26+
type AlloyDatabaseProvider = WrapDatabaseAsync<AlloyDB<Ethereum, RuProvider>>;
27+
4128
/// `Simulator` is responsible for periodically building blocks and submitting them for
4229
/// signing and inclusion in the blockchain. It wraps a rollup provider and a slot
4330
/// calculator with a builder configuration.
@@ -47,11 +34,10 @@ pub struct Simulator {
4734
pub config: BuilderConfig,
4835
/// A provider that cannot sign transactions, used for interacting with the rollup.
4936
pub ru_provider: RuProvider,
50-
/// The slot calculator for determining when to wake up and build blocks.
51-
pub slot_calculator: SlotCalculator,
52-
}
5337

54-
type AlloyDatabaseProvider = WrapDatabaseAsync<AlloyDB<Ethereum, RuProvider>>;
38+
/// The block configuration environment on which to simulate
39+
pub block_env: watch::Receiver<Option<BlockEnv>>,
40+
}
5541

5642
impl Simulator {
5743
/// Creates a new `Simulator` instance.
@@ -60,17 +46,21 @@ impl Simulator {
6046
///
6147
/// - `config`: The configuration for the builder.
6248
/// - `ru_provider`: A provider for interacting with the rollup.
63-
/// - `slot_calculator`: A slot calculator for managing block timing.
6449
///
6550
/// # Returns
6651
///
6752
/// A new `Simulator` instance.
6853
pub fn new(
6954
config: &BuilderConfig,
7055
ru_provider: RuProvider,
71-
slot_calculator: SlotCalculator,
56+
block_env: watch::Receiver<Option<BlockEnv>>,
7257
) -> Self {
73-
Self { config: config.clone(), ru_provider, slot_calculator }
58+
Self { config: config.clone(), ru_provider, block_env }
59+
}
60+
61+
/// Get the slot calculator.
62+
pub const fn slot_calculator(&self) -> &SlotCalculator {
63+
&self.config.slot_calculator
7464
}
7565

7666
/// Handles building a single block.
@@ -89,14 +79,14 @@ impl Simulator {
8979
constants: SignetSystemConstants,
9080
sim_items: SimCache,
9181
finish_by: Instant,
92-
block: PecorinoBlockEnv,
82+
block: BlockEnv,
9383
) -> eyre::Result<BuiltBlock> {
9484
let db = self.create_db().await.unwrap();
9585

9686
let block_build: BlockBuild<_, NoOpInspector> = BlockBuild::new(
9787
db,
9888
constants,
99-
PecorinoCfg {},
89+
self.config.cfg_env(),
10090
block,
10191
finish_by,
10292
self.config.concurrency_limit,
@@ -110,85 +100,6 @@ impl Simulator {
110100
Ok(block)
111101
}
112102

113-
/// Spawns two tasks: one to handle incoming transactions and bundles,
114-
/// adding them to the simulation cache, and one to track the latest basefee.
115-
///
116-
/// # Arguments
117-
///
118-
/// - `tx_receiver`: A channel receiver for incoming transactions.
119-
/// - `bundle_receiver`: A channel receiver for incoming bundles.
120-
/// - `cache`: The simulation cache to store the received items.
121-
///
122-
/// # Returns
123-
///
124-
/// A `JoinHandle` for the basefee updater and a `JoinHandle` for the
125-
/// cache handler.
126-
pub fn spawn_cache_tasks(
127-
&self,
128-
tx_receiver: mpsc::UnboundedReceiver<TxEnvelope>,
129-
bundle_receiver: mpsc::UnboundedReceiver<Bundle>,
130-
cache: SimCache,
131-
) -> (JoinHandle<()>, JoinHandle<()>) {
132-
debug!("starting up cache handler");
133-
134-
let basefee_price = Arc::new(AtomicU64::new(0_u64));
135-
let basefee_reader = Arc::clone(&basefee_price);
136-
let fut = self.basefee_updater_fut(basefee_price);
137-
138-
// Update the basefee on a per-block cadence
139-
let basefee_jh = tokio::spawn(fut);
140-
141-
// Update the sim cache whenever a transaction or bundle is received with respect to the basefee
142-
let cache_jh = tokio::spawn(async move {
143-
cache_updater(tx_receiver, bundle_receiver, cache, basefee_reader).await
144-
});
145-
146-
(basefee_jh, cache_jh)
147-
}
148-
149-
/// Periodically updates the shared basefee by querying the latest block.
150-
///
151-
/// This function calculates the remaining time until the next slot,
152-
/// sleeps until that time, and then retrieves the latest basefee from the rollup provider.
153-
/// The updated basefee is stored in the provided `AtomicU64`.
154-
///
155-
/// This function runs continuously.
156-
///
157-
/// # Arguments
158-
///
159-
/// - `price`: A shared `Arc<AtomicU64>` used to store the updated basefee value.
160-
fn basefee_updater_fut(&self, price: Arc<AtomicU64>) -> impl Future<Output = ()> + use<> {
161-
let slot_calculator = self.slot_calculator;
162-
let ru_provider = self.ru_provider.clone();
163-
164-
async move {
165-
debug!("starting basefee updater");
166-
loop {
167-
// calculate start of next slot plus a small buffer
168-
let time_remaining = slot_calculator.slot_duration()
169-
- slot_calculator.current_timepoint_within_slot()
170-
+ 1;
171-
debug!(time_remaining = ?time_remaining, "basefee updater sleeping until next slot");
172-
173-
// wait until that point in time
174-
sleep(Duration::from_secs(time_remaining)).await;
175-
176-
// update the basefee with that price
177-
let resp = ru_provider.get_block_by_number(Latest).await.inspect_err(|e| {
178-
error!(error = %e, "RPC error during basefee update");
179-
});
180-
181-
if let Ok(Some(block)) = resp {
182-
let basefee = block.header.base_fee_per_gas.unwrap_or(0);
183-
price.store(basefee, Ordering::Relaxed);
184-
debug!(basefee = basefee, "basefee updated");
185-
} else {
186-
warn!("get basefee failed - an error likely occurred");
187-
}
188-
}
189-
}
190-
}
191-
192103
/// Spawns the simulator task, which handles the setup and sets the deadline
193104
/// for the each round of simulation.
194105
///
@@ -227,7 +138,7 @@ impl Simulator {
227138
/// - `cache`: The simulation cache containing transactions and bundles.
228139
/// - `submit_sender`: A channel sender used to submit built blocks.
229140
async fn run_simulator(
230-
self,
141+
mut self,
231142
constants: SignetSystemConstants,
232143
cache: SimCache,
233144
submit_sender: mpsc::UnboundedSender<BuiltBlock>,
@@ -236,14 +147,16 @@ impl Simulator {
236147
let sim_cache = cache.clone();
237148
let finish_by = self.calculate_deadline();
238149

239-
let block_env = match self.next_block_env(finish_by).await {
240-
Ok(block) => block,
241-
Err(err) => {
242-
error!(err = %err, "failed to configure next block");
243-
break;
244-
}
245-
};
246-
info!(block_env = ?block_env, "created block");
150+
// Wait for the block environment to be set
151+
if self.block_env.changed().await.is_err() {
152+
error!("block_env channel closed");
153+
return;
154+
}
155+
156+
// If no env, skip this run
157+
let Some(block_env) = self.block_env.borrow_and_update().clone() else { return };
158+
159+
debug!(block_env = ?block_env, "building on block");
247160

248161
match self.handle_build(constants, sim_cache, finish_by, block_env).await {
249162
Ok(block) => {
@@ -265,13 +178,19 @@ impl Simulator {
265178
/// An `Instant` representing the simulation deadline, as calculated by determining
266179
/// the time left in the current slot and adding that to the current timestamp in UNIX seconds.
267180
pub fn calculate_deadline(&self) -> Instant {
268-
// Calculate the current timestamp in seconds since the UNIX epoch
269-
let now = SystemTime::now();
270-
let unix_seconds = now.duration_since(UNIX_EPOCH).expect("Time went backwards").as_secs();
271-
// Calculate the time remaining in the current slot
272-
let remaining = self.slot_calculator.calculate_timepoint_within_slot(unix_seconds);
273-
// Deadline is equal to the start of the next slot plus the time remaining in this slot
274-
Instant::now() + Duration::from_secs(remaining)
181+
// Get the current timepoint within the slot.
182+
let timepoint = self.slot_calculator().current_timepoint_within_slot();
183+
184+
// We have the timepoint in seconds into the slot. To find out what's
185+
// remaining, we need to subtract it from the slot duration
186+
let remaining = self.slot_calculator().slot_duration() - timepoint;
187+
188+
// We add a 1500 ms buffer to account for sequencer stopping signing.
189+
190+
let candidate =
191+
Instant::now() + Duration::from_secs(remaining) - Duration::from_millis(1500);
192+
193+
candidate.max(Instant::now())
275194
}
276195

277196
/// Creates an `AlloyDB` instance from the rollup provider.
@@ -300,106 +219,4 @@ impl Simulator {
300219
let wrapped_db: AlloyDatabaseProvider = WrapDatabaseAsync::new(alloy_db).unwrap();
301220
Some(wrapped_db)
302221
}
303-
304-
/// Prepares the next block environment.
305-
///
306-
/// Prepares the next block environment to load into the simulator by fetching the latest block number,
307-
/// assigning the correct next block number, checking the basefee, and setting the timestamp,
308-
/// reward address, and gas configuration for the block environment based on builder configuration.
309-
///
310-
/// # Arguments
311-
///
312-
/// - finish_by: The deadline at which block simulation will end.
313-
async fn next_block_env(&self, finish_by: Instant) -> eyre::Result<PecorinoBlockEnv> {
314-
let remaining = finish_by.duration_since(Instant::now());
315-
let finish_time = SystemTime::now() + remaining;
316-
let deadline: DateTime<Utc> = finish_time.into();
317-
debug!(deadline = %deadline, "preparing block env");
318-
319-
// Fetch the latest block number and increment it by 1
320-
let latest_block_number = match self.ru_provider.get_block_number().await {
321-
Ok(num) => num,
322-
Err(err) => {
323-
error!(%err, "RPC error during block build");
324-
bail!(err)
325-
}
326-
};
327-
debug!(next_block_num = latest_block_number + 1, "preparing block env");
328-
329-
// Fetch the basefee from previous block to calculate gas for this block
330-
let basefee = match self.get_basefee().await? {
331-
Some(basefee) => basefee,
332-
None => {
333-
warn!("get basefee failed - RPC error likely occurred");
334-
todo!()
335-
}
336-
};
337-
debug!(basefee = basefee, "setting basefee");
338-
339-
// Craft the Block environment to pass to the simulator
340-
let block_env = PecorinoBlockEnv::new(
341-
self.config.clone(),
342-
latest_block_number + 1,
343-
deadline.timestamp() as u64,
344-
basefee,
345-
);
346-
debug!(block_env = ?block_env, "prepared block env");
347-
348-
Ok(block_env)
349-
}
350-
351-
/// Returns the basefee of the latest block.
352-
///
353-
/// # Returns
354-
///
355-
/// The basefee of the previous (latest) block if the request was successful,
356-
/// or a sane default if the RPC failed.
357-
async fn get_basefee(&self) -> eyre::Result<Option<u64>> {
358-
let Some(block) =
359-
self.ru_provider.get_block_by_number(Latest).await.wrap_err("basefee error")?
360-
else {
361-
return Ok(None);
362-
};
363-
364-
debug!(basefee = ?block.header.base_fee_per_gas, "basefee found");
365-
Ok(block.header.base_fee_per_gas)
366-
}
367-
}
368-
369-
/// Continuously updates the simulation cache with incoming transactions and bundles.
370-
///
371-
/// This function listens for new transactions and bundles on their respective
372-
/// channels and adds them to the simulation cache using the latest observed basefee.
373-
///
374-
/// # Arguments
375-
///
376-
/// - `tx_receiver`: A receiver channel for incoming Ethereum transactions.
377-
/// - `bundle_receiver`: A receiver channel for incoming transaction bundles.
378-
/// - `cache`: The simulation cache used to store transactions and bundles.
379-
/// - `price_reader`: An `Arc<AtomicU64>` providing the latest basefee for simulation pricing.
380-
async fn cache_updater(
381-
mut tx_receiver: mpsc::UnboundedReceiver<
382-
alloy::consensus::EthereumTxEnvelope<alloy::consensus::TxEip4844Variant>,
383-
>,
384-
mut bundle_receiver: mpsc::UnboundedReceiver<Bundle>,
385-
cache: SimCache,
386-
price_reader: Arc<AtomicU64>,
387-
) -> ! {
388-
loop {
389-
let p = price_reader.load(Ordering::Relaxed);
390-
select! {
391-
maybe_tx = tx_receiver.recv() => {
392-
if let Some(tx) = maybe_tx {
393-
debug!(tx = ?tx.hash(), "received transaction");
394-
cache.add_item(tx, p);
395-
}
396-
}
397-
maybe_bundle = bundle_receiver.recv() => {
398-
if let Some(bundle) = maybe_bundle {
399-
debug!(bundle = ?bundle.id, "received bundle");
400-
cache.add_item(bundle.bundle, p);
401-
}
402-
}
403-
}
404-
}
405222
}

‎src/tasks/cache/bundle.rs

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,30 +6,13 @@ use init4_bin_base::{
66
};
77
use oauth2::TokenResponse;
88
use reqwest::{Client, Url};
9-
use serde::{Deserialize, Serialize};
10-
use signet_bundle::SignetEthBundle;
9+
use signet_tx_cache::types::{TxCacheBundle, TxCacheBundlesResponse};
1110
use tokio::{
1211
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
1312
task::JoinHandle,
1413
time::{self, Duration},
1514
};
1615

17-
/// Holds a bundle from the cache with a unique ID and a Zenith bundle.
18-
#[derive(Debug, Clone, Serialize, Deserialize)]
19-
pub struct Bundle {
20-
/// Cache identifier for the bundle.
21-
pub id: String,
22-
/// The corresponding Signet bundle.
23-
pub bundle: SignetEthBundle,
24-
}
25-
26-
/// Response from the tx-pool containing a list of bundles.
27-
#[derive(Debug, Clone, Serialize, Deserialize)]
28-
struct TxPoolBundleResponse {
29-
/// Bundle responses are available on the bundles property.
30-
pub bundles: Vec<Bundle>,
31-
}
32-
3316
/// The BundlePoller polls the tx-pool for bundles.
3417
#[derive(Debug)]
3518
pub struct BundlePoller {
@@ -60,7 +43,7 @@ impl BundlePoller {
6043
}
6144

6245
/// Fetches bundles from the transaction cache and returns them.
63-
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<Bundle>> {
46+
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<TxCacheBundle>> {
6447
let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?;
6548
let Some(token) = self.token.read() else {
6649
warn!("No token available, skipping bundle fetch");
@@ -75,7 +58,7 @@ impl BundlePoller {
7558
.error_for_status()?
7659
.json()
7760
.await
78-
.map(|resp: TxPoolBundleResponse| resp.bundles)
61+
.map(|resp: TxCacheBundlesResponse| resp.bundles)
7962
.map_err(Into::into)
8063
}
8164

@@ -84,7 +67,7 @@ impl BundlePoller {
8467
Duration::from_millis(self.poll_interval_ms)
8568
}
8669

87-
async fn task_future(mut self, outbound: UnboundedSender<Bundle>) {
70+
async fn task_future(mut self, outbound: UnboundedSender<TxCacheBundle>) {
8871
loop {
8972
let span = debug_span!("BundlePoller::loop", url = %self.config.tx_pool_url);
9073

@@ -119,7 +102,7 @@ impl BundlePoller {
119102
}
120103

121104
/// Spawns a task that sends bundles it finds to its channel sender.
122-
pub fn spawn(self) -> (UnboundedReceiver<Bundle>, JoinHandle<()>) {
105+
pub fn spawn(self) -> (UnboundedReceiver<TxCacheBundle>, JoinHandle<()>) {
123106
let (outbound, inbound) = unbounded_channel();
124107

125108
let jh = tokio::spawn(self.task_future(outbound));

‎src/tasks/cache/mod.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,23 @@ mod tx;
55
pub use tx::TxPoller;
66

77
mod bundle;
8-
pub use bundle::{Bundle, BundlePoller};
8+
pub use bundle::BundlePoller;
9+
10+
use signet_sim::SimCache;
11+
use tokio::task::JoinHandle;
12+
13+
/// Cache tasks for the block builder.
14+
#[derive(Debug)]
15+
pub struct CacheSystem {
16+
/// The cache task.
17+
pub cache_task: JoinHandle<()>,
18+
19+
/// The transaction poller task.
20+
pub tx_poller: JoinHandle<()>,
21+
22+
/// The bundle poller task.
23+
pub bundle_poller: JoinHandle<()>,
24+
25+
/// The sim cache.
26+
pub sim_cache: SimCache,
27+
}

‎src/tasks/cache/task.rs

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@ use trevm::revm::context::BlockEnv;
1515
/// the environment changes.
1616
#[derive(Debug)]
1717
pub struct CacheTask {
18-
/// The shared sim cache to populate.
19-
cache: SimCache,
20-
2118
/// The channel to receive the block environment.
2219
env: watch::Receiver<Option<BlockEnv>>,
2320

@@ -28,7 +25,16 @@ pub struct CacheTask {
2825
}
2926

3027
impl CacheTask {
31-
async fn task_future(mut self) {
28+
/// Create a new cache task with the given cache and channels.
29+
pub const fn new(
30+
env: watch::Receiver<Option<BlockEnv>>,
31+
bundles: mpsc::UnboundedReceiver<TxCacheBundle>,
32+
txns: mpsc::UnboundedReceiver<TxEnvelope>,
33+
) -> Self {
34+
Self { env, bundles, txns }
35+
}
36+
37+
async fn task_future(mut self, cache: SimCache) {
3238
loop {
3339
let mut basefee = 0;
3440
tokio::select! {
@@ -41,24 +47,26 @@ impl CacheTask {
4147
if let Some(env) = self.env.borrow_and_update().as_ref() {
4248
basefee = env.basefee;
4349
info!(basefee, number = env.number, timestamp = env.timestamp, "block env changed, clearing cache");
44-
self.cache.clean(
50+
cache.clean(
4551
env.number, env.timestamp
4652
);
4753
}
4854
}
4955
Some(bundle) = self.bundles.recv() => {
50-
self.cache.add_item(bundle.bundle, basefee);
56+
cache.add_item(bundle.bundle, basefee);
5157
}
5258
Some(txn) = self.txns.recv() => {
53-
self.cache.add_item(txn, basefee);
59+
cache.add_item(txn, basefee);
5460
}
5561
}
5662
}
5763
}
5864

5965
/// Spawn the cache task.
60-
pub fn spawn(self) -> JoinHandle<()> {
61-
let fut = self.task_future();
62-
tokio::spawn(fut)
66+
pub fn spawn(self) -> (SimCache, JoinHandle<()>) {
67+
let sim_cache = SimCache::default();
68+
let c = sim_cache.clone();
69+
let fut = self.task_future(sim_cache);
70+
(c, tokio::spawn(fut))
6371
}
6472
}

‎src/tasks/env.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use alloy::{
77
};
88
use init4_bin_base::deps::tracing::{self, Instrument, debug, error, info_span};
99
use std::time::Duration;
10-
use tokio::sync::watch;
10+
use tokio::{sync::watch, task::JoinHandle};
1111
use tokio_stream::StreamExt;
1212
use trevm::revm::{context::BlockEnv, context_interface::block::BlobExcessGasAndPrice};
1313

@@ -25,7 +25,7 @@ impl EnvTask {
2525
}
2626

2727
/// Construct a BlockEnv by making calls to the provider.
28-
pub fn construct_block_env(&self, previous: &Header) -> BlockEnv {
28+
fn construct_block_env(&self, previous: &Header) -> BlockEnv {
2929
BlockEnv {
3030
number: previous.number + 1,
3131
beneficiary: self.config.builder_rewards_address,
@@ -45,7 +45,7 @@ impl EnvTask {
4545
}
4646

4747
/// Construct the BlockEnv and send it to the sender.
48-
pub async fn task_fut(self, sender: watch::Sender<Option<BlockEnv>>) {
48+
async fn task_fut(self, sender: watch::Sender<Option<BlockEnv>>) {
4949
let span = info_span!("EnvTask::task_fut::init");
5050
let mut poller = match self.provider.watch_blocks().instrument(span.clone()).await {
5151
Ok(poller) => poller,
@@ -96,22 +96,24 @@ impl EnvTask {
9696
}
9797
};
9898
span.record("number", previous.number);
99+
debug!("retrieved latest block");
99100

100101
let env = self.construct_block_env(&previous);
101102
debug!(?env, "constructed block env");
102103
if sender.send(Some(env)).is_err() {
103104
// The receiver has been dropped, so we can stop the task.
105+
debug!("receiver dropped, stopping task");
104106
break;
105107
}
106108
}
107109
}
108110

109111
/// Spawn the task and return a watch::Receiver for the BlockEnv.
110-
pub fn spawn(self) -> watch::Receiver<Option<BlockEnv>> {
112+
pub fn spawn(self) -> (watch::Receiver<Option<BlockEnv>>, JoinHandle<()>) {
111113
let (sender, receiver) = watch::channel(None);
112114
let fut = self.task_fut(sender);
113-
tokio::spawn(fut);
115+
let jh = tokio::spawn(fut);
114116

115-
receiver
117+
(receiver, jh)
116118
}
117119
}

‎src/tasks/metrics.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ pub struct MetricsTask {
1818
}
1919

2020
impl MetricsTask {
21+
/// Create a new MetricsTask with the given provider
22+
pub const fn new(host_provider: HostProvider) -> Self {
23+
Self { host_provider }
24+
}
25+
2126
/// Given a transaction hash, record metrics on the result of the
2227
/// transaction mining
2328
pub fn log_tx(&self, tx_hash: TxHash) -> impl Future<Output = ()> + use<> {

‎src/test_utils.rs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
//! Test utilities for testing builder tasks
2-
use crate::{config::BuilderConfig, tasks::block::cfg::PecorinoBlockEnv};
2+
use crate::config::BuilderConfig;
33
use alloy::{
44
consensus::{SignableTransaction, TxEip1559, TxEnvelope},
5-
primitives::{Address, FixedBytes, TxKind, U256},
5+
primitives::{Address, B256, TxKind, U256},
66
signers::{SignerSync, local::PrivateKeySigner},
77
};
8-
use chrono::{DateTime, Utc};
98
use eyre::Result;
109
use init4_bin_base::{
1110
deps::tracing_subscriber::{
@@ -14,10 +13,8 @@ use init4_bin_base::{
1413
perms::OAuthConfig,
1514
utils::calc::SlotCalculator,
1615
};
17-
use std::{
18-
str::FromStr,
19-
time::{Instant, SystemTime},
20-
};
16+
use std::str::FromStr;
17+
use trevm::revm::{context::BlockEnv, context_interface::block::BlobExcessGasAndPrice};
2118

2219
/// Sets up a block builder with test values
2320
pub fn setup_test_config() -> Result<BuilderConfig> {
@@ -90,18 +87,19 @@ pub fn test_block_env(
9087
config: BuilderConfig,
9188
number: u64,
9289
basefee: u64,
93-
finish_by: Instant,
94-
) -> PecorinoBlockEnv {
95-
let remaining = finish_by.duration_since(Instant::now());
96-
let finish_time = SystemTime::now() + remaining;
97-
let deadline: DateTime<Utc> = finish_time.into();
98-
99-
PecorinoBlockEnv {
90+
timestamp: u64,
91+
) -> BlockEnv {
92+
BlockEnv {
10093
number,
101-
beneficiary: Address::repeat_byte(0),
102-
timestamp: deadline.timestamp() as u64,
94+
beneficiary: Address::repeat_byte(1),
95+
timestamp,
10396
gas_limit: config.rollup_block_gas_limit,
10497
basefee,
105-
prevrandao: Some(FixedBytes::random()),
98+
difficulty: U256::ZERO,
99+
prevrandao: Some(B256::random()),
100+
blob_excess_gas_and_price: Some(BlobExcessGasAndPrice {
101+
excess_blob_gas: 0,
102+
blob_gasprice: 0,
103+
}),
106104
}
107105
}

‎tests/block_builder_test.rs

Lines changed: 128 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -1,134 +1,130 @@
11
//! Tests for the block building task.
2-
#[cfg(test)]
3-
mod tests {
4-
use alloy::{
5-
network::Ethereum,
6-
node_bindings::Anvil,
7-
primitives::U256,
8-
providers::{Provider, RootProvider},
9-
signers::local::PrivateKeySigner,
10-
};
11-
use builder::{
12-
tasks::block::sim::Simulator,
13-
test_utils::{new_signed_tx, setup_logging, setup_test_config, test_block_env},
14-
};
15-
use init4_bin_base::utils::calc::SlotCalculator;
16-
use signet_sim::{SimCache, SimItem};
17-
use signet_types::constants::SignetSystemConstants;
18-
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
19-
use tokio::{sync::mpsc::unbounded_channel, time::timeout};
20-
21-
/// Tests the `handle_build` method of the `Simulator`.
22-
///
23-
/// This test sets up a simulated environment using Anvil, creates a block builder,
24-
/// and verifies that the block builder can successfully build a block containing
25-
/// transactions from multiple senders.
26-
#[cfg(feature = "integration")]
27-
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
28-
async fn test_handle_build() {
29-
setup_logging();
30-
31-
// Make a test config
32-
let config = setup_test_config().unwrap();
33-
let constants = SignetSystemConstants::pecorino();
34-
35-
// Create an anvil instance for testing
36-
let anvil_instance = Anvil::new().chain_id(signet_constants::pecorino::RU_CHAIN_ID).spawn();
37-
38-
// Create a wallet
39-
let keys = anvil_instance.keys();
40-
let test_key_0 = PrivateKeySigner::from_signing_key(keys[0].clone().into());
41-
let test_key_1 = PrivateKeySigner::from_signing_key(keys[1].clone().into());
42-
43-
// Create a rollup provider
44-
let ru_provider = RootProvider::<Ethereum>::new_http(anvil_instance.endpoint_url());
45-
46-
// Create a block builder with a slot calculator for testing
47-
let now = SystemTime::now()
48-
.duration_since(UNIX_EPOCH)
49-
.expect("Clock may have gone backwards")
50-
.as_secs();
51-
52-
let slot_calculator = SlotCalculator::new(now, 0, 12);
53-
let block_builder = Simulator::new(&config, ru_provider.clone(), slot_calculator);
54-
55-
// Setup a sim cache
56-
let sim_items = SimCache::new();
57-
58-
// Add two transactions from two senders to the sim cache
59-
let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap();
60-
sim_items.add_item(SimItem::Tx(tx_1), 0);
61-
62-
let tx_2 = new_signed_tx(&test_key_1, 0, U256::from(2_f64), 10_000).unwrap();
63-
sim_items.add_item(SimItem::Tx(tx_2), 0);
64-
65-
// Setup the block env
66-
let finish_by = Instant::now() + Duration::from_secs(2);
67-
let block_number = ru_provider.get_block_number().await.unwrap();
68-
let block_env = test_block_env(config, block_number, 7, finish_by);
69-
70-
// Spawn the block builder task
71-
let got = block_builder.handle_build(constants, sim_items, finish_by, block_env).await;
72-
73-
// Assert on the built block
74-
assert!(got.is_ok());
75-
assert!(got.unwrap().tx_count() == 2);
76-
}
77-
78-
/// Tests the full block builder loop, including transaction ingestion and block simulation.
79-
///
80-
/// This test sets up a simulated environment using Anvil, creates a block builder,
81-
/// and verifies that the builder can process incoming transactions and produce a block
82-
/// within a specified timeout.
83-
#[ignore = "integration test"]
84-
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
85-
async fn test_spawn() {
86-
setup_logging();
87-
88-
// Make a test config
89-
let config = setup_test_config().unwrap();
90-
let constants = SignetSystemConstants::pecorino();
91-
92-
// Create an anvil instance for testing
93-
let anvil_instance = Anvil::new().chain_id(signet_constants::pecorino::RU_CHAIN_ID).spawn();
94-
95-
// Create a wallet
96-
let keys = anvil_instance.keys();
97-
let test_key_0 = PrivateKeySigner::from_signing_key(keys[0].clone().into());
98-
let test_key_1 = PrivateKeySigner::from_signing_key(keys[1].clone().into());
99-
100-
// Plumb inputs for the test setup
101-
let (tx_sender, tx_receiver) = unbounded_channel();
102-
let (_, bundle_receiver) = unbounded_channel();
103-
let (block_sender, mut block_receiver) = unbounded_channel();
104-
105-
// Create a rollup provider
106-
let ru_provider = RootProvider::<Ethereum>::new_http(anvil_instance.endpoint_url());
107-
108-
let sim = Simulator::new(&config, ru_provider.clone(), config.slot_calculator);
109-
110-
// Create a shared sim cache
111-
let sim_cache = SimCache::new();
112-
113-
// Create a sim cache and start filling it with items
114-
sim.spawn_cache_tasks(tx_receiver, bundle_receiver, sim_cache.clone());
115-
116-
// Finally, Kick off the block builder task.
117-
sim.spawn_simulator_task(constants, sim_cache.clone(), block_sender);
118-
119-
// Feed in transactions to the tx_sender and wait for the block to be simulated
120-
let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap();
121-
let tx_2 = new_signed_tx(&test_key_1, 0, U256::from(2_f64), 10_000).unwrap();
122-
tx_sender.send(tx_1).unwrap();
123-
tx_sender.send(tx_2).unwrap();
124-
125-
// Wait for a block with timeout
126-
let result = timeout(Duration::from_secs(5), block_receiver.recv()).await;
127-
assert!(result.is_ok(), "Did not receive block within 5 seconds");
128-
129-
// Assert on the block
130-
let block = result.unwrap();
131-
assert!(block.is_some(), "Block channel closed without receiving a block");
132-
assert!(block.unwrap().tx_count() == 2); // TODO: Why is this failing? I'm seeing EVM errors but haven't tracked them down yet.
133-
}
2+
3+
use alloy::{
4+
network::Ethereum,
5+
node_bindings::Anvil,
6+
primitives::U256,
7+
providers::{Provider, RootProvider},
8+
signers::local::PrivateKeySigner,
9+
};
10+
use builder::{
11+
tasks::{block::sim::Simulator, cache::CacheTask},
12+
test_utils::{new_signed_tx, setup_logging, setup_test_config, test_block_env},
13+
};
14+
use signet_sim::{SimCache, SimItem};
15+
use signet_types::constants::SignetSystemConstants;
16+
use std::time::{Duration, Instant};
17+
use tokio::{sync::mpsc::unbounded_channel, time::timeout};
18+
19+
/// Tests the `handle_build` method of the `Simulator`.
20+
///
21+
/// This test sets up a simulated environment using Anvil, creates a block builder,
22+
/// and verifies that the block builder can successfully build a block containing
23+
/// transactions from multiple senders.
24+
#[cfg(feature = "integration")]
25+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
26+
async fn test_handle_build() {
27+
use alloy::eips::BlockId;
28+
29+
setup_logging();
30+
31+
// Make a test config
32+
let config = setup_test_config().unwrap();
33+
let constants = SignetSystemConstants::pecorino();
34+
35+
// Create an anvil instance for testing
36+
let anvil_instance = Anvil::new().chain_id(signet_constants::pecorino::RU_CHAIN_ID).spawn();
37+
38+
// Create a wallet
39+
let keys = anvil_instance.keys();
40+
let test_key_0 = PrivateKeySigner::from_signing_key(keys[0].clone().into());
41+
let test_key_1 = PrivateKeySigner::from_signing_key(keys[1].clone().into());
42+
43+
// Create a rollup provider
44+
let ru_provider = RootProvider::<Ethereum>::new_http(anvil_instance.endpoint_url());
45+
46+
let block_env = config.env_task().spawn().0;
47+
48+
let block_builder = Simulator::new(&config, ru_provider.clone(), block_env);
49+
50+
// Setup a sim cache
51+
let sim_items = SimCache::new();
52+
53+
// Add two transactions from two senders to the sim cache
54+
let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap();
55+
sim_items.add_item(SimItem::Tx(tx_1), 0);
56+
57+
let tx_2 = new_signed_tx(&test_key_1, 0, U256::from(2_f64), 10_000).unwrap();
58+
sim_items.add_item(SimItem::Tx(tx_2), 0);
59+
60+
// Setup the block env
61+
let finish_by = Instant::now() + Duration::from_secs(2);
62+
let header = ru_provider.get_block(BlockId::latest()).await.unwrap().unwrap().header.inner;
63+
let number = header.number + 1;
64+
let timestamp = header.timestamp + config.slot_calculator.slot_duration();
65+
let block_env = test_block_env(config, number, 7, timestamp);
66+
67+
// Spawn the block builder task
68+
let got = block_builder.handle_build(constants, sim_items, finish_by, block_env).await;
69+
70+
// Assert on the built block
71+
assert!(got.is_ok());
72+
assert!(got.unwrap().tx_count() == 2);
73+
}
74+
75+
/// Tests the full block builder loop, including transaction ingestion and block simulation.
76+
///
77+
/// This test sets up a simulated environment using Anvil, creates a block builder,
78+
/// and verifies that the builder can process incoming transactions and produce a block
79+
/// within a specified timeout.
80+
#[ignore = "integration test"]
81+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
82+
async fn test_spawn() {
83+
setup_logging();
84+
85+
// Make a test config
86+
let config = setup_test_config().unwrap();
87+
let constants = SignetSystemConstants::pecorino();
88+
89+
// Create an anvil instance for testing
90+
let anvil_instance = Anvil::new().chain_id(signet_constants::pecorino::RU_CHAIN_ID).spawn();
91+
92+
// Create a wallet
93+
let keys = anvil_instance.keys();
94+
let test_key_0 = PrivateKeySigner::from_signing_key(keys[0].clone().into());
95+
let test_key_1 = PrivateKeySigner::from_signing_key(keys[1].clone().into());
96+
97+
// Plumb inputs for the test setup
98+
let (tx_sender, tx_receiver) = unbounded_channel();
99+
let (_, bundle_receiver) = unbounded_channel();
100+
let (block_sender, mut block_receiver) = unbounded_channel();
101+
102+
let env_task = config.env_task();
103+
let (block_env, _env_jh) = env_task.spawn();
104+
105+
let cache_task = CacheTask::new(block_env.clone(), bundle_receiver, tx_receiver);
106+
let (sim_cache, _cache_jh) = cache_task.spawn();
107+
108+
// Create a rollup provider
109+
let ru_provider = RootProvider::<Ethereum>::new_http(anvil_instance.endpoint_url());
110+
111+
let sim = Simulator::new(&config, ru_provider.clone(), block_env);
112+
113+
// Finally, Kick off the block builder task.
114+
sim.spawn_simulator_task(constants, sim_cache.clone(), block_sender);
115+
116+
// Feed in transactions to the tx_sender and wait for the block to be simulated
117+
let tx_1 = new_signed_tx(&test_key_0, 0, U256::from(1_f64), 11_000).unwrap();
118+
let tx_2 = new_signed_tx(&test_key_1, 0, U256::from(2_f64), 10_000).unwrap();
119+
tx_sender.send(tx_1).unwrap();
120+
tx_sender.send(tx_2).unwrap();
121+
122+
// Wait for a block with timeout
123+
let result = timeout(Duration::from_secs(5), block_receiver.recv()).await;
124+
assert!(result.is_ok(), "Did not receive block within 5 seconds");
125+
126+
// Assert on the block
127+
let block = result.unwrap();
128+
assert!(block.is_some(), "Block channel closed without receiving a block");
129+
assert!(block.unwrap().tx_count() == 2); // TODO: Why is this failing? I'm seeing EVM errors but haven't tracked them down yet.
134130
}

‎tests/bundle_poller_test.rs

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
1-
mod tests {
2-
use builder::test_utils;
3-
use eyre::Result;
1+
use builder::test_utils;
2+
use eyre::Result;
43

5-
#[ignore = "integration test"]
6-
#[tokio::test]
7-
async fn test_bundle_poller_roundtrip() -> Result<()> {
8-
let config = test_utils::setup_test_config().unwrap();
9-
let token = config.oauth_token();
4+
#[ignore = "integration test"]
5+
#[tokio::test]
6+
async fn test_bundle_poller_roundtrip() -> Result<()> {
7+
let config = test_utils::setup_test_config().unwrap();
8+
let token = config.oauth_token();
109

11-
let mut bundle_poller = builder::tasks::cache::BundlePoller::new(&config, token);
10+
let mut bundle_poller = builder::tasks::cache::BundlePoller::new(&config, token);
1211

13-
let _ = bundle_poller.check_bundle_cache().await?;
12+
let _ = bundle_poller.check_bundle_cache().await?;
1413

15-
Ok(())
16-
}
14+
Ok(())
1715
}

‎tests/cache.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
use std::time::Duration;
2+
3+
use builder::test_utils::{setup_logging, setup_test_config};
4+
use init4_bin_base::deps::tracing::warn;
5+
6+
#[ignore = "integration test. This test will take >12 seconds to run, and requires Authz configuration env vars."]
7+
#[tokio::test]
8+
async fn test_bundle_poller_roundtrip() -> eyre::Result<()> {
9+
setup_logging();
10+
11+
let config = setup_test_config().unwrap();
12+
13+
let (block_env, _jh) = config.env_task().spawn();
14+
let cache = config.spawn_cache_system(block_env);
15+
16+
tokio::time::sleep(Duration::from_secs(12)).await;
17+
18+
warn!(txns = ?cache.sim_cache.read_best(5));
19+
20+
Ok(())
21+
}

‎tests/env.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
use builder::test_utils::{setup_logging, setup_test_config};
2+
3+
#[ignore = "integration test. This test will take between 0 and 12 seconds to run."]
4+
#[tokio::test]
5+
async fn test_bundle_poller_roundtrip() {
6+
setup_logging();
7+
8+
let config = setup_test_config().unwrap();
9+
let env_task = config.env_task();
10+
let (mut env_watcher, _jh) = env_task.spawn();
11+
12+
env_watcher.changed().await.unwrap();
13+
let env = env_watcher.borrow_and_update();
14+
assert!(env.as_ref().is_some(), "Env should be Some");
15+
}

‎tests/tx_poller_test.rs

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,48 @@
1-
mod tests {
2-
use alloy::{primitives::U256, signers::local::PrivateKeySigner};
3-
use builder::{
4-
config::BuilderConfig,
5-
tasks::cache::TxPoller,
6-
test_utils::{new_signed_tx, setup_logging, setup_test_config},
7-
};
8-
// Import the refactored function
9-
use eyre::{Ok, Result};
1+
use alloy::{primitives::U256, signers::local::PrivateKeySigner};
2+
use builder::{
3+
config::BuilderConfig,
4+
tasks::cache::TxPoller,
5+
test_utils::{new_signed_tx, setup_logging, setup_test_config},
6+
};
7+
// Import the refactored function
8+
use eyre::{Ok, Result};
109

11-
#[ignore = "integration test"]
12-
#[tokio::test]
13-
async fn test_tx_roundtrip() -> Result<()> {
14-
setup_logging();
10+
#[ignore = "integration test"]
11+
#[tokio::test]
12+
async fn test_tx_roundtrip() -> Result<()> {
13+
setup_logging();
1514

16-
// Create a new test environment
17-
let config = setup_test_config()?;
15+
// Create a new test environment
16+
let config = setup_test_config()?;
1817

19-
// Post a transaction to the cache
20-
post_tx(&config).await?;
18+
// Post a transaction to the cache
19+
post_tx(&config).await?;
2120

22-
// Create a new poller
23-
let mut poller = TxPoller::new(&config);
21+
// Create a new poller
22+
let mut poller = TxPoller::new(&config);
2423

25-
// Fetch transactions the pool
26-
let transactions = poller.check_tx_cache().await?;
24+
// Fetch transactions the pool
25+
let transactions = poller.check_tx_cache().await?;
2726

28-
// Ensure at least one transaction exists
29-
assert!(!transactions.is_empty());
27+
// Ensure at least one transaction exists
28+
assert!(!transactions.is_empty());
3029

31-
Ok(())
32-
}
33-
34-
async fn post_tx(config: &BuilderConfig) -> Result<()> {
35-
let client = reqwest::Client::new();
30+
Ok(())
31+
}
3632

37-
let wallet = PrivateKeySigner::random();
38-
let tx_envelope = new_signed_tx(&wallet, 1, U256::from(1), 10_000)?;
33+
async fn post_tx(config: &BuilderConfig) -> Result<()> {
34+
let client = reqwest::Client::new();
3935

40-
let url = format!("{}/transactions", config.tx_pool_url);
41-
let response = client.post(&url).json(&tx_envelope).send().await?;
36+
let wallet = PrivateKeySigner::random();
37+
let tx_envelope = new_signed_tx(&wallet, 1, U256::from(1), 10_000)?;
4238

43-
if !response.status().is_success() {
44-
let error_text = response.text().await?;
45-
eyre::bail!("Failed to post transaction: {}", error_text);
46-
}
39+
let url = format!("{}/transactions", config.tx_pool_url);
40+
let response = client.post(&url).json(&tx_envelope).send().await?;
4741

48-
Ok(())
42+
if !response.status().is_success() {
43+
let error_text = response.text().await?;
44+
eyre::bail!("Failed to post transaction: {}", error_text);
4945
}
46+
47+
Ok(())
5048
}

0 commit comments

Comments
 (0)
Please sign in to comment.