Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ kzg-rs = "0.2.6"
libsql = "0.9.10"
futures = "0.3.31"
aligned-sdk = { git = "https://github.com/yetanotherco/aligned_layer", rev = "c60d7eb147edbdf12bb7a7c6e92ec178d9f8da23" }
spawned-concurrency = "0.4.2"
spawned-concurrency = { git = "https://github.com/lambdaclass/spawned", version = "0.4.3", rev = "ce47cf1bf04a46e3ecd94d38843748b587c1c2ef" }
spawned-rt = "0.4.2"
lambdaworks-crypto = "0.13.0"
tui-logger = { version = "0.17.3", features = ["tracing-support"] }
Expand Down
3 changes: 3 additions & 0 deletions crates/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ tokio = { workspace = true, features = ["rt"] }
qfilter = "0.2.5"
rayon.workspace = true
lru = "0.16.2"
spawned-rt.workspace = true
spawned-concurrency.workspace = true
crossbeam.workspace = true

[features]
default = []
Expand Down
164 changes: 123 additions & 41 deletions crates/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ use ethrex_trie::{EMPTY_TRIE_HASH, Nibbles, Trie, TrieLogger, TrieNode, TrieWitn
use ethrex_trie::{Node, NodeRLP};
use lru::LruCache;
use rustc_hash::FxBuildHasher;
use spawned_concurrency::{
messages::Unused,
threads::{CastResponse, GenServer},
};
use std::{
collections::{BTreeMap, HashMap, hash_map::Entry},
fmt::Debug,
Expand Down Expand Up @@ -128,7 +132,7 @@ pub struct Store {
chain_config: ChainConfig,
trie_cache: Arc<Mutex<Arc<TrieLayerCache>>>,
flatkeyvalue_control_tx: std::sync::mpsc::SyncSender<FKVGeneratorControlMessage>,
trie_update_worker_tx: std::sync::mpsc::SyncSender<TrieUpdate>,
trie_update_worker_tx: crossbeam::channel::Sender<TrieUpdate>,
/// Keeps the latest canonical block header
/// It's wrapped in an Arc to allow for cheap reads with infrequent writes
/// Reading an out-of-date value is acceptable, since it's only used as:
Expand Down Expand Up @@ -1363,7 +1367,7 @@ impl Store {

debug!("Initializing Store with {commit_threshold} in-memory diff-layers");
let (fkv_tx, fkv_rx) = std::sync::mpsc::sync_channel(0);
let (trie_upd_tx, trie_upd_rx) = std::sync::mpsc::sync_channel(0);
let (trie_upd_tx, trie_upd_rx) = crossbeam::channel::bounded(0);

let last_written = {
let tx = backend.begin_read()?;
Expand All @@ -1376,12 +1380,13 @@ impl Store {
last_written
}
};
let trie_cache = Arc::new(Mutex::new(Arc::new(TrieLayerCache::new(commit_threshold))));
let store = Self {
db_path,
backend,
chain_config: Default::default(),
latest_block_header: Default::default(),
trie_cache: Arc::new(Mutex::new(Arc::new(TrieLayerCache::new(commit_threshold)))),
trie_cache,
flatkeyvalue_control_tx: fkv_tx,
trie_update_worker_tx: trie_upd_tx,
last_computed_flatkeyvalue: Arc::new(Mutex::new(last_written)),
Expand Down Expand Up @@ -1410,46 +1415,11 @@ impl Store {
let flatkeyvalue_control_tx = store.flatkeyvalue_control_tx.clone();
let trie_cache = store.trie_cache.clone();
/*
When a block is executed, the write of the bottom-most diff layer to disk is done in the background through this thread.
When a block is executed, the write of the bottom-most diff layer to disk is done in the background through this worker.
This is to improve block execution times, since it's not necessary when executing the next block to have this layer flushed to disk.

This background thread receives messages through a channel to apply new trie updates and does three things:

- First, it updates the top-most in-memory diff layer and notifies the process that sent the message (i.e. the
block production thread) so it can continue with block execution (block execution cannot proceed without the
diff layers updated, otherwise it would see wrong state when reading from the trie). This section is done in an RCU manner:
a shared pointer with the trie is kept behind a lock. This thread first acquires the lock, then copies the pointer and drops the lock;
afterwards it makes a deep copy of the trie layer and mutates it, then takes the lock again, replaces the pointer with the updated copy,
then drops the lock again.

- Second, it performs the logic of persisting the bottom-most diff layer to disk. This is the part of the logic that block execution does not
need to proceed. What does need to be aware of this section is the process in charge of generating the snapshot (a.k.a. FlatKeyValue).
Because of this, this section first sends a message to pause the FlatKeyValue generation, then persists the diff layer to disk, then notifies
again for FlatKeyValue generation to continue.

- Third, it removes the (no longer needed) bottom-most diff layer from the trie layers in the same way as the first step.
This background worker receives messages through a channel to apply new trie updates.
*/
std::thread::spawn(move || {
let rx = trie_upd_rx;
loop {
match rx.recv() {
Ok(trie_update) => {
// FIXME: what should we do on error?
let _ = apply_trie_updates(
backend.as_ref(),
&flatkeyvalue_control_tx,
&trie_cache,
trie_update,
)
.inspect_err(|err| error!("apply_trie_updates failed: {err}"));
}
Err(err) => {
debug!("Trie update sender disconnected: {err}");
return;
}
}
}
});
TrieUpdateWorker::new(backend, flatkeyvalue_control_tx, trie_cache, trie_upd_rx).start();
Ok(store)
}

Expand Down Expand Up @@ -2461,6 +2431,7 @@ impl Store {

type TrieNodesUpdate = Vec<(Nibbles, Vec<u8>)>;

#[derive(Debug, Clone)]
struct TrieUpdate {
result_sender: std::sync::mpsc::SyncSender<Result<(), StoreError>>,
parent_state_root: H256,
Expand All @@ -2471,6 +2442,22 @@ struct TrieUpdate {

// NOTE: we don't receive `Store` here to avoid cyclic dependencies
// with the other end of `fkv_ctl`
Comment on lines 2443 to 2444
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this as a footnote to the bigger comment. With the changes it looks strange.

/*
This function does three things:
- First, it updates the top-most in-memory diff layer and notifies the process that sent the message (i.e. the
block production thread) so it can continue with block execution (block execution cannot proceed without the
diff layers updated, otherwise it would see wrong state when reading from the trie). This section is done in an RCU manner:
a shared pointer with the trie is kept behind a lock. This function first acquires the lock, then copies the pointer and drops the lock;
afterwards it makes a deep copy of the trie layer and mutates it, then takes the lock again, replaces the pointer with the updated copy,
then drops the lock again.

- Second, it performs the logic of persisting the bottom-most diff layer to disk. This is the part of the logic that block execution does not
need to proceed. What does need to be aware of this section is the process in charge of generating the snapshot (a.k.a. FlatKeyValue).
Because of this, this section first sends a message to pause the FlatKeyValue generation, then persists the diff layer to disk, then notifies
again for FlatKeyValue generation to continue.

- Third, it removes the (no longer needed) bottom-most diff layer from the trie layers in the same way as the first step.
*/
fn apply_trie_updates(
backend: &dyn StorageBackend,
fkv_ctl: &SyncSender<FKVGeneratorControlMessage>,
Expand Down Expand Up @@ -3233,3 +3220,98 @@ mod tests {
}
}
}

#[derive(Debug, thiserror::Error)]
enum TrieUpdateWorkerError {}

#[derive(Debug, Clone)]
enum TrieUpdateWorkerInMessage {
TrieUpdates(TrieUpdate),
}

#[derive(Debug, Clone)]
enum TrieUpdateWorkerOutMessage {}

#[derive(Debug)]
struct TrieUpdateWorker {
backend: Arc<dyn StorageBackend>,
flatkeyvalue_control_tx: SyncSender<FKVGeneratorControlMessage>,
trie_cache: Arc<Mutex<Arc<TrieLayerCache>>>,
trie_upd_rx: crossbeam::channel::Receiver<TrieUpdate>,
}

impl TrieUpdateWorker {
fn new(
backend: Arc<dyn StorageBackend>,
flatkeyvalue_control_tx: SyncSender<FKVGeneratorControlMessage>,
trie_cache: Arc<Mutex<Arc<TrieLayerCache>>>,
trie_upd_rx: crossbeam::channel::Receiver<TrieUpdate>,
) -> Self {
Self {
backend,
flatkeyvalue_control_tx,
trie_cache,
trie_upd_rx,
}
}

fn apply_trie_updates(&self, trie_update: TrieUpdate) {
// FIXME: what should we do on error?
let _ = apply_trie_updates(
self.backend.as_ref(),
&self.flatkeyvalue_control_tx,
&self.trie_cache,
trie_update,
)
.inspect_err(|err| error!("apply_trie_updates failed: {err}"));
}
}

impl GenServer for TrieUpdateWorker {
type CallMsg = Unused;
type CastMsg = TrieUpdateWorkerInMessage;
type OutMsg = TrieUpdateWorkerOutMessage;
type Error = TrieUpdateWorkerError;

fn init(
self,
handle: &spawned_concurrency::threads::GenServerHandle<Self>,
) -> Result<Self, Self::Error> {
let mut handle = handle.clone();
let rx = self.trie_upd_rx.clone();
let _ = std::thread::spawn(move || {
let mut cancellation_token = handle.cancellation_token();
loop {
Comment on lines +3282 to +3284
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused, with the new actor shouldn't this be handled by spawned? I thought the goal would be for the worker implementation to only need to worry about the loop body.

match rx.recv() {
Ok(updates) => match handle.cast(Self::CastMsg::TrieUpdates(updates)) {
Ok(_) => {}
Err(e) => {
error!("Failed to send update: {e}");
break;
}
},
Err(err) => {
debug!("Trie update sender disconnected: {err}");
return;
}
}
if cancellation_token.is_cancelled() {
tracing::trace!("TrieUpdateWorker stopped");
break;
}
}
});
Ok(self)
}

fn handle_cast(
&mut self,
message: Self::CastMsg,
_handle: &spawned_concurrency::threads::GenServerHandle<Self>,
) -> spawned_concurrency::threads::CastResponse {
match message {
Self::CastMsg::TrieUpdates(trie_update) => self.apply_trie_updates(trie_update),
};
CastResponse::NoReply
}
}
Loading