Skip to content

feat: cache task #90

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ integration = []
[dependencies]
init4-bin-base = "0.3"

signet-zenith = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
signet-constants = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
signet-types = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
signet-bundle = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
signet-constants = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
signet-sim = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
signet-tx-cache = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
signet-types = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
signet-zenith = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }

trevm = { version = "0.20.10", features = ["concurrent-db", "test-utils"] }

Expand Down
11 changes: 8 additions & 3 deletions bin/builder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use builder::{
config::BuilderConfig,
service::serve_builder,
tasks::{block::sim::Simulator, bundler, metrics::MetricsTask, submit::SubmitTask, tx_poller},
tasks::{
block::sim::Simulator,
cache::{BundlePoller, TxPoller},
metrics::MetricsTask,
submit::SubmitTask,
},
};
use init4_bin_base::{deps::tracing, utils::from_env::FromEnv};
use signet_sim::SimCache;
Expand Down Expand Up @@ -32,10 +37,10 @@ async fn main() -> eyre::Result<()> {
let submit =
SubmitTask { zenith, quincey, config: config.clone(), outbound_tx_channel: tx_channel };

let tx_poller = tx_poller::TxPoller::new(&config);
let tx_poller = TxPoller::new(&config);
let (tx_receiver, tx_poller_jh) = tx_poller.spawn();

let bundle_poller = bundler::BundlePoller::new(&config, token);
let bundle_poller = BundlePoller::new(&config, token);
let (bundle_receiver, bundle_poller_jh) = bundle_poller.spawn();

let (submit_channel, submit_jh) = submit.spawn();
Expand Down
2 changes: 2 additions & 0 deletions src/signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub enum LocalOrAws {
}

/// Error during signing
#[allow(clippy::large_enum_variant)] // type about to be deleted
#[derive(Debug, thiserror::Error)]
pub enum SignerError {
/// Error during [`AwsSigner`] instantiation
Expand Down Expand Up @@ -44,6 +45,7 @@ impl LocalOrAws {
/// # Panics
///
/// Panics if the env var contents is not a valid secp256k1 private key.
#[allow(clippy::result_large_err)] // type about to be deleted
fn wallet(private_key: &str) -> Result<PrivateKeySigner, SignerError> {
let bytes = hex::decode(private_key.strip_prefix("0x").unwrap_or(private_key))?;
Ok(PrivateKeySigner::from_slice(&bytes).unwrap())
Expand Down
2 changes: 1 addition & 1 deletion src/tasks/block/sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use super::cfg::PecorinoBlockEnv;
use crate::{
config::{BuilderConfig, RuProvider},
tasks::{block::cfg::PecorinoCfg, bundler::Bundle},
tasks::{block::cfg::PecorinoCfg, cache::Bundle},
};
use alloy::{
consensus::TxEnvelope,
Expand Down
2 changes: 1 addition & 1 deletion src/tasks/bundler.rs → src/tasks/cache/bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub struct Bundle {

/// Response from the tx-pool containing a list of bundles.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TxPoolBundleResponse {
struct TxPoolBundleResponse {
/// Bundle responses are available on the bundles property.
pub bundles: Vec<Bundle>,
}
Expand Down
8 changes: 8 additions & 0 deletions src/tasks/cache/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
mod task;
pub use task::CacheTask;

mod tx;
pub use tx::TxPoller;

mod bundle;
pub use bundle::{Bundle, BundlePoller};
64 changes: 64 additions & 0 deletions src/tasks/cache/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
use alloy::consensus::TxEnvelope;
use init4_bin_base::deps::tracing::{debug, info};
use signet_sim::SimCache;
use signet_tx_cache::types::TxCacheBundle;
use tokio::{
sync::{mpsc, watch},
task::JoinHandle,
};
use trevm::revm::context::BlockEnv;

/// Cache task for the block builder.
///
/// This tasks handles the ingestion of transactions and bundles into the cache.
/// It keeps a receiver for the block environment and cleans the cache when
/// the environment changes.
#[derive(Debug)]
pub struct CacheTask {
/// The shared sim cache to populate.
cache: SimCache,

/// The channel to receive the block environment.
env: watch::Receiver<Option<BlockEnv>>,

/// The channel to receive the transaction bundles.
bundles: mpsc::UnboundedReceiver<TxCacheBundle>,
/// The channel to receive the transactions.
txns: mpsc::UnboundedReceiver<TxEnvelope>,
}

impl CacheTask {
async fn task_future(mut self) {
loop {
let mut basefee = 0;
tokio::select! {
biased;
res = self.env.changed() => {
if res.is_err() {
debug!("Cache task: env channel closed, exiting");
break;
}
if let Some(env) = self.env.borrow_and_update().as_ref() {
basefee = env.basefee;
info!(basefee, number = env.number, timestamp = env.timestamp, "block env changed, clearing cache");
self.cache.clean(
env.number, env.timestamp
);
}
}
Some(bundle) = self.bundles.recv() => {
self.cache.add_item(bundle.bundle, basefee);
}
Some(txn) = self.txns.recv() => {
self.cache.add_item(txn, basefee);
}
}
}
}

/// Spawn the cache task.
pub fn spawn(self) -> JoinHandle<()> {
let fut = self.task_future();
tokio::spawn(fut)
}
}
2 changes: 1 addition & 1 deletion src/tasks/tx_poller.rs → src/tasks/cache/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::{sync::mpsc, task::JoinHandle, time};

/// Models a response from the transaction pool.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TxPoolResponse {
struct TxPoolResponse {
/// Holds the transactions property as a list on the response.
transactions: Vec<TxEnvelope>,
}
Expand Down
13 changes: 5 additions & 8 deletions src/tasks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
/// Bundle poller task
pub mod bundler;
/// Block creation task
pub mod block;

/// Cache ingestion task
pub mod cache;

/// Tx submission metric task
pub mod metrics;
Expand All @@ -10,11 +13,5 @@ pub mod oauth;
/// Tx submission task
pub mod submit;

/// Tx polling task
pub mod tx_poller;

/// Block simulation and environment
pub mod block;

/// Constructs the simualtion environment.
pub mod env;
2 changes: 1 addition & 1 deletion tests/bundle_poller_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mod tests {
let config = test_utils::setup_test_config().unwrap();
let auth = Authenticator::new(&config)?;

let mut bundle_poller = builder::tasks::bundler::BundlePoller::new(&config, auth.token());
let mut bundle_poller = builder::tasks::cache::BundlePoller::new(&config, auth.token());

let _ = bundle_poller.check_bundle_cache().await?;

Expand Down
14 changes: 8 additions & 6 deletions tests/tx_poller_test.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
mod tests {
use alloy::primitives::U256;
use alloy::signers::local::PrivateKeySigner;
use builder::config::BuilderConfig;
use builder::tasks::tx_poller;
use builder::test_utils::{new_signed_tx, setup_logging, setup_test_config}; // Import the refactored function
use alloy::{primitives::U256, signers::local::PrivateKeySigner};
use builder::{
config::BuilderConfig,
tasks::cache::TxPoller,
test_utils::{new_signed_tx, setup_logging, setup_test_config},
};
// Import the refactored function
use eyre::{Ok, Result};

#[ignore = "integration test"]
Expand All @@ -18,7 +20,7 @@ mod tests {
post_tx(&config).await?;

// Create a new poller
let mut poller = tx_poller::TxPoller::new(&config);
let mut poller = TxPoller::new(&config);

// Fetch transactions the pool
let transactions = poller.check_tx_cache().await?;
Expand Down
Loading