Skip to content

Commit e82c2cb

Browse files
committed
feat: cache task
1 parent 4822c6f commit e82c2cb

File tree

10 files changed

+101
-24
lines changed

10 files changed

+101
-24
lines changed

Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ integration = []
2727
[dependencies]
2828
init4-bin-base = "0.3"
2929

30-
signet-zenith = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
31-
signet-constants = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
32-
signet-types = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
3330
signet-bundle = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
31+
signet-constants = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
3432
signet-sim = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
33+
signet-tx-cache = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
34+
signet-types = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
35+
signet-zenith = { git = "https://github.com/init4tech/signet-sdk", branch = "main" }
3536

3637
trevm = { version = "0.20.10", features = ["concurrent-db", "test-utils"] }
3738

bin/builder.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
use builder::{
22
config::BuilderConfig,
33
service::serve_builder,
4-
tasks::{block::sim::Simulator, bundler, metrics::MetricsTask, submit::SubmitTask, tx_poller},
4+
tasks::{
5+
block::sim::Simulator,
6+
cache::{BundlePoller, TxPoller},
7+
metrics::MetricsTask,
8+
submit::SubmitTask,
9+
},
510
};
611
use init4_bin_base::{deps::tracing, utils::from_env::FromEnv};
712
use signet_sim::SimCache;
@@ -32,10 +37,10 @@ async fn main() -> eyre::Result<()> {
3237
let submit =
3338
SubmitTask { zenith, quincey, config: config.clone(), outbound_tx_channel: tx_channel };
3439

35-
let tx_poller = tx_poller::TxPoller::new(&config);
40+
let tx_poller = TxPoller::new(&config);
3641
let (tx_receiver, tx_poller_jh) = tx_poller.spawn();
3742

38-
let bundle_poller = bundler::BundlePoller::new(&config, token);
43+
let bundle_poller = BundlePoller::new(&config, token);
3944
let (bundle_receiver, bundle_poller_jh) = bundle_poller.spawn();
4045

4146
let (submit_channel, submit_jh) = submit.spawn();

src/tasks/block/sim.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use super::cfg::PecorinoBlockEnv;
55
use crate::{
66
config::{BuilderConfig, RuProvider},
7-
tasks::{block::cfg::PecorinoCfg, bundler::Bundle},
7+
tasks::{block::cfg::PecorinoCfg, cache::Bundle},
88
};
99
use alloy::{
1010
consensus::TxEnvelope,

src/tasks/bundler.rs renamed to src/tasks/cache/bundle.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub struct Bundle {
2222

2323
/// Response from the tx-pool containing a list of bundles.
2424
#[derive(Debug, Clone, Serialize, Deserialize)]
25-
pub struct TxPoolBundleResponse {
25+
struct TxPoolBundleResponse {
2626
/// Bundle responses are available on the bundles property.
2727
pub bundles: Vec<Bundle>,
2828
}

src/tasks/cache/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
mod task;
2+
pub use task::CacheTask;
3+
4+
mod tx;
5+
pub use tx::TxPoller;
6+
7+
mod bundle;
8+
pub use bundle::{Bundle, BundlePoller};

src/tasks/cache/task.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
use alloy::consensus::TxEnvelope;
2+
use init4_bin_base::deps::tracing::{debug, info};
3+
use signet_sim::SimCache;
4+
use signet_tx_cache::types::TxCacheBundle;
5+
use tokio::{
6+
sync::{mpsc, watch},
7+
task::JoinHandle,
8+
};
9+
use trevm::revm::context::BlockEnv;
10+
11+
/// Cache task for the block builder.
12+
///
13+
/// This tasks handles the ingestion of transactions and bundles into the cache.
14+
/// It keeps a receiver for the block environment and cleans the cache when
15+
/// the environment changes.
16+
#[derive(Debug)]
17+
pub struct CacheTask {
18+
/// The shared sim cache to populate.
19+
cache: SimCache,
20+
21+
/// The channel to receive the block environment.
22+
env: watch::Receiver<Option<BlockEnv>>,
23+
24+
/// The channel to receive the transaction bundles.
25+
bundles: mpsc::UnboundedReceiver<TxCacheBundle>,
26+
/// The channel to receive the transactions.
27+
txns: mpsc::UnboundedReceiver<TxEnvelope>,
28+
}
29+
30+
impl CacheTask {
31+
async fn task_future(mut self) {
32+
loop {
33+
let mut basefee = 0;
34+
tokio::select! {
35+
biased;
36+
res = self.env.changed() => {
37+
if res.is_err() {
38+
debug!("Cache task: env channel closed, exiting");
39+
break;
40+
}
41+
if let Some(env) = self.env.borrow_and_update().as_ref() {
42+
basefee = env.basefee;
43+
info!(basefee, number = env.number, timestamp = env.timestamp, "block env changed, clearing cache");
44+
self.cache.clean(
45+
env.number, env.timestamp
46+
);
47+
}
48+
}
49+
Some(bundle) = self.bundles.recv() => {
50+
self.cache.add_item(bundle.bundle, basefee);
51+
}
52+
Some(txn) = self.txns.recv() => {
53+
self.cache.add_item(txn, basefee);
54+
}
55+
}
56+
}
57+
}
58+
59+
/// Spawn the cache task.
60+
pub fn spawn(self) -> JoinHandle<()> {
61+
let fut = self.task_future();
62+
tokio::spawn(fut)
63+
}
64+
}

src/tasks/tx_poller.rs renamed to src/tasks/cache/tx.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use tokio::{sync::mpsc, task::JoinHandle, time};
1010

1111
/// Models a response from the transaction pool.
1212
#[derive(Debug, Clone, Serialize, Deserialize)]
13-
pub struct TxPoolResponse {
13+
struct TxPoolResponse {
1414
/// Holds the transactions property as a list on the response.
1515
transactions: Vec<TxEnvelope>,
1616
}

src/tasks/mod.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1-
/// Bundle poller task
2-
pub mod bundler;
1+
/// Block creation task
2+
pub mod block;
3+
4+
/// Cache ingestion task
5+
pub mod cache;
36

47
/// Tx submission metric task
58
pub mod metrics;
@@ -10,11 +13,5 @@ pub mod oauth;
1013
/// Tx submission task
1114
pub mod submit;
1215

13-
/// Tx polling task
14-
pub mod tx_poller;
15-
16-
/// Block simulation and environment
17-
pub mod block;
18-
1916
/// Constructs the simualtion environment.
2017
pub mod env;

tests/bundle_poller_test.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ mod tests {
88
let config = test_utils::setup_test_config().unwrap();
99
let auth = Authenticator::new(&config)?;
1010

11-
let mut bundle_poller = builder::tasks::bundler::BundlePoller::new(&config, auth.token());
11+
let mut bundle_poller = builder::tasks::cache::BundlePoller::new(&config, auth.token());
1212

1313
let _ = bundle_poller.check_bundle_cache().await?;
1414

tests/tx_poller_test.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
mod tests {
2-
use alloy::primitives::U256;
3-
use alloy::signers::local::PrivateKeySigner;
4-
use builder::config::BuilderConfig;
5-
use builder::tasks::tx_poller;
6-
use builder::test_utils::{new_signed_tx, setup_logging, setup_test_config}; // Import the refactored function
2+
use alloy::{primitives::U256, signers::local::PrivateKeySigner};
3+
use builder::{
4+
config::BuilderConfig,
5+
tasks::cache::TxPoller,
6+
test_utils::{new_signed_tx, setup_logging, setup_test_config},
7+
};
8+
// Import the refactored function
79
use eyre::{Ok, Result};
810

911
#[ignore = "integration test"]
@@ -18,7 +20,7 @@ mod tests {
1820
post_tx(&config).await?;
1921

2022
// Create a new poller
21-
let mut poller = tx_poller::TxPoller::new(&config);
23+
let mut poller = TxPoller::new(&config);
2224

2325
// Fetch transactions the pool
2426
let transactions = poller.check_tx_cache().await?;

0 commit comments

Comments
 (0)