Skip to content
Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Perf

### 2025-11-19

- Parallelize merkleization [#5377](https://github.com/lambdaclass/ethrex/pull/5377)

### 2025-11-17

- Avoid temporary allocations when decoding and hashing trie nodes [#5353](https://github.com/lambdaclass/ethrex/pull/5353)
Expand Down
198 changes: 190 additions & 8 deletions crates/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use ethrex_common::types::{
};
use ethrex_common::types::{ELASTICITY_MULTIPLIER, P2PTransaction};
use ethrex_common::types::{Fork, MempoolTransaction};
use ethrex_common::{Address, H256, TrieLogger};
use ethrex_common::utils::keccak;
use ethrex_common::{Address, H160, H256, TrieLogger};
use ethrex_metrics::metrics;
use ethrex_rlp::decode::RLPDecode;
use ethrex_rlp::encode::RLPEncode;
Expand Down Expand Up @@ -109,6 +110,12 @@ impl Default for BlockchainOptions {
}
}

struct PartialMerkleizationResults {
state_updates: FxHashMap<Nibbles, Vec<u8>>,
storage_updates: StoreUpdatesMap,
code_updates: FxHashMap<H256, Code>,
}

#[derive(Debug, Clone)]
pub struct BatchBlockProcessingFailure {
pub last_valid_hash: H256,
Expand Down Expand Up @@ -242,12 +249,14 @@ impl Blockchain {
Ok((execution_result, exec_end_instant))
})
.expect("Failed to spawn block_executor exec thread");
let parent_header_ref = &parent_header; // Avoid moving to thread
let merkleize_handle = std::thread::Builder::new()
.name("block_executor_merkleizer".to_string())
.spawn_scoped(s, move || -> Result<_, StoreError> {
let account_updates_list = self.handle_merkleization(
s,
rx,
&parent_header,
parent_header_ref,
queue_length_ref,
max_queue_length_ref,
)?;
Expand Down Expand Up @@ -285,13 +294,175 @@ impl Blockchain {
))
}

fn handle_merkleization_subtrie(
&self,
rx: Receiver<Vec<(H256, AccountUpdate)>>,
parent_header: &BlockHeader,
) -> Result<PartialMerkleizationResults, StoreError> {
let mut state_trie = self
.storage
.state_trie(parent_header.hash())?
.ok_or(StoreError::MissingStore)?;
let mut state_updates_map: FxHashMap<Nibbles, Vec<u8>> = Default::default();
let mut storage_updates_map: StoreUpdatesMap = Default::default();
let mut code_updates: FxHashMap<H256, Code> = Default::default();
let mut account_states: FxHashMap<H256, AccountState> = Default::default();
for updates in rx {
Self::process_incoming_update_message(
&self.storage,
&mut state_trie,
updates,
&mut storage_updates_map,
parent_header,
&mut state_updates_map,
&mut code_updates,
&mut account_states,
)?;
}

Ok(PartialMerkleizationResults {
state_updates: state_updates_map,
storage_updates: storage_updates_map,
code_updates,
})
}

#[instrument(
level = "trace",
name = "Trie update",
skip_all,
fields(namespace = "block_execution")
)]
fn handle_merkleization(
fn handle_merkleization<'a, 's, 'b>(
&'a self,
scope: &'s std::thread::Scope<'s, '_>,
rx: Receiver<Vec<AccountUpdate>>,
parent_header: &'b BlockHeader,
queue_length: &AtomicUsize,
max_queue_length: &mut usize,
) -> Result<AccountUpdatesList, StoreError>
where
'a: 's,
'b: 's,
{
// Fetch the old root from the DB and decode it
let old_root_opt = self
.storage
.state_trie(parent_header.hash())?
.ok_or(StoreError::MissingStore)?
.db()
.get(Nibbles::default())?
.map(|v| Node::decode(&v))
.transpose()?;

// If there's no root, or it's not a branch node, we fallback to sequential processing.
let Some(Node::Branch(old_root)) = old_root_opt else {
return self.handle_merkleization_sequential(
rx,
parent_header,
queue_length,
max_queue_length,
);
};
// If there are less than 3 subtries, we fallback to sequential processing.
// This simplifies the handling of shard results.
if old_root.choices.iter().filter(|c| c.is_valid()).count() < 3 {
return self.handle_merkleization_sequential(
rx,
parent_header,
queue_length,
max_queue_length,
);
}
let mut workers_tx = Vec::with_capacity(16);
let mut workers_handles = Vec::with_capacity(16);
for i in 0..16 {
let (tx, rx) = channel();
let handle = std::thread::Builder::new()
.name(format!("block_executor_merkleization_shard_worker_{i}"))
.spawn_scoped(scope, move || {
self.handle_merkleization_subtrie(rx, parent_header)
})
.map_err(|e| StoreError::Custom(format!("spawn failed: {e:?}",)))?;
workers_handles.push(handle);
workers_tx.push(tx);
}
let mut state_updates_map: FxHashMap<Nibbles, Vec<u8>> = Default::default();
let mut storage_updates_map: StoreUpdatesMap = Default::default();
let mut code_updates: FxHashMap<H256, Code> = Default::default();
let mut hashed_address_cache: FxHashMap<H160, H256> = Default::default();
for updates in rx {
let current_length = queue_length.fetch_sub(1, Ordering::Acquire);
*max_queue_length = current_length.max(*max_queue_length);
let mut hashed_updates: Vec<_> = updates
.into_iter()
.map(|u| {
let hashed_address = hashed_address_cache
.entry(u.address)
.or_insert_with(|| keccak(u.address));
(*hashed_address, u)
})
.collect();
hashed_updates.sort_by_key(|(h, _)| h.0[0]);
for sharded_update in hashed_updates.chunk_by(|l, r| l.0.0[0] & 0xf0 == r.0.0[0] & 0xf0)
{
let shard_message = sharded_update.to_vec();
workers_tx[(shard_message[0].0.0[0] >> 4) as usize]
.send(shard_message)
.map_err(|e| StoreError::Custom(format!("send failed: {e}")))?;
}
}
drop(workers_tx);
let mut real_root = old_root;
for (choice, worker) in workers_handles.into_iter().enumerate() {
let worker_result = worker
.join()
.map_err(|e| StoreError::Custom(format!("join failed: {e:?}",)))??;
let Some(root_node) = worker_result.state_updates.get(&Nibbles::default()) else {
continue;
};
let root_node = Node::decode(root_node)?;
let Node::Branch(mut subtrie_branch) = root_node else {
unreachable!("the result can only remove one of the >2 subtries we had")
};
real_root.choices[choice] = std::mem::take(&mut subtrie_branch.choices[choice]);

code_updates.extend(worker_result.code_updates);
storage_updates_map.extend(worker_result.storage_updates);
state_updates_map.extend(worker_result.state_updates);
}

if real_root.choices.iter().filter(|c| c.is_valid()).count() < 2 {
// On most chains, there's no way to go from a branch root node to a leaf or extension.
// There are exceptions in networks engineered to trigger this case, but it's
// not expected in normal operation.
//
// Example: network starts with a single account in genesis, transfers to other addresses,
// generating more subtries, then deploys a contract which self-destructs in each of those
// addresses, reverting back to the base case.
//
// TODO(#5387): support this case
todo!("real root has less than 2 valid subtries after merkleization");
}
let root_node = real_root.encode_to_vec();
let state_trie_hash = keccak(&root_node);
state_updates_map.insert(Nibbles::default(), root_node);
let state_updates = state_updates_map.into_iter().collect();
let storage_updates = storage_updates_map
.into_iter()
.map(|(a, (_, s))| (a, s.into_iter().collect()))
.collect();
let code_updates = code_updates.into_iter().collect();

Ok(AccountUpdatesList {
state_trie_hash,
state_updates,
storage_updates,
code_updates,
})
}

fn handle_merkleization_sequential(
&self,
rx: Receiver<Vec<AccountUpdate>>,
parent_header: &BlockHeader,
Expand All @@ -307,13 +478,25 @@ impl Blockchain {
let mut storage_updates_map: StoreUpdatesMap = Default::default();
let mut code_updates: FxHashMap<H256, Code> = Default::default();
let mut account_states: FxHashMap<H256, AccountState> = Default::default();

let mut hashed_address_cache: FxHashMap<H160, H256> = Default::default();

for updates in rx {
let current_length = queue_length.fetch_sub(1, Ordering::Acquire);
*max_queue_length = current_length.max(*max_queue_length);
let hashed_updates: Vec<_> = updates
.into_iter()
.map(|u| {
let hashed_address = hashed_address_cache
.entry(u.address)
.or_insert_with(|| keccak(u.address));
(*hashed_address, u)
})
.collect();
state_trie_hash = Self::process_incoming_update_message(
&self.storage,
&mut state_trie,
updates,
hashed_updates,
&mut storage_updates_map,
parent_header,
&mut state_updates_map,
Expand Down Expand Up @@ -342,7 +525,7 @@ impl Blockchain {
fn process_incoming_update_message(
storage: &Store,
state_trie: &mut Trie,
updates: Vec<AccountUpdate>,
updates: Vec<(H256, AccountUpdate)>,
storage_updates_map: &mut StoreUpdatesMap,
parent_header: &BlockHeader,
state_updates_map: &mut FxHashMap<Nibbles, Vec<u8>>,
Expand All @@ -351,9 +534,8 @@ impl Blockchain {
) -> Result<H256, StoreError> {
trace!("Execute block pipeline: Received {} updates", updates.len());
// Apply the account updates over the last block's state and compute the new state root
for update in updates {
let hashed_address = hash_address(&update.address);
let hashed_address_h256 = H256::from_slice(&hashed_address);
for (hashed_address_h256, update) in updates {
let hashed_address = hashed_address_h256.0.to_vec();
trace!(
"Execute block pipeline: Update cycle for {}",
hex::encode(&hashed_address)
Expand Down