diff --git a/Cargo.lock b/Cargo.lock index d4966001b6..7a5a47e7c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3591,7 +3591,7 @@ dependencies = [ "serde", "serde_json", "spawned-concurrency", - "spawned-rt", + "spawned-rt 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "thiserror 2.0.17", "tikv-jemallocator", "tokio", @@ -3760,7 +3760,7 @@ dependencies = [ "serde_json", "serde_with", "spawned-concurrency", - "spawned-rt", + "spawned-rt 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "tabwriter", "thiserror 2.0.17", "tokio", @@ -3912,7 +3912,7 @@ dependencies = [ "sha2", "snap", "spawned-concurrency", - "spawned-rt", + "spawned-rt 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "thiserror 2.0.17", "tokio", "tokio-stream", @@ -4004,7 +4004,7 @@ dependencies = [ "serde_json", "sha2", "spawned-concurrency", - "spawned-rt", + "spawned-rt 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "thiserror 2.0.17", "tokio", "tokio-util", @@ -4054,6 +4054,7 @@ dependencies = [ "anyhow", "async-trait", "bytes", + "crossbeam 0.8.4", "ethereum-types 0.15.1", "ethrex-common", "ethrex-crypto", @@ -4068,6 +4069,8 @@ dependencies = [ "rustc-hash 2.1.1", "serde", "serde_json", + "spawned-concurrency", + "spawned-rt 0.4.3 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile", "thiserror 2.0.17", "tokio", @@ -12399,12 +12402,11 @@ dependencies = [ [[package]] name = "spawned-concurrency" version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d99b97ba5429336b6fa8b6e8ecbe09a1c7324f8b9fa63a11262dbd3a44e577b" +source = "git+https://github.com/lambdaclass/spawned?rev=ce47cf1bf04a46e3ecd94d38843748b587c1c2ef#ce47cf1bf04a46e3ecd94d38843748b587c1c2ef" dependencies = [ "futures", "pin-project-lite", - "spawned-rt", + "spawned-rt 0.4.3 (git+https://github.com/lambdaclass/spawned?rev=ce47cf1bf04a46e3ecd94d38843748b587c1c2ef)", "thiserror 2.0.17", "tracing", ] @@ -12423,6 +12425,19 @@ dependencies = [ "tracing-subscriber 0.3.21", ] +[[package]] +name = "spawned-rt" +version = "0.4.3" +source = "git+https://github.com/lambdaclass/spawned?rev=ce47cf1bf04a46e3ecd94d38843748b587c1c2ef#ce47cf1bf04a46e3ecd94d38843748b587c1c2ef" +dependencies = [ + "crossbeam 0.7.3", + "tokio", + "tokio-stream", + "tokio-util", + "tracing", + "tracing-subscriber 0.3.21", +] + [[package]] name = "spin" version = "0.5.2" diff --git a/Cargo.toml b/Cargo.toml index c6e83275d4..14f7e73779 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -108,7 +108,7 @@ kzg-rs = "0.2.6" libsql = "0.9.10" futures = "0.3.31" aligned-sdk = { git = "https://github.com/yetanotherco/aligned_layer", rev = "c60d7eb147edbdf12bb7a7c6e92ec178d9f8da23" } -spawned-concurrency = "0.4.2" +spawned-concurrency = { git = "https://github.com/lambdaclass/spawned", version = "0.4.3", rev = "ce47cf1bf04a46e3ecd94d38843748b587c1c2ef" } spawned-rt = "0.4.2" lambdaworks-crypto = "0.13.0" tui-logger = { version = "0.17.3", features = ["tracing-support"] } diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index 18265586de..ab3034d85a 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -28,6 +28,9 @@ tokio = { workspace = true, features = ["rt"] } qfilter = "0.2.5" rayon.workspace = true lru = "0.16.2" +spawned-rt.workspace = true +spawned-concurrency.workspace = true +crossbeam.workspace = true [features] default = [] diff --git a/crates/storage/store.rs b/crates/storage/store.rs index 9bdfdfbcf2..75767ddb1b 100644 --- a/crates/storage/store.rs +++ b/crates/storage/store.rs @@ -39,6 +39,10 @@ use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Trie, TrieLogger, TrieNode, TrieWitn use ethrex_trie::{Node, NodeRLP}; use lru::LruCache; use rustc_hash::FxBuildHasher; +use spawned_concurrency::{ + messages::Unused, + threads::{CastResponse, GenServer}, +}; use std::{ collections::{BTreeMap, HashMap, hash_map::Entry}, fmt::Debug, @@ -128,7 +132,7 @@ pub struct Store { chain_config: ChainConfig, trie_cache: Arc>>, flatkeyvalue_control_tx: std::sync::mpsc::SyncSender, - trie_update_worker_tx: std::sync::mpsc::SyncSender, + trie_update_worker_tx: crossbeam::channel::Sender, /// Keeps the latest canonical block header /// It's wrapped in an Arc to allow for cheap reads with infrequent writes /// Reading an out-of-date value is acceptable, since it's only used as: @@ -1363,7 +1367,7 @@ impl Store { debug!("Initializing Store with {commit_threshold} in-memory diff-layers"); let (fkv_tx, fkv_rx) = std::sync::mpsc::sync_channel(0); - let (trie_upd_tx, trie_upd_rx) = std::sync::mpsc::sync_channel(0); + let (trie_upd_tx, trie_upd_rx) = crossbeam::channel::bounded(0); let last_written = { let tx = backend.begin_read()?; @@ -1376,12 +1380,13 @@ impl Store { last_written } }; + let trie_cache = Arc::new(Mutex::new(Arc::new(TrieLayerCache::new(commit_threshold)))); let store = Self { db_path, backend, chain_config: Default::default(), latest_block_header: Default::default(), - trie_cache: Arc::new(Mutex::new(Arc::new(TrieLayerCache::new(commit_threshold)))), + trie_cache, flatkeyvalue_control_tx: fkv_tx, trie_update_worker_tx: trie_upd_tx, last_computed_flatkeyvalue: Arc::new(Mutex::new(last_written)), @@ -1410,46 +1415,11 @@ impl Store { let flatkeyvalue_control_tx = store.flatkeyvalue_control_tx.clone(); let trie_cache = store.trie_cache.clone(); /* - When a block is executed, the write of the bottom-most diff layer to disk is done in the background through this thread. + When a block is executed, the write of the bottom-most diff layer to disk is done in the background through this worker. This is to improve block execution times, since it's not necessary when executing the next block to have this layer flushed to disk. - - This background thread receives messages through a channel to apply new trie updates and does three things: - - - First, it updates the top-most in-memory diff layer and notifies the process that sent the message (i.e. the - block production thread) so it can continue with block execution (block execution cannot proceed without the - diff layers updated, otherwise it would see wrong state when reading from the trie). This section is done in an RCU manner: - a shared pointer with the trie is kept behind a lock. This thread first acquires the lock, then copies the pointer and drops the lock; - afterwards it makes a deep copy of the trie layer and mutates it, then takes the lock again, replaces the pointer with the updated copy, - then drops the lock again. - - - Second, it performs the logic of persisting the bottom-most diff layer to disk. This is the part of the logic that block execution does not - need to proceed. What does need to be aware of this section is the process in charge of generating the snapshot (a.k.a. FlatKeyValue). - Because of this, this section first sends a message to pause the FlatKeyValue generation, then persists the diff layer to disk, then notifies - again for FlatKeyValue generation to continue. - - - Third, it removes the (no longer needed) bottom-most diff layer from the trie layers in the same way as the first step. + This background worker receives messages through a channel to apply new trie updates. */ - std::thread::spawn(move || { - let rx = trie_upd_rx; - loop { - match rx.recv() { - Ok(trie_update) => { - // FIXME: what should we do on error? - let _ = apply_trie_updates( - backend.as_ref(), - &flatkeyvalue_control_tx, - &trie_cache, - trie_update, - ) - .inspect_err(|err| error!("apply_trie_updates failed: {err}")); - } - Err(err) => { - debug!("Trie update sender disconnected: {err}"); - return; - } - } - } - }); + TrieUpdateWorker::new(backend, flatkeyvalue_control_tx, trie_cache, trie_upd_rx).start(); Ok(store) } @@ -2461,6 +2431,7 @@ impl Store { type TrieNodesUpdate = Vec<(Nibbles, Vec)>; +#[derive(Debug, Clone)] struct TrieUpdate { result_sender: std::sync::mpsc::SyncSender>, parent_state_root: H256, @@ -2471,6 +2442,22 @@ struct TrieUpdate { // NOTE: we don't receive `Store` here to avoid cyclic dependencies // with the other end of `fkv_ctl` +/* +This function does three things: + - First, it updates the top-most in-memory diff layer and notifies the process that sent the message (i.e. the + block production thread) so it can continue with block execution (block execution cannot proceed without the + diff layers updated, otherwise it would see wrong state when reading from the trie). This section is done in an RCU manner: + a shared pointer with the trie is kept behind a lock. This function first acquires the lock, then copies the pointer and drops the lock; + afterwards it makes a deep copy of the trie layer and mutates it, then takes the lock again, replaces the pointer with the updated copy, + then drops the lock again. + + - Second, it performs the logic of persisting the bottom-most diff layer to disk. This is the part of the logic that block execution does not + need to proceed. What does need to be aware of this section is the process in charge of generating the snapshot (a.k.a. FlatKeyValue). + Because of this, this section first sends a message to pause the FlatKeyValue generation, then persists the diff layer to disk, then notifies + again for FlatKeyValue generation to continue. + + - Third, it removes the (no longer needed) bottom-most diff layer from the trie layers in the same way as the first step. +*/ fn apply_trie_updates( backend: &dyn StorageBackend, fkv_ctl: &SyncSender, @@ -3233,3 +3220,98 @@ mod tests { } } } + +#[derive(Debug, thiserror::Error)] +enum TrieUpdateWorkerError {} + +#[derive(Debug, Clone)] +enum TrieUpdateWorkerInMessage { + TrieUpdates(TrieUpdate), +} + +#[derive(Debug, Clone)] +enum TrieUpdateWorkerOutMessage {} + +#[derive(Debug)] +struct TrieUpdateWorker { + backend: Arc, + flatkeyvalue_control_tx: SyncSender, + trie_cache: Arc>>, + trie_upd_rx: crossbeam::channel::Receiver, +} + +impl TrieUpdateWorker { + fn new( + backend: Arc, + flatkeyvalue_control_tx: SyncSender, + trie_cache: Arc>>, + trie_upd_rx: crossbeam::channel::Receiver, + ) -> Self { + Self { + backend, + flatkeyvalue_control_tx, + trie_cache, + trie_upd_rx, + } + } + + fn apply_trie_updates(&self, trie_update: TrieUpdate) { + // FIXME: what should we do on error? + let _ = apply_trie_updates( + self.backend.as_ref(), + &self.flatkeyvalue_control_tx, + &self.trie_cache, + trie_update, + ) + .inspect_err(|err| error!("apply_trie_updates failed: {err}")); + } +} + +impl GenServer for TrieUpdateWorker { + type CallMsg = Unused; + type CastMsg = TrieUpdateWorkerInMessage; + type OutMsg = TrieUpdateWorkerOutMessage; + type Error = TrieUpdateWorkerError; + + fn init( + self, + handle: &spawned_concurrency::threads::GenServerHandle, + ) -> Result { + let mut handle = handle.clone(); + let rx = self.trie_upd_rx.clone(); + let _ = std::thread::spawn(move || { + let mut cancellation_token = handle.cancellation_token(); + loop { + match rx.recv() { + Ok(updates) => match handle.cast(Self::CastMsg::TrieUpdates(updates)) { + Ok(_) => {} + Err(e) => { + error!("Failed to send update: {e}"); + break; + } + }, + Err(err) => { + debug!("Trie update sender disconnected: {err}"); + return; + } + } + if cancellation_token.is_cancelled() { + tracing::trace!("TrieUpdateWorker stopped"); + break; + } + } + }); + Ok(self) + } + + fn handle_cast( + &mut self, + message: Self::CastMsg, + _handle: &spawned_concurrency::threads::GenServerHandle, + ) -> spawned_concurrency::threads::CastResponse { + match message { + Self::CastMsg::TrieUpdates(trie_update) => self.apply_trie_updates(trie_update), + }; + CastResponse::NoReply + } +}