From 922abbdf59516688d9005e5a5fc28f0effc5bf59 Mon Sep 17 00:00:00 2001 From: Esteve Soler Arderiu Date: Mon, 17 Nov 2025 13:42:23 +0100 Subject: [PATCH 1/5] Refactor `TrieLayerCache` structure. --- crates/storage/trie_db/layering.rs | 225 ++++++++--------------------- 1 file changed, 63 insertions(+), 162 deletions(-) diff --git a/crates/storage/trie_db/layering.rs b/crates/storage/trie_db/layering.rs index a1e9e2b227e..e4945187d11 100644 --- a/crates/storage/trie_db/layering.rs +++ b/crates/storage/trie_db/layering.rs @@ -1,181 +1,82 @@ use ethrex_common::H256; -use rustc_hash::FxHashMap; -use std::sync::Arc; - use ethrex_trie::{Nibbles, TrieDB, TrieError}; +use rustc_hash::FxHashMap; +use std::{collections::hash_map::Entry, mem, sync::Arc}; -#[derive(Debug, Clone)] -struct TrieLayer { - nodes: FxHashMap, Vec>, - parent: H256, - id: usize, -} - -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct TrieLayerCache { - /// Monotonically increasing ID for layers, starting at 1. - /// TODO: this implementation panics on overflow - last_id: usize, - layers: FxHashMap>, - /// Global bloom that accrues all layer blooms. - /// - /// The bloom filter is used to avoid looking up all layers when the given path doesn't exist in any - /// layer, thus going directly to the database. - /// - /// In case a bloom filter insert or merge fails, we need to mark the bloom filter as poisoned - /// so we never use it again, because if we don't we may be misled into believing a key is not present - /// on a diff layer when it is (i.e. a false negative), leading to wrong executions. - bloom: Option, -} - -impl Default for TrieLayerCache { - fn default() -> Self { - // Try to create the bloom filter, if it fails use poison mode. - let bloom = Self::create_filter().ok(); - Self { - bloom, - last_id: 0, - layers: Default::default(), - } - } + /// Mapping from keys to entries (from all layers). + data: FxHashMap, TrieLayerCacheEntry>>, } impl TrieLayerCache { - // TODO: tune this - fn create_filter() -> Result { - qfilter::Filter::new_resizeable(1_000_000, 100_000_000, 0.02) - .inspect_err(|e| tracing::warn!("could not create trie layering bloom filter {e}")) - } - - pub fn get(&self, state_root: H256, key: &[u8]) -> Option> { - // Fast check to know if any layer may contains the given key. - // We can only be certain it doesn't exist, but if it returns true it may or not exist (false positive). - if let Some(filter) = &self.bloom - && !filter.contains(key) - { - // TrieWrapper goes to db when returning None. - return None; - } - - let mut current_state_root = state_root; - - while let Some(layer) = self.layers.get(¤t_state_root) { - if let Some(value) = layer.nodes.get(key) { - return Some(value.clone()); - } - current_state_root = layer.parent; - if current_state_root == state_root { - // TODO: check if this is possible in practice - // This can't happen in L1, due to system contracts irreversibly modifying state - // at each block. - // On L2, if no transactions are included in a block, the state root remains the same, - // but we handle that case in put_batch. It may happen, however, if someone modifies - // state with a privileged tx and later reverts it (since it doesn't update nonce). - panic!("State cycle found"); - } - } - None + /// Obtain the cached value from any layer given its key. 
+ pub fn get(&self, key: &[u8]) -> Option<&[u8]> { + self.data.get(key).map(|entry| entry.value.as_slice()) } - // TODO: use finalized hash to know when to commit - pub fn get_commitable(&self, mut state_root: H256, commit_threshold: usize) -> Option { - let mut counter = 0; - while let Some(layer) = self.layers.get(&state_root) { - state_root = layer.parent; - counter += 1; - if counter > commit_threshold { - return Some(state_root); - } - } - None - } - - pub fn put_batch( - &mut self, - parent: H256, - state_root: H256, - key_values: Vec<(Nibbles, Vec)>, - ) { - if parent == state_root && key_values.is_empty() { - return; - } else if parent == state_root { - tracing::error!("Inconsistent state: parent == state_root but key_values not empty"); - return; - } - if self.layers.contains_key(&state_root) { - tracing::warn!("tried to insert a state_root that's already inserted"); - return; - } - - // add this new bloom to the global one. - if let Some(filter) = &mut self.bloom { - for (p, _) in &key_values { - if let Err(qfilter::Error::CapacityExceeded) = filter.insert(p.as_ref()) { - tracing::warn!("TrieLayerCache: put_batch capacity exceeded"); - self.bloom = None; - break; + /// Write a batch of items into the cache at the last layer. + /// + /// Items that were already present in that layer will be overwritten. Use + /// [`TrieLayerCache::commit`] to advance the layers before putting items into the new layer. + pub fn put_iter(&mut self, iter: impl IntoIterator, Vec)>) { + for (key, value) in iter { + match self.data.entry(key) { + Entry::Occupied(entry) => { + let entry = entry.into_mut(); + + let prev_value = mem::replace(&mut entry.value, value); + if entry.layers & 1 == 0 { + entry.previous.push(prev_value); + } + + entry.layers |= 1; + } + Entry::Vacant(entry) => { + entry.insert(TrieLayerCacheEntry { + value, + layers: 1u128, + previous: Vec::new(), + }); } } } - - let nodes: FxHashMap, Vec> = key_values - .into_iter() - .map(|(path, value)| (path.into_vec(), value)) - .collect(); - - self.last_id += 1; - let entry = TrieLayer { - nodes, - parent, - id: self.last_id, - }; - self.layers.insert(state_root, Arc::new(entry)); } - /// Rebuilds the global bloom filter by inserting all keys from all layers. - pub fn rebuild_bloom(&mut self) { - let Ok(mut new_global_filter) = Self::create_filter() else { - tracing::warn!( - "TrieLayerCache: rebuild_bloom could not create new filter. Poisoning bloom." - ); - self.bloom = None; - return; - }; - - for layer in self.layers.values() { - for path in layer.nodes.keys() { - if let Err(qfilter::Error::CapacityExceeded) = new_global_filter.insert(path) { - tracing::warn!( - "TrieLayerCache: rebuild_bloom capacity exceeded. Poisoning bloom." - ); - self.bloom = None; - return; - } + /// Return an iterator to extract the elements of the 128th layer. + /// + /// If there are not yet 128 layers, it'll return an empty iterator. + /// Dropping the iterator will not leave the cache in an inconsistent state. All remaining items + /// will be dropped. 
+ pub fn commit(&mut self) -> Vec<(Vec, Vec)> { + let mut items = Vec::new(); + self.data.retain(|key, entry| { + entry.layers <<= 1; + if entry.layers == 0 { + items.push(( + key.clone(), + if entry.previous.is_empty() { + mem::take(&mut entry.value) + } else { + entry.previous.remove(0) + }, + )); + + true + } else { + false } - } + }); - self.bloom = Some(new_global_filter); + items } +} - pub fn commit(&mut self, state_root: H256) -> Option, Vec)>> { - let mut layers_to_commit = vec![]; - let mut current_state_root = state_root; - while let Some(layer) = self.layers.remove(¤t_state_root) { - let layer = Arc::unwrap_or_clone(layer); - current_state_root = layer.parent; - layers_to_commit.push(layer); - } - let top_layer_id = layers_to_commit.first()?.id; - // older layers are useless - self.layers.retain(|_, item| item.id > top_layer_id); - self.rebuild_bloom(); // layers removed, rebuild global bloom filter. - let nodes_to_commit = layers_to_commit - .into_iter() - .rev() - .flat_map(|layer| layer.nodes) - .collect(); - Some(nodes_to_commit) - } +#[derive(Clone, Debug)] +struct TrieLayerCacheEntry { + value: T, + layers: u128, + previous: Vec, } pub struct TrieWrapper { @@ -203,8 +104,8 @@ impl TrieDB for TrieWrapper { } fn get(&self, key: Nibbles) -> Result>, TrieError> { let key = apply_prefix(self.prefix, key); - if let Some(value) = self.inner.get(self.state_root, key.as_ref()) { - return Ok(Some(value)); + if let Some(value) = self.inner.get(key.as_ref()) { + return Ok(Some(value.to_vec())); } self.db.get(key) } From f810c4547eaa199906554e2a4f9ae75af586fa92 Mon Sep 17 00:00:00 2001 From: Esteve Soler Arderiu Date: Mon, 17 Nov 2025 13:57:34 +0100 Subject: [PATCH 2/5] Fix `crates/storage/store_db/in_memory.rs`. --- crates/storage/store_db/in_memory.rs | 66 ++++++++++------------------ 1 file changed, 24 insertions(+), 42 deletions(-) diff --git a/crates/storage/store_db/in_memory.rs b/crates/storage/store_db/in_memory.rs index eba25d5ab22..c52612c2ca9 100644 --- a/crates/storage/store_db/in_memory.rs +++ b/crates/storage/store_db/in_memory.rs @@ -91,58 +91,40 @@ impl StoreEngine for Store { let mut store = self.inner()?; // Store trie updates - let mut trie = TrieLayerCache::clone(&store.trie_cache); - let parent = update_batch - .blocks - .first() - .ok_or(StoreError::UpdateBatchNoBlocks)? - .header - .parent_hash; - - let pre_state_root = store - .headers - .get(&parent) - .map(|header| header.state_root) - .unwrap_or_default(); - - let last_state_root = update_batch - .blocks - .last() - .ok_or(StoreError::UpdateBatchNoBlocks)? 
- .header - .state_root; + let nodes = { + let trie = Arc::make_mut(&mut store.trie_cache); + + let nodes = trie.commit(); + trie.put_iter( + update_batch + .storage_updates + .into_iter() + .flat_map(|(account_hash, nodes)| { + nodes + .into_iter() + .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node)) + }) + .chain(update_batch.account_updates) + .map(|(key, value)| (key.into_vec(), value)), + ); + + nodes + }; { let mut state_trie = store .state_trie_nodes .lock() .map_err(|_| StoreError::LockError)?; - - if let Some(root) = trie.get_commitable(pre_state_root, COMMIT_THRESHOLD) { - let nodes = trie.commit(root).unwrap_or_default(); - for (key, value) in nodes { - if value.is_empty() { - state_trie.remove(&key); - } else { - state_trie.insert(key, value); - } + for (key, value) in nodes { + if value.is_empty() { + state_trie.remove(&key); + } else { + state_trie.insert(key, value); } } } - let key_values = update_batch - .storage_updates - .into_iter() - .flat_map(|(account_hash, nodes)| { - nodes - .into_iter() - .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node)) - }) - .chain(update_batch.account_updates) - .collect(); - trie.put_batch(pre_state_root, last_state_root, key_values); - store.trie_cache = Arc::new(trie); - for block in update_batch.blocks { // store block let number = block.header.number; From 66737f193856862d4d697237f5dc5d543ea01cdb Mon Sep 17 00:00:00 2001 From: Esteve Soler Arderiu Date: Tue, 18 Nov 2025 16:08:58 +0100 Subject: [PATCH 3/5] Fix `crates/storage/store_db/rocksdb.rs`. --- crates/storage/store_db/rocksdb.rs | 65 ++++++++++++------------------ 1 file changed, 26 insertions(+), 39 deletions(-) diff --git a/crates/storage/store_db/rocksdb.rs b/crates/storage/store_db/rocksdb.rs index 4792bd446fd..44a0b37ab21 100644 --- a/crates/storage/store_db/rocksdb.rs +++ b/crates/storage/store_db/rocksdb.rs @@ -42,9 +42,6 @@ use ethrex_rlp::{ }; use std::fmt::Debug; -// TODO: use finalized hash to determine when to commit -const COMMIT_THRESHOLD: usize = 128; - /// Canonical block hashes column family: [`u8;_`] => [`Vec`] /// - [`u8;_`] = `block_number.to_le_bytes()` /// - [`Vec`] = `BlockHashRLP::from(block_hash).bytes().clone()` @@ -447,20 +444,14 @@ impl Store { match rx.recv() { Ok(( notify, - parent_state_root, - child_state_root, + _parent_state_root, + _child_state_root, account_updates, storage_updates, )) => { // FIXME: what should we do on error? let _ = store_clone - .apply_trie_updates( - notify, - parent_state_root, - child_state_root, - account_updates, - storage_updates, - ) + .apply_trie_updates(notify, account_updates, storage_updates) .inspect_err(|err| error!("apply_trie_updates failed: {err}")); } Err(err) => error!("Error while reading diff layer: {err}"), @@ -743,8 +734,6 @@ impl Store { fn apply_trie_updates( &self, notify: SyncSender>, - parent_state_root: H256, - child_state_root: H256, account_updates: Vec<(Nibbles, Vec)>, storage_updates: StorageUpdates, ) -> Result<(), StoreError> { @@ -753,38 +742,38 @@ impl Store { let trie_cache = &self.trie_cache; // Phase 1: update the in-memory diff-layers only, then notify block production. 
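(Editor's aside, not part of the diff.) The Phase 1 / Phase 2 comments in `apply_trie_updates` describe the intended split: mutate the in-memory diff layers, unblock block production through the `notify` channel, and only then flush aged-out nodes to disk. A minimal sketch of that ordering, with hypothetical names (`apply_two_phase`, `flush_to_disk`) and a plain `Vec` standing in for the real layer cache:

```rust
use std::sync::{mpsc::SyncSender, Arc, Mutex};

type Key = Arc<[u8]>;
type Value = Arc<[u8]>;

// Sketch of the notify-then-flush pattern; simplified types, not the ethrex API.
fn apply_two_phase(
    cache: &Mutex<Vec<(Key, Value)>>,          // stand-in for the layered trie cache
    notify: SyncSender<Result<(), String>>,    // block production blocks on this
    updates: Vec<(Key, Value)>,
    flush_to_disk: impl FnOnce(Vec<(Key, Value)>),
) -> Result<(), String> {
    // Phase 1: in-memory only; collect whatever aged out of the cache window.
    let aged_out = {
        let mut cache = cache.lock().map_err(|_| "lock poisoned".to_string())?;
        let aged_out = std::mem::take(&mut *cache); // stand-in for `commit()`
        cache.extend(updates);                      // stand-in for `put_iter()`
        aged_out
    };

    // Unblock the waiting caller before any disk work happens.
    notify.send(Ok(())).map_err(|_| "receiver dropped".to_string())?;

    // Phase 2: the slow disk write runs after block production has resumed.
    if !aged_out.is_empty() {
        flush_to_disk(aged_out);
    }
    Ok(())
}
```

The point of the ordering is that the RocksDB write batch never sits on the critical path of the caller waiting on `notify`.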
- let new_layer = storage_updates - .into_iter() - .flat_map(|(account_hash, nodes)| { - nodes + let commit_data; + { + let mut lock = trie_cache.lock().map_err(|_| StoreError::LockError)?; + let trie = Arc::make_mut(&mut *lock); + commit_data = trie.commit(); + + trie.put_iter( + storage_updates .into_iter() - .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node)) - }) - .chain(account_updates) - .collect(); - // Read-Copy-Update the trie cache with a new layer. - let trie = trie_cache - .lock() - .map_err(|_| StoreError::LockError)? - .clone(); - let mut trie_mut = (*trie).clone(); - trie_mut.put_batch(parent_state_root, child_state_root, new_layer); - let trie = Arc::new(trie_mut); - *trie_cache.lock().map_err(|_| StoreError::LockError)? = trie.clone(); + .flat_map(|(account_hash, nodes)| { + nodes + .into_iter() + .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node)) + }) + .chain(account_updates) + .map(|(key, value)| (key.into_vec(), value)), + ); + } + // Update finished, signal block processing. notify.send(Ok(())).map_err(|_| StoreError::LockError)?; // Phase 2: update disk layer. - let Some(root) = trie.get_commitable(parent_state_root, COMMIT_THRESHOLD) else { - // Nothing to commit to disk, move on. + if commit_data.is_empty() { return Ok(()); - }; + } + // Stop the flat-key-value generator thread, as the underlying trie is about to change. // Ignore the error, if the channel is closed it means there is no worker to notify. let _ = fkv_ctl.send(FKVGeneratorControlMessage::Stop); // RCU to remove the bottom layer: update step needs to happen after disk layer is updated. - let mut trie_mut = (*trie).clone(); let mut batch = WriteBatch::default(); let [ cf_accounts_trie_nodes, @@ -809,8 +798,7 @@ impl Store { // the account address (32 bytes) + storage path (up to 32 bytes). // Commit removes the bottom layer and returns it, this is the mutation step. - let nodes = trie_mut.commit(root).unwrap_or_default(); - for (key, value) in nodes { + for (key, value) in commit_data { let is_leaf = key.len() == 65 || key.len() == 131; let is_account = key.len() <= 65; @@ -838,8 +826,7 @@ impl Store { // We want to send this message even if there was an error during the batch write let _ = fkv_ctl.send(FKVGeneratorControlMessage::Continue); result?; - // Phase 3: update diff layers with the removal of bottom layer. - *trie_cache.lock().map_err(|_| StoreError::LockError)? = Arc::new(trie_mut); + Ok(()) } From 23acb7aa3a92904b23a2143420ee7685595904d8 Mon Sep 17 00:00:00 2001 From: Esteve Soler Arderiu Date: Thu, 20 Nov 2025 12:16:24 +0100 Subject: [PATCH 4/5] Make data references shared. 
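(Editor's note, illustrative only.) The motivation for this patch: once keys and values are stored as `Arc<[u8]>`, the layer cache, the `commit` output, and the RocksDB write batch can all point at the same allocation, and "cloning" a node is a reference-count bump rather than a byte copy. A self-contained sketch of the difference:

```rust
use std::sync::Arc;

fn main() {
    let owned: Vec<u8> = vec![0xAB; 1024];

    // One allocation after conversion, shared by every clone.
    let shared: Arc<[u8]> = owned.into();   // Vec<u8> -> Arc<[u8]>
    let for_cache = Arc::clone(&shared);    // refcount bump, no byte copy
    let for_disk = Arc::clone(&shared);

    // All clones point at the same bytes.
    assert!(std::ptr::eq(for_cache.as_ptr(), for_disk.as_ptr()));
    assert_eq!(&*for_disk, &[0xAB; 1024][..]);
}
```

This is why the call sites below convert with `key.into_vec().into()` / `value.into()` before `put_iter`, and why `commit` can return `Arc::clone(key)` instead of cloning the key bytes.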
--- crates/storage/store_db/in_memory.rs | 7 ++++--- crates/storage/store_db/rocksdb.rs | 4 ++-- crates/storage/trie_db/layering.rs | 10 +++++----- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/crates/storage/store_db/in_memory.rs b/crates/storage/store_db/in_memory.rs index c52612c2ca9..56c422fd59a 100644 --- a/crates/storage/store_db/in_memory.rs +++ b/crates/storage/store_db/in_memory.rs @@ -105,7 +105,7 @@ impl StoreEngine for Store { .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node)) }) .chain(update_batch.account_updates) - .map(|(key, value)| (key.into_vec(), value)), + .map(|(key, value)| (key.into_vec().into(), value.into())), ); nodes @@ -118,9 +118,10 @@ impl StoreEngine for Store { .map_err(|_| StoreError::LockError)?; for (key, value) in nodes { if value.is_empty() { - state_trie.remove(&key); + state_trie.remove(&*key); } else { - state_trie.insert(key, value); + // TODO: Maybe adapt `state_trie` to share references. + state_trie.insert(key.to_vec(), value.to_vec()); } } } diff --git a/crates/storage/store_db/rocksdb.rs b/crates/storage/store_db/rocksdb.rs index df4693a1096..edb4cb3b2c1 100644 --- a/crates/storage/store_db/rocksdb.rs +++ b/crates/storage/store_db/rocksdb.rs @@ -757,7 +757,7 @@ impl Store { .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node)) }) .chain(account_updates) - .map(|(key, value)| (key.into_vec(), value)), + .map(|(key, value)| (key.into_vec().into(), value.into())), ); } @@ -802,7 +802,7 @@ impl Store { let is_leaf = key.len() == 65 || key.len() == 131; let is_account = key.len() <= 65; - if is_leaf && key > last_written { + if is_leaf && &*key > last_written.as_slice() { continue; } let cf = if is_leaf { diff --git a/crates/storage/trie_db/layering.rs b/crates/storage/trie_db/layering.rs index e4945187d11..9e4bbe05a4a 100644 --- a/crates/storage/trie_db/layering.rs +++ b/crates/storage/trie_db/layering.rs @@ -6,20 +6,20 @@ use std::{collections::hash_map::Entry, mem, sync::Arc}; #[derive(Clone, Debug, Default)] pub struct TrieLayerCache { /// Mapping from keys to entries (from all layers). - data: FxHashMap, TrieLayerCacheEntry>>, + data: FxHashMap, TrieLayerCacheEntry>>, } impl TrieLayerCache { /// Obtain the cached value from any layer given its key. pub fn get(&self, key: &[u8]) -> Option<&[u8]> { - self.data.get(key).map(|entry| entry.value.as_slice()) + self.data.get(key).map(|entry| &*entry.value) } /// Write a batch of items into the cache at the last layer. /// /// Items that were already present in that layer will be overwritten. Use /// [`TrieLayerCache::commit`] to advance the layers before putting items into the new layer. - pub fn put_iter(&mut self, iter: impl IntoIterator, Vec)>) { + pub fn put_iter(&mut self, iter: impl IntoIterator, Arc<[u8]>)>) { for (key, value) in iter { match self.data.entry(key) { Entry::Occupied(entry) => { @@ -48,13 +48,13 @@ impl TrieLayerCache { /// If there are not yet 128 layers, it'll return an empty iterator. /// Dropping the iterator will not leave the cache in an inconsistent state. All remaining items /// will be dropped. 
- pub fn commit(&mut self) -> Vec<(Vec, Vec)> { + pub fn commit(&mut self) -> Vec<(Arc<[u8]>, Arc<[u8]>)> { let mut items = Vec::new(); self.data.retain(|key, entry| { entry.layers <<= 1; if entry.layers == 0 { items.push(( - key.clone(), + Arc::clone(key), if entry.previous.is_empty() { mem::take(&mut entry.value) } else { From 49e850534a84e657f3ddccafd96c9d25ccf1cab2 Mon Sep 17 00:00:00 2001 From: Esteve Soler Arderiu Date: Mon, 24 Nov 2025 13:54:45 +0100 Subject: [PATCH 5/5] Fix normal make test. --- crates/storage/store_db/in_memory.rs | 46 ++++++++++++++++--- crates/storage/store_db/rocksdb.rs | 41 ++++++++++++----- crates/storage/trie_db/layering.rs | 68 +++++++++++++++------------- 3 files changed, 106 insertions(+), 49 deletions(-) diff --git a/crates/storage/store_db/in_memory.rs b/crates/storage/store_db/in_memory.rs index 56c422fd59a..c95969095b8 100644 --- a/crates/storage/store_db/in_memory.rs +++ b/crates/storage/store_db/in_memory.rs @@ -39,7 +39,7 @@ pub struct StoreInner { // Maps transaction hashes to their blocks (height+hash) and index within the blocks. transaction_locations: HashMap>, receipts: HashMap>, - trie_cache: Arc, + trie_cache: Mutex>>, // Contains account trie nodes state_trie_nodes: NodeMap, pending_blocks: HashMap, @@ -91,11 +91,22 @@ impl StoreEngine for Store { let mut store = self.inner()?; // Store trie updates + let block_header = &update_batch + .blocks + .first() + .ok_or(StoreError::UpdateBatchNoBlocks)? + .header; + let state_root = store + .headers + .get(&block_header.parent_hash) + .map(|header| header.state_root) + .unwrap_or_default(); + let nodes = { - let trie = Arc::make_mut(&mut store.trie_cache); + let trie_caches = store.trie_cache.get_mut().unwrap(); - let nodes = trie.commit(); - trie.put_iter( + let mut trie_cache = Arc::clone(trie_caches.get_mut(&state_root).unwrap()); + let nodes = trie_cache.commit_and_put_iter( update_batch .storage_updates .into_iter() @@ -107,6 +118,15 @@ impl StoreEngine for Store { .chain(update_batch.account_updates) .map(|(key, value)| (key.into_vec().into(), value.into())), ); + trie_caches.insert( + update_batch + .blocks + .last() + .ok_or(StoreError::UpdateBatchNoBlocks)? 
+ .header + .state_root, + trie_cache, + ); nodes }; @@ -423,7 +443,14 @@ impl StoreEngine for Store { let db = Box::new(InMemoryTrieDB::new(trie_backend)); let wrap_db = Box::new(TrieWrapper { state_root, - inner: store.trie_cache.clone(), + inner: Arc::clone( + store + .trie_cache + .lock() + .unwrap() + .entry(state_root) + .or_insert_with(|| Arc::new(TrieLayerCache::default())), + ), db, prefix: Some(hashed_address), }); @@ -436,7 +463,14 @@ impl StoreEngine for Store { let db = Box::new(InMemoryTrieDB::new(trie_backend)); let wrap_db = Box::new(TrieWrapper { state_root, - inner: store.trie_cache.clone(), + inner: Arc::clone( + store + .trie_cache + .lock() + .unwrap() + .entry(state_root) + .or_insert_with(|| Arc::new(TrieLayerCache::default())), + ), db, prefix: None, }); diff --git a/crates/storage/store_db/rocksdb.rs b/crates/storage/store_db/rocksdb.rs index edb4cb3b2c1..45ca8afbe1a 100644 --- a/crates/storage/store_db/rocksdb.rs +++ b/crates/storage/store_db/rocksdb.rs @@ -19,7 +19,7 @@ use rocksdb::{ Options, WriteBatch, checkpoint::Checkpoint, }; use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, path::{Path, PathBuf}, sync::{ Arc, Mutex, @@ -149,7 +149,7 @@ enum FKVGeneratorControlMessage { #[derive(Debug, Clone)] pub struct Store { db: Arc>, - trie_cache: Arc>>, + trie_cache: Arc>>>, flatkeyvalue_control_tx: std::sync::mpsc::SyncSender, trie_update_worker_tx: TriedUpdateWorkerTx, last_computed_flatkeyvalue: Arc>>, @@ -444,14 +444,20 @@ impl Store { match rx.recv() { Ok(( notify, - _parent_state_root, - _child_state_root, + parent_state_root, + child_state_root, account_updates, storage_updates, )) => { // FIXME: what should we do on error? let _ = store_clone - .apply_trie_updates(notify, account_updates, storage_updates) + .apply_trie_updates( + parent_state_root, + child_state_root, + notify, + account_updates, + storage_updates, + ) .inspect_err(|err| error!("apply_trie_updates failed: {err}")); } Err(err) => error!("Error while reading diff layer: {err}"), @@ -733,6 +739,8 @@ impl Store { fn apply_trie_updates( &self, + prev_state_root: H256, + curr_state_root: H256, notify: SyncSender>, account_updates: Vec<(Nibbles, Vec)>, storage_updates: StorageUpdates, @@ -742,13 +750,11 @@ impl Store { let trie_cache = &self.trie_cache; // Phase 1: update the in-memory diff-layers only, then notify block production. - let commit_data; - { - let mut lock = trie_cache.lock().map_err(|_| StoreError::LockError)?; - let trie = Arc::make_mut(&mut *lock); - commit_data = trie.commit(); + let commit_data = { + let mut trie_caches = trie_cache.lock().map_err(|_| StoreError::LockError)?; - trie.put_iter( + let mut trie_cache = Arc::clone(trie_caches.get(&prev_state_root).unwrap()); + let commit_data = trie_cache.commit_and_put_iter( storage_updates .into_iter() .flat_map(|(account_hash, nodes)| { @@ -759,7 +765,10 @@ impl Store { .chain(account_updates) .map(|(key, value)| (key.into_vec().into(), value.into())), ); - } + trie_caches.insert(curr_state_root, trie_cache); + + commit_data + }; // Update finished, signal block processing. notify.send(Ok(())).map_err(|_| StoreError::LockError)?; @@ -1534,6 +1543,8 @@ impl StoreEngine for Store { .trie_cache .lock() .map_err(|_| StoreError::LockError)? + .entry(state_root) + .or_default() .clone(), db, prefix: Some(hashed_address), @@ -1556,6 +1567,8 @@ impl StoreEngine for Store { .trie_cache .lock() .map_err(|_| StoreError::LockError)? 
+ .entry(state_root) + .or_default() .clone(), db, prefix: None, @@ -1603,6 +1616,8 @@ impl StoreEngine for Store { .trie_cache .lock() .map_err(|_| StoreError::LockError)? + .entry(state_root) + .or_default() .clone(), db, prefix: None, @@ -1629,6 +1644,8 @@ impl StoreEngine for Store { .trie_cache .lock() .map_err(|_| StoreError::LockError)? + .entry(state_root) + .or_default() .clone(), db, prefix: Some(hashed_address), diff --git a/crates/storage/trie_db/layering.rs b/crates/storage/trie_db/layering.rs index 9e4bbe05a4a..87f5c8e925b 100644 --- a/crates/storage/trie_db/layering.rs +++ b/crates/storage/trie_db/layering.rs @@ -15,13 +15,45 @@ impl TrieLayerCache { self.data.get(key).map(|entry| &*entry.value) } - /// Write a batch of items into the cache at the last layer. + // /// Write a batch of items into the cache at the last layer. + // /// + // /// Items that were already present in that layer will be overwritten. Use + // /// [`TrieLayerCache::commit`] to advance the layers before putting items into the new layer. + // pub fn put_iter(&mut self, iter: impl IntoIterator, Arc<[u8]>)>) { + // } + + /// Return an iterator to extract the elements of the 128th layer. /// - /// Items that were already present in that layer will be overwritten. Use - /// [`TrieLayerCache::commit`] to advance the layers before putting items into the new layer. - pub fn put_iter(&mut self, iter: impl IntoIterator, Arc<[u8]>)>) { + /// If there are not yet 128 layers, it'll return an empty vec. + pub fn commit_and_put_iter( + self: &mut Arc, + iter: impl IntoIterator, Arc<[u8]>)>, + ) -> Vec<(Arc<[u8]>, Arc<[u8]>)> { + let self_mut = Arc::make_mut(self); + + // Commit last layer. + let mut items = Vec::new(); + self_mut.data.retain(|key, entry| { + entry.layers <<= 1; + if entry.layers == 0 { + items.push(( + Arc::clone(key), + if entry.previous.is_empty() { + mem::take(&mut entry.value) + } else { + entry.previous.remove(0) + }, + )); + + true + } else { + false + } + }); + + // Put items. for (key, value) in iter { - match self.data.entry(key) { + match self_mut.data.entry(key) { Entry::Occupied(entry) => { let entry = entry.into_mut(); @@ -41,32 +73,6 @@ impl TrieLayerCache { } } } - } - - /// Return an iterator to extract the elements of the 128th layer. - /// - /// If there are not yet 128 layers, it'll return an empty iterator. - /// Dropping the iterator will not leave the cache in an inconsistent state. All remaining items - /// will be dropped. - pub fn commit(&mut self) -> Vec<(Arc<[u8]>, Arc<[u8]>)> { - let mut items = Vec::new(); - self.data.retain(|key, entry| { - entry.layers <<= 1; - if entry.layers == 0 { - items.push(( - Arc::clone(key), - if entry.previous.is_empty() { - mem::take(&mut entry.value) - } else { - entry.previous.remove(0) - }, - )); - - true - } else { - false - } - }); items }
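(Editor's note on the final shape of `TrieLayerCache`, illustrative only.) Each `TrieLayerCacheEntry` records in the `layers: u128` bitmask which of the last 128 layers wrote the key: bit 0 is the layer currently being built, and every call to `commit_and_put_iter` ages the mask with `layers <<= 1`. When the mask reaches zero the write has fallen out of the 128-layer window (the same depth as the `COMMIT_THRESHOLD` of 128 that patch 3 removed) and its value is handed to the disk writer. A standalone sketch of that arithmetic:

```rust
fn main() {
    let mut layers: u128 = 0;

    // A write in the current layer sets bit 0.
    layers |= 1;
    assert_eq!(layers, 1);

    // 127 commits age that write up to the oldest slot (bit 127).
    for _ in 0..127 {
        layers <<= 1;
    }
    assert_eq!(layers, 1u128 << 127);

    // The 128th commit shifts it out entirely: the value goes to disk.
    layers <<= 1;
    assert_eq!(layers, 0);

    // A key rewritten in a newer layer keeps newer bits alive, so it stays cached.
    let rewritten: u128 = (1u128 << 127) | 1;
    assert_ne!(rewritten << 1, 0);
}
```

Note that `<<` on an unsigned integer silently discards bits shifted past the top, which is exactly what the aging step relies on to reach zero.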