diff --git a/Cargo.toml b/Cargo.toml index c7ccc148..6c6e978a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,11 +27,12 @@ integration = [] [dependencies] init4-bin-base = "0.3" -signet-zenith = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } -signet-constants = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } -signet-types = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } signet-bundle = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } +signet-constants = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } signet-sim = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } +signet-tx-cache = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } +signet-types = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } +signet-zenith = { git = "https://github.com/init4tech/signet-sdk", branch = "main" } trevm = { version = "0.20.10", features = ["concurrent-db", "test-utils"] } diff --git a/bin/builder.rs b/bin/builder.rs index 2eef9708..884a7315 100644 --- a/bin/builder.rs +++ b/bin/builder.rs @@ -1,7 +1,12 @@ use builder::{ config::BuilderConfig, service::serve_builder, - tasks::{block::sim::Simulator, bundler, metrics::MetricsTask, submit::SubmitTask, tx_poller}, + tasks::{ + block::sim::Simulator, + cache::{BundlePoller, TxPoller}, + metrics::MetricsTask, + submit::SubmitTask, + }, }; use init4_bin_base::{deps::tracing, utils::from_env::FromEnv}; use signet_sim::SimCache; @@ -32,10 +37,10 @@ async fn main() -> eyre::Result<()> { let submit = SubmitTask { zenith, quincey, config: config.clone(), outbound_tx_channel: tx_channel }; - let tx_poller = tx_poller::TxPoller::new(&config); + let tx_poller = TxPoller::new(&config); let (tx_receiver, tx_poller_jh) = tx_poller.spawn(); - let bundle_poller = bundler::BundlePoller::new(&config, token); + let bundle_poller = BundlePoller::new(&config, token); let (bundle_receiver, bundle_poller_jh) = bundle_poller.spawn(); let (submit_channel, submit_jh) = submit.spawn(); diff --git a/src/signer.rs b/src/signer.rs index 6883cb4b..ae3ccf69 100644 --- a/src/signer.rs +++ b/src/signer.rs @@ -15,6 +15,7 @@ pub enum LocalOrAws { } /// Error during signing +#[allow(clippy::large_enum_variant)] // type about to be deleted #[derive(Debug, thiserror::Error)] pub enum SignerError { /// Error during [`AwsSigner`] instantiation @@ -44,6 +45,7 @@ impl LocalOrAws { /// # Panics /// /// Panics if the env var contents is not a valid secp256k1 private key. + #[allow(clippy::result_large_err)] // type about to be deleted fn wallet(private_key: &str) -> Result { let bytes = hex::decode(private_key.strip_prefix("0x").unwrap_or(private_key))?; Ok(PrivateKeySigner::from_slice(&bytes).unwrap()) diff --git a/src/tasks/block/sim.rs b/src/tasks/block/sim.rs index fec12609..909db6bd 100644 --- a/src/tasks/block/sim.rs +++ b/src/tasks/block/sim.rs @@ -4,7 +4,7 @@ use super::cfg::PecorinoBlockEnv; use crate::{ config::{BuilderConfig, RuProvider}, - tasks::{block::cfg::PecorinoCfg, bundler::Bundle}, + tasks::{block::cfg::PecorinoCfg, cache::Bundle}, }; use alloy::{ consensus::TxEnvelope, diff --git a/src/tasks/bundler.rs b/src/tasks/cache/bundle.rs similarity index 99% rename from src/tasks/bundler.rs rename to src/tasks/cache/bundle.rs index 4aeffa1b..4d4d8bbe 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/cache/bundle.rs @@ -22,7 +22,7 @@ pub struct Bundle { /// Response from the tx-pool containing a list of bundles. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TxPoolBundleResponse { +struct TxPoolBundleResponse { /// Bundle responses are available on the bundles property. pub bundles: Vec, } diff --git a/src/tasks/cache/mod.rs b/src/tasks/cache/mod.rs new file mode 100644 index 00000000..e1f0165e --- /dev/null +++ b/src/tasks/cache/mod.rs @@ -0,0 +1,8 @@ +mod task; +pub use task::CacheTask; + +mod tx; +pub use tx::TxPoller; + +mod bundle; +pub use bundle::{Bundle, BundlePoller}; diff --git a/src/tasks/cache/task.rs b/src/tasks/cache/task.rs new file mode 100644 index 00000000..2e88a7b5 --- /dev/null +++ b/src/tasks/cache/task.rs @@ -0,0 +1,64 @@ +use alloy::consensus::TxEnvelope; +use init4_bin_base::deps::tracing::{debug, info}; +use signet_sim::SimCache; +use signet_tx_cache::types::TxCacheBundle; +use tokio::{ + sync::{mpsc, watch}, + task::JoinHandle, +}; +use trevm::revm::context::BlockEnv; + +/// Cache task for the block builder. +/// +/// This tasks handles the ingestion of transactions and bundles into the cache. +/// It keeps a receiver for the block environment and cleans the cache when +/// the environment changes. +#[derive(Debug)] +pub struct CacheTask { + /// The shared sim cache to populate. + cache: SimCache, + + /// The channel to receive the block environment. + env: watch::Receiver>, + + /// The channel to receive the transaction bundles. + bundles: mpsc::UnboundedReceiver, + /// The channel to receive the transactions. + txns: mpsc::UnboundedReceiver, +} + +impl CacheTask { + async fn task_future(mut self) { + loop { + let mut basefee = 0; + tokio::select! { + biased; + res = self.env.changed() => { + if res.is_err() { + debug!("Cache task: env channel closed, exiting"); + break; + } + if let Some(env) = self.env.borrow_and_update().as_ref() { + basefee = env.basefee; + info!(basefee, number = env.number, timestamp = env.timestamp, "block env changed, clearing cache"); + self.cache.clean( + env.number, env.timestamp + ); + } + } + Some(bundle) = self.bundles.recv() => { + self.cache.add_item(bundle.bundle, basefee); + } + Some(txn) = self.txns.recv() => { + self.cache.add_item(txn, basefee); + } + } + } + } + + /// Spawn the cache task. + pub fn spawn(self) -> JoinHandle<()> { + let fut = self.task_future(); + tokio::spawn(fut) + } +} diff --git a/src/tasks/tx_poller.rs b/src/tasks/cache/tx.rs similarity index 99% rename from src/tasks/tx_poller.rs rename to src/tasks/cache/tx.rs index e9df5b0a..ef75327d 100644 --- a/src/tasks/tx_poller.rs +++ b/src/tasks/cache/tx.rs @@ -10,7 +10,7 @@ use tokio::{sync::mpsc, task::JoinHandle, time}; /// Models a response from the transaction pool. #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TxPoolResponse { +struct TxPoolResponse { /// Holds the transactions property as a list on the response. transactions: Vec, } diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs index bc261493..dd94ab00 100644 --- a/src/tasks/mod.rs +++ b/src/tasks/mod.rs @@ -1,5 +1,8 @@ -/// Bundle poller task -pub mod bundler; +/// Block creation task +pub mod block; + +/// Cache ingestion task +pub mod cache; /// Tx submission metric task pub mod metrics; @@ -10,11 +13,5 @@ pub mod oauth; /// Tx submission task pub mod submit; -/// Tx polling task -pub mod tx_poller; - -/// Block simulation and environment -pub mod block; - /// Constructs the simualtion environment. pub mod env; diff --git a/tests/bundle_poller_test.rs b/tests/bundle_poller_test.rs index 38765417..e1e5dde2 100644 --- a/tests/bundle_poller_test.rs +++ b/tests/bundle_poller_test.rs @@ -8,7 +8,7 @@ mod tests { let config = test_utils::setup_test_config().unwrap(); let auth = Authenticator::new(&config)?; - let mut bundle_poller = builder::tasks::bundler::BundlePoller::new(&config, auth.token()); + let mut bundle_poller = builder::tasks::cache::BundlePoller::new(&config, auth.token()); let _ = bundle_poller.check_bundle_cache().await?; diff --git a/tests/tx_poller_test.rs b/tests/tx_poller_test.rs index 3d8e5c62..c0f5ffe3 100644 --- a/tests/tx_poller_test.rs +++ b/tests/tx_poller_test.rs @@ -1,9 +1,11 @@ mod tests { - use alloy::primitives::U256; - use alloy::signers::local::PrivateKeySigner; - use builder::config::BuilderConfig; - use builder::tasks::tx_poller; - use builder::test_utils::{new_signed_tx, setup_logging, setup_test_config}; // Import the refactored function + use alloy::{primitives::U256, signers::local::PrivateKeySigner}; + use builder::{ + config::BuilderConfig, + tasks::cache::TxPoller, + test_utils::{new_signed_tx, setup_logging, setup_test_config}, + }; + // Import the refactored function use eyre::{Ok, Result}; #[ignore = "integration test"] @@ -18,7 +20,7 @@ mod tests { post_tx(&config).await?; // Create a new poller - let mut poller = tx_poller::TxPoller::new(&config); + let mut poller = TxPoller::new(&config); // Fetch transactions the pool let transactions = poller.check_tx_cache().await?;