Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/ethrex/l2/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ impl Command {
}

// Execute block
blockchain.add_block(block.clone())?;
blockchain.add_block_pipeline(block.clone())?;

// Add fee config to rollup store
rollup_store
Expand Down
61 changes: 46 additions & 15 deletions crates/blockchain/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ impl Blockchain {
fn execute_block_pipeline(
&self,
block: &Block,
parent_header: &BlockHeader,
mut vm: Evm,
) -> Result<
(
BlockExecutionResult,
Expand All @@ -209,22 +211,13 @@ impl Blockchain {
ChainError,
> {
let start_instant = Instant::now();
// Validate if it can be the new head and find the parent
let Ok(parent_header) = find_parent_header(&block.header, &self.storage) else {
// If the parent is not present, we store it as pending.
self.storage.add_pending_block(block.clone())?;
return Err(ChainError::ParentNotFound);
};

let chain_config = self.storage.get_chain_config();

// Validate the block pre-execution
validate_block(block, &parent_header, &chain_config, ELASTICITY_MULTIPLIER)?;
validate_block(block, parent_header, &chain_config, ELASTICITY_MULTIPLIER)?;
let block_validated_instant = Instant::now();

let vm_db = StoreVmDatabase::new(self.storage.clone(), parent_header.clone());
let mut vm = self.new_evm(vm_db)?;

let exec_merkle_start = Instant::now();
let queue_length = AtomicUsize::new(0);
let queue_length_ref = &queue_length;
Expand Down Expand Up @@ -1091,8 +1084,26 @@ impl Blockchain {
}

pub fn add_block_pipeline(&self, block: Block) -> Result<(), ChainError> {
self.add_block_pipeline_with_fee_config(block, None)
}

pub fn add_block_pipeline_with_fee_config(
&self,
block: Block,
fee_config: Option<FeeConfig>,
) -> Result<(), ChainError> {
// Validate if it can be the new head and find the parent
let Ok(parent_header) = find_parent_header(&block.header, &self.storage) else {
// If the parent is not present, we store it as pending.
self.storage.add_pending_block(block)?;
return Err(ChainError::ParentNotFound);
};

let vm_db = StoreVmDatabase::new(self.storage.clone(), parent_header.clone());
let vm = self.new_evm_with_fee_config(vm_db, fee_config)?;

let (res, account_updates_list, merkle_queue_length, instants) =
self.execute_block_pipeline(&block)?;
self.execute_block_pipeline(&block, &parent_header, vm)?;

let (gas_used, gas_limit, block_number, transactions_count) = (
block.header.gas_used,
Expand Down Expand Up @@ -1650,6 +1661,14 @@ impl Blockchain {
new_evm(&self.options.r#type, vm_db)
}

pub fn new_evm_with_fee_config(
&self,
vm_db: StoreVmDatabase,
fee_config: Option<FeeConfig>,
) -> Result<Evm, EvmError> {
new_evm_with_fee_config(&self.options.r#type, vm_db, fee_config)
}

/// Get the current fork of the chain, based on the latest block's timestamp
pub async fn current_fork(&self) -> Result<Fork, StoreError> {
let chain_config = self.storage.get_chain_config();
Expand All @@ -1663,13 +1682,25 @@ impl Blockchain {
}

pub fn new_evm(blockchain_type: &BlockchainType, vm_db: StoreVmDatabase) -> Result<Evm, EvmError> {
new_evm_with_fee_config(blockchain_type, vm_db, None)
}

pub fn new_evm_with_fee_config(
blockchain_type: &BlockchainType,
vm_db: StoreVmDatabase,
fee_config: Option<FeeConfig>,
) -> Result<Evm, EvmError> {
let evm = match blockchain_type {
BlockchainType::L1 => Evm::new_for_l1(vm_db),
BlockchainType::L2(l2_config) => {
let fee_config = *l2_config
.fee_config
.read()
.map_err(|_| EvmError::Custom("Fee config lock was poisoned".to_string()))?;
let fee_config = if let Some(fee_config) = fee_config {
fee_config
} else {
*l2_config
.fee_config
.read()
.map_err(|_| EvmError::Custom("Fee config lock was poisoned".to_string()))?
};
Evm::new_for_l2(vm_db, fee_config)?
}
};
Expand Down
123 changes: 33 additions & 90 deletions crates/l2/sequencer/l1_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
};
use bytes::Bytes;
use ethrex_blockchain::{
Blockchain, BlockchainOptions, BlockchainType, L2Config, error::ChainError, vm::StoreVmDatabase,
Blockchain, BlockchainOptions, BlockchainType, L2Config, error::ChainError,
};
use ethrex_common::{
Address, H256, U256,
Expand Down Expand Up @@ -50,7 +50,7 @@ use ethrex_rpc::{
use ethrex_storage::EngineType;
use ethrex_storage::Store;
use ethrex_storage_rollup::StoreRollup;
use ethrex_vm::{BlockExecutionResult, Evm};
use ethrex_vm::BlockExecutionResult;
use rand::Rng;
use serde::Serialize;
use std::{
Expand Down Expand Up @@ -414,15 +414,11 @@ impl L1Committer {
let (one_time_checkpoint_path, one_time_checkpoint_store, one_time_checkpoint_blockchain) =
self.generate_one_time_checkpoint(batch.number).await?;

self.execute_batch_to_generate_checkpoint(
batch,
one_time_checkpoint_store.clone(),
one_time_checkpoint_blockchain,
)
.await
.inspect_err(|_| {
let _ = self.remove_one_time_checkpoint(&one_time_checkpoint_path);
})?;
self.execute_batch_to_generate_checkpoint(batch, one_time_checkpoint_blockchain)
.await
.inspect_err(|_| {
let _ = self.remove_one_time_checkpoint(&one_time_checkpoint_path);
})?;

// Create the next checkpoint from the one-time checkpoint used
let new_checkpoint_path = self
Expand All @@ -444,7 +440,6 @@ impl L1Committer {
async fn execute_batch_to_generate_checkpoint(
&self,
batch: &Batch,
one_time_checkpoint_store: Store,
one_time_checkpoint_blockchain: Arc<Blockchain>,
) -> Result<(), CommitterError> {
info!("Generating missing checkpoint for batch {}", batch.number);
Expand All @@ -463,46 +458,8 @@ impl L1Committer {
"FeeConfig not found for witness generation".to_string(),
))?;

let parent_header = self
.store
.get_block_header_by_hash(block.header.parent_hash)?
.ok_or(CommitterError::ChainError(ChainError::ParentNotFound))?;

// Here we use the checkpoint store because we need the previous
// state available (i.e. not pruned) for re-execution.
let vm_db = StoreVmDatabase::new(one_time_checkpoint_store.clone(), parent_header);

let mut vm = Evm::new_for_l2(vm_db, *fee_config)?;

vm.execute_block(block)?;

let account_updates = vm.get_state_transitions()?;
let account_updates_list = one_time_checkpoint_store
.apply_account_updates_batch(block.header.parent_hash, &account_updates)?
.ok_or(CommitterError::FailedToGetInformationFromStorage(
"no account updated".to_owned(),
))?;

let mut receipts = vec![];
for (index, _) in block.body.transactions.iter().enumerate() {
let receipt = self
.store
.get_receipt(block.header.number, index.try_into()?)
.await?
.ok_or(CommitterError::RetrievalError(
"Transactions in a block should have a receipt".to_owned(),
))?;
receipts.push(receipt);
}

one_time_checkpoint_blockchain.store_block(
block.clone(),
account_updates_list,
BlockExecutionResult {
receipts,
requests: vec![],
},
)?;
one_time_checkpoint_blockchain
.add_block_pipeline_with_fee_config(block.clone(), Some(*fee_config))?;
}

Ok(())
Expand Down Expand Up @@ -732,46 +689,15 @@ impl L1Committer {
get_block_l1_privileged_transactions(&txs, self.store.chain_config.chain_id);

// Get block account updates.
let account_updates = if let Some(account_updates) = self
if let Some(account_updates) = self
.rollup_store
.get_account_updates_by_block_number(block_to_commit_number)
.await?
{
account_updates
} else {
warn!(
"Could not find execution cache result for block {}, falling back to re-execution",
last_added_block_number + 1
);
let parent_header = self
.store
.get_block_header_by_hash(potential_batch_block.header.parent_hash)?
.ok_or(CommitterError::ChainError(ChainError::ParentNotFound))?;

// Here we use the checkpoint store because we need the previous
// state available (i.e. not pruned) for re-execution.
let vm_db = StoreVmDatabase::new(checkpoint_store.clone(), parent_header);

let fee_config = self
.rollup_store
.get_fee_config_by_block(block_to_commit_number)
.await?
.ok_or(CommitterError::FailedToGetInformationFromStorage(
"Failed to get fee config for re-execution".to_owned(),
))?;

let mut vm = Evm::new_for_l2(vm_db, fee_config)?;

vm.execute_block(&potential_batch_block)?;

vm.get_state_transitions()?
};

// The checkpoint store's state corresponds to the parent state of
// the first block of the batch. Therefore, we need to apply the
// account updates of each block as we go, to be able to continue
// re-executing the next blocks in the batch.
{
// The checkpoint store's state corresponds to the parent state of
// the first block of the batch. Therefore, we need to apply the
// account updates of each block as we go, to be able to continue
// re-executing the next blocks in the batch.
let account_updates_list = checkpoint_store
.apply_account_updates_batch(
potential_batch_block.header.parent_hash,
Expand All @@ -780,7 +706,6 @@ impl L1Committer {
.ok_or(CommitterError::FailedToGetInformationFromStorage(
"no account updated".to_owned(),
))?;

checkpoint_blockchain.store_block(
potential_batch_block.clone(),
account_updates_list,
Expand All @@ -789,7 +714,25 @@ impl L1Committer {
requests: vec![],
},
)?;
}
} else {
warn!(
"Could not find execution cache result for block {}, falling back to re-execution",
last_added_block_number + 1
);

let fee_config = self
.rollup_store
.get_fee_config_by_block(block_to_commit_number)
.await?
.ok_or(CommitterError::FailedToGetInformationFromStorage(
"Failed to get fee config for re-execution".to_owned(),
))?;

checkpoint_blockchain.add_block_pipeline_with_fee_config(
potential_batch_block.clone(),
Some(fee_config),
)?
};

// Accumulate block data with the rest of the batch.
acc_privileged_txs.extend(privileged_transactions.clone());
Expand Down
21 changes: 12 additions & 9 deletions crates/networking/p2p/rlpx/l2/l2_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,15 +400,18 @@ async fn process_new_block(
let block = Arc::<Block>::try_unwrap(block).map_err(|_| {
PeerConnectionError::InternalError("Failed to take ownership of block".to_string())
})?;
established.blockchain.add_block(block).inspect_err(|e| {
error!(
peer=%established.node,
error=%e,
block_number,
?block_hash,
"Error adding new block",
);
})?;
established
.blockchain
.add_block_pipeline(block)
.inspect_err(|e| {
error!(
peer=%established.node,
error=%e,
block_number,
?block_hash,
"Error adding new block",
);
})?;

apply_fork_choice(&established.storage, block_hash, block_hash, block_hash)
.await
Expand Down