diff --git a/crates/storage/store_db/in_memory.rs b/crates/storage/store_db/in_memory.rs index eba25d5ab22..c95969095b8 100644 --- a/crates/storage/store_db/in_memory.rs +++ b/crates/storage/store_db/in_memory.rs @@ -39,7 +39,7 @@ pub struct StoreInner { // Maps transaction hashes to their blocks (height+hash) and index within the blocks. transaction_locations: HashMap>, receipts: HashMap>, - trie_cache: Arc, + trie_cache: Mutex>>, // Contains account trie nodes state_trie_nodes: NodeMap, pending_blocks: HashMap, @@ -91,58 +91,61 @@ impl StoreEngine for Store { let mut store = self.inner()?; // Store trie updates - let mut trie = TrieLayerCache::clone(&store.trie_cache); - let parent = update_batch + let block_header = &update_batch .blocks .first() .ok_or(StoreError::UpdateBatchNoBlocks)? - .header - .parent_hash; - - let pre_state_root = store + .header; + let state_root = store .headers - .get(&parent) + .get(&block_header.parent_hash) .map(|header| header.state_root) .unwrap_or_default(); - let last_state_root = update_batch - .blocks - .last() - .ok_or(StoreError::UpdateBatchNoBlocks)? - .header - .state_root; + let nodes = { + let trie_caches = store.trie_cache.get_mut().unwrap(); + + let mut trie_cache = Arc::clone(trie_caches.get_mut(&state_root).unwrap()); + let nodes = trie_cache.commit_and_put_iter( + update_batch + .storage_updates + .into_iter() + .flat_map(|(account_hash, nodes)| { + nodes + .into_iter() + .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node)) + }) + .chain(update_batch.account_updates) + .map(|(key, value)| (key.into_vec().into(), value.into())), + ); + trie_caches.insert( + update_batch + .blocks + .last() + .ok_or(StoreError::UpdateBatchNoBlocks)? + .header + .state_root, + trie_cache, + ); + + nodes + }; { let mut state_trie = store .state_trie_nodes .lock() .map_err(|_| StoreError::LockError)?; - - if let Some(root) = trie.get_commitable(pre_state_root, COMMIT_THRESHOLD) { - let nodes = trie.commit(root).unwrap_or_default(); - for (key, value) in nodes { - if value.is_empty() { - state_trie.remove(&key); - } else { - state_trie.insert(key, value); - } + for (key, value) in nodes { + if value.is_empty() { + state_trie.remove(&*key); + } else { + // TODO: Maybe adapt `state_trie` to share references. + state_trie.insert(key.to_vec(), value.to_vec()); } } } - let key_values = update_batch - .storage_updates - .into_iter() - .flat_map(|(account_hash, nodes)| { - nodes - .into_iter() - .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node)) - }) - .chain(update_batch.account_updates) - .collect(); - trie.put_batch(pre_state_root, last_state_root, key_values); - store.trie_cache = Arc::new(trie); - for block in update_batch.blocks { // store block let number = block.header.number; @@ -440,7 +443,14 @@ impl StoreEngine for Store { let db = Box::new(InMemoryTrieDB::new(trie_backend)); let wrap_db = Box::new(TrieWrapper { state_root, - inner: store.trie_cache.clone(), + inner: Arc::clone( + store + .trie_cache + .lock() + .unwrap() + .entry(state_root) + .or_insert_with(|| Arc::new(TrieLayerCache::default())), + ), db, prefix: Some(hashed_address), }); @@ -453,7 +463,14 @@ impl StoreEngine for Store { let db = Box::new(InMemoryTrieDB::new(trie_backend)); let wrap_db = Box::new(TrieWrapper { state_root, - inner: store.trie_cache.clone(), + inner: Arc::clone( + store + .trie_cache + .lock() + .unwrap() + .entry(state_root) + .or_insert_with(|| Arc::new(TrieLayerCache::default())), + ), db, prefix: None, }); diff --git a/crates/storage/store_db/rocksdb.rs b/crates/storage/store_db/rocksdb.rs index e041ef203db..45ca8afbe1a 100644 --- a/crates/storage/store_db/rocksdb.rs +++ b/crates/storage/store_db/rocksdb.rs @@ -19,7 +19,7 @@ use rocksdb::{ Options, WriteBatch, checkpoint::Checkpoint, }; use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, path::{Path, PathBuf}, sync::{ Arc, Mutex, @@ -42,9 +42,6 @@ use ethrex_rlp::{ }; use std::fmt::Debug; -// TODO: use finalized hash to determine when to commit -const COMMIT_THRESHOLD: usize = 128; - /// Canonical block hashes column family: [`u8;_`] => [`Vec`] /// - [`u8;_`] = `block_number.to_le_bytes()` /// - [`Vec`] = `BlockHashRLP::from(block_hash).bytes().clone()` @@ -152,7 +149,7 @@ enum FKVGeneratorControlMessage { #[derive(Debug, Clone)] pub struct Store { db: Arc>, - trie_cache: Arc>>, + trie_cache: Arc>>>, flatkeyvalue_control_tx: std::sync::mpsc::SyncSender, trie_update_worker_tx: TriedUpdateWorkerTx, last_computed_flatkeyvalue: Arc>>, @@ -455,9 +452,9 @@ impl Store { // FIXME: what should we do on error? let _ = store_clone .apply_trie_updates( - notify, parent_state_root, child_state_root, + notify, account_updates, storage_updates, ) @@ -742,9 +739,9 @@ impl Store { fn apply_trie_updates( &self, + prev_state_root: H256, + curr_state_root: H256, notify: SyncSender>, - parent_state_root: H256, - child_state_root: H256, account_updates: Vec<(Nibbles, Vec)>, storage_updates: StorageUpdates, ) -> Result<(), StoreError> { @@ -753,38 +750,39 @@ impl Store { let trie_cache = &self.trie_cache; // Phase 1: update the in-memory diff-layers only, then notify block production. - let new_layer = storage_updates - .into_iter() - .flat_map(|(account_hash, nodes)| { - nodes + let commit_data = { + let mut trie_caches = trie_cache.lock().map_err(|_| StoreError::LockError)?; + + let mut trie_cache = Arc::clone(trie_caches.get(&prev_state_root).unwrap()); + let commit_data = trie_cache.commit_and_put_iter( + storage_updates .into_iter() - .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node)) - }) - .chain(account_updates) - .collect(); - // Read-Copy-Update the trie cache with a new layer. - let trie = trie_cache - .lock() - .map_err(|_| StoreError::LockError)? - .clone(); - let mut trie_mut = (*trie).clone(); - trie_mut.put_batch(parent_state_root, child_state_root, new_layer); - let trie = Arc::new(trie_mut); - *trie_cache.lock().map_err(|_| StoreError::LockError)? = trie.clone(); + .flat_map(|(account_hash, nodes)| { + nodes + .into_iter() + .map(move |(path, node)| (apply_prefix(Some(account_hash), path), node)) + }) + .chain(account_updates) + .map(|(key, value)| (key.into_vec().into(), value.into())), + ); + trie_caches.insert(curr_state_root, trie_cache); + + commit_data + }; + // Update finished, signal block processing. notify.send(Ok(())).map_err(|_| StoreError::LockError)?; // Phase 2: update disk layer. - let Some(root) = trie.get_commitable(parent_state_root, COMMIT_THRESHOLD) else { - // Nothing to commit to disk, move on. + if commit_data.is_empty() { return Ok(()); - }; + } + // Stop the flat-key-value generator thread, as the underlying trie is about to change. // Ignore the error, if the channel is closed it means there is no worker to notify. let _ = fkv_ctl.send(FKVGeneratorControlMessage::Stop); // RCU to remove the bottom layer: update step needs to happen after disk layer is updated. - let mut trie_mut = (*trie).clone(); let mut batch = WriteBatch::default(); let [ cf_accounts_trie_nodes, @@ -809,12 +807,11 @@ impl Store { // the account address (32 bytes) + storage path (up to 32 bytes). // Commit removes the bottom layer and returns it, this is the mutation step. - let nodes = trie_mut.commit(root).unwrap_or_default(); - for (key, value) in nodes { + for (key, value) in commit_data { let is_leaf = key.len() == 65 || key.len() == 131; let is_account = key.len() <= 65; - if is_leaf && key > last_written { + if is_leaf && &*key > last_written.as_slice() { continue; } let cf = if is_leaf { @@ -838,8 +835,7 @@ impl Store { // We want to send this message even if there was an error during the batch write let _ = fkv_ctl.send(FKVGeneratorControlMessage::Continue); result?; - // Phase 3: update diff layers with the removal of bottom layer. - *trie_cache.lock().map_err(|_| StoreError::LockError)? = Arc::new(trie_mut); + Ok(()) } @@ -1547,6 +1543,8 @@ impl StoreEngine for Store { .trie_cache .lock() .map_err(|_| StoreError::LockError)? + .entry(state_root) + .or_default() .clone(), db, prefix: Some(hashed_address), @@ -1569,6 +1567,8 @@ impl StoreEngine for Store { .trie_cache .lock() .map_err(|_| StoreError::LockError)? + .entry(state_root) + .or_default() .clone(), db, prefix: None, @@ -1616,6 +1616,8 @@ impl StoreEngine for Store { .trie_cache .lock() .map_err(|_| StoreError::LockError)? + .entry(state_root) + .or_default() .clone(), db, prefix: None, @@ -1642,6 +1644,8 @@ impl StoreEngine for Store { .trie_cache .lock() .map_err(|_| StoreError::LockError)? + .entry(state_root) + .or_default() .clone(), db, prefix: Some(hashed_address), diff --git a/crates/storage/trie_db/layering.rs b/crates/storage/trie_db/layering.rs index a1e9e2b227e..87f5c8e925b 100644 --- a/crates/storage/trie_db/layering.rs +++ b/crates/storage/trie_db/layering.rs @@ -1,181 +1,88 @@ use ethrex_common::H256; -use rustc_hash::FxHashMap; -use std::sync::Arc; - use ethrex_trie::{Nibbles, TrieDB, TrieError}; +use rustc_hash::FxHashMap; +use std::{collections::hash_map::Entry, mem, sync::Arc}; -#[derive(Debug, Clone)] -struct TrieLayer { - nodes: FxHashMap, Vec>, - parent: H256, - id: usize, -} - -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct TrieLayerCache { - /// Monotonically increasing ID for layers, starting at 1. - /// TODO: this implementation panics on overflow - last_id: usize, - layers: FxHashMap>, - /// Global bloom that accrues all layer blooms. - /// - /// The bloom filter is used to avoid looking up all layers when the given path doesn't exist in any - /// layer, thus going directly to the database. - /// - /// In case a bloom filter insert or merge fails, we need to mark the bloom filter as poisoned - /// so we never use it again, because if we don't we may be misled into believing a key is not present - /// on a diff layer when it is (i.e. a false negative), leading to wrong executions. - bloom: Option, -} - -impl Default for TrieLayerCache { - fn default() -> Self { - // Try to create the bloom filter, if it fails use poison mode. - let bloom = Self::create_filter().ok(); - Self { - bloom, - last_id: 0, - layers: Default::default(), - } - } + /// Mapping from keys to entries (from all layers). + data: FxHashMap, TrieLayerCacheEntry>>, } impl TrieLayerCache { - // TODO: tune this - fn create_filter() -> Result { - qfilter::Filter::new_resizeable(1_000_000, 100_000_000, 0.02) - .inspect_err(|e| tracing::warn!("could not create trie layering bloom filter {e}")) + /// Obtain the cached value from any layer given its key. + pub fn get(&self, key: &[u8]) -> Option<&[u8]> { + self.data.get(key).map(|entry| &*entry.value) } - pub fn get(&self, state_root: H256, key: &[u8]) -> Option> { - // Fast check to know if any layer may contains the given key. - // We can only be certain it doesn't exist, but if it returns true it may or not exist (false positive). - if let Some(filter) = &self.bloom - && !filter.contains(key) - { - // TrieWrapper goes to db when returning None. - return None; - } - - let mut current_state_root = state_root; + // /// Write a batch of items into the cache at the last layer. + // /// + // /// Items that were already present in that layer will be overwritten. Use + // /// [`TrieLayerCache::commit`] to advance the layers before putting items into the new layer. + // pub fn put_iter(&mut self, iter: impl IntoIterator, Arc<[u8]>)>) { + // } - while let Some(layer) = self.layers.get(¤t_state_root) { - if let Some(value) = layer.nodes.get(key) { - return Some(value.clone()); - } - current_state_root = layer.parent; - if current_state_root == state_root { - // TODO: check if this is possible in practice - // This can't happen in L1, due to system contracts irreversibly modifying state - // at each block. - // On L2, if no transactions are included in a block, the state root remains the same, - // but we handle that case in put_batch. It may happen, however, if someone modifies - // state with a privileged tx and later reverts it (since it doesn't update nonce). - panic!("State cycle found"); + /// Return an iterator to extract the elements of the 128th layer. + /// + /// If there are not yet 128 layers, it'll return an empty vec. + pub fn commit_and_put_iter( + self: &mut Arc, + iter: impl IntoIterator, Arc<[u8]>)>, + ) -> Vec<(Arc<[u8]>, Arc<[u8]>)> { + let self_mut = Arc::make_mut(self); + + // Commit last layer. + let mut items = Vec::new(); + self_mut.data.retain(|key, entry| { + entry.layers <<= 1; + if entry.layers == 0 { + items.push(( + Arc::clone(key), + if entry.previous.is_empty() { + mem::take(&mut entry.value) + } else { + entry.previous.remove(0) + }, + )); + + true + } else { + false } - } - None - } + }); - // TODO: use finalized hash to know when to commit - pub fn get_commitable(&self, mut state_root: H256, commit_threshold: usize) -> Option { - let mut counter = 0; - while let Some(layer) = self.layers.get(&state_root) { - state_root = layer.parent; - counter += 1; - if counter > commit_threshold { - return Some(state_root); - } - } - None - } + // Put items. + for (key, value) in iter { + match self_mut.data.entry(key) { + Entry::Occupied(entry) => { + let entry = entry.into_mut(); - pub fn put_batch( - &mut self, - parent: H256, - state_root: H256, - key_values: Vec<(Nibbles, Vec)>, - ) { - if parent == state_root && key_values.is_empty() { - return; - } else if parent == state_root { - tracing::error!("Inconsistent state: parent == state_root but key_values not empty"); - return; - } - if self.layers.contains_key(&state_root) { - tracing::warn!("tried to insert a state_root that's already inserted"); - return; - } + let prev_value = mem::replace(&mut entry.value, value); + if entry.layers & 1 == 0 { + entry.previous.push(prev_value); + } - // add this new bloom to the global one. - if let Some(filter) = &mut self.bloom { - for (p, _) in &key_values { - if let Err(qfilter::Error::CapacityExceeded) = filter.insert(p.as_ref()) { - tracing::warn!("TrieLayerCache: put_batch capacity exceeded"); - self.bloom = None; - break; + entry.layers |= 1; } - } - } - - let nodes: FxHashMap, Vec> = key_values - .into_iter() - .map(|(path, value)| (path.into_vec(), value)) - .collect(); - - self.last_id += 1; - let entry = TrieLayer { - nodes, - parent, - id: self.last_id, - }; - self.layers.insert(state_root, Arc::new(entry)); - } - - /// Rebuilds the global bloom filter by inserting all keys from all layers. - pub fn rebuild_bloom(&mut self) { - let Ok(mut new_global_filter) = Self::create_filter() else { - tracing::warn!( - "TrieLayerCache: rebuild_bloom could not create new filter. Poisoning bloom." - ); - self.bloom = None; - return; - }; - - for layer in self.layers.values() { - for path in layer.nodes.keys() { - if let Err(qfilter::Error::CapacityExceeded) = new_global_filter.insert(path) { - tracing::warn!( - "TrieLayerCache: rebuild_bloom capacity exceeded. Poisoning bloom." - ); - self.bloom = None; - return; + Entry::Vacant(entry) => { + entry.insert(TrieLayerCacheEntry { + value, + layers: 1u128, + previous: Vec::new(), + }); } } } - self.bloom = Some(new_global_filter); + items } +} - pub fn commit(&mut self, state_root: H256) -> Option, Vec)>> { - let mut layers_to_commit = vec![]; - let mut current_state_root = state_root; - while let Some(layer) = self.layers.remove(¤t_state_root) { - let layer = Arc::unwrap_or_clone(layer); - current_state_root = layer.parent; - layers_to_commit.push(layer); - } - let top_layer_id = layers_to_commit.first()?.id; - // older layers are useless - self.layers.retain(|_, item| item.id > top_layer_id); - self.rebuild_bloom(); // layers removed, rebuild global bloom filter. - let nodes_to_commit = layers_to_commit - .into_iter() - .rev() - .flat_map(|layer| layer.nodes) - .collect(); - Some(nodes_to_commit) - } +#[derive(Clone, Debug)] +struct TrieLayerCacheEntry { + value: T, + layers: u128, + previous: Vec, } pub struct TrieWrapper { @@ -203,8 +110,8 @@ impl TrieDB for TrieWrapper { } fn get(&self, key: Nibbles) -> Result>, TrieError> { let key = apply_prefix(self.prefix, key); - if let Some(value) = self.inner.get(self.state_root, key.as_ref()) { - return Ok(Some(value)); + if let Some(value) = self.inner.get(key.as_ref()) { + return Ok(Some(value.to_vec())); } self.db.get(key) }