Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
c6875b6
Initial RLPxConnection refactor to use spawned
ElFantasma Jun 5, 2025
4983a1a
Moved handshake into connection spawned process
ElFantasma Jun 11, 2025
ee013d6
Connected backend channels and added periodic checks
ElFantasma Jun 13, 2025
49fc0ee
Improved periodic tasks handling
ElFantasma Jun 13, 2025
6295648
Added broadcast channel handling
ElFantasma Jun 17, 2025
b5585cf
Simplified RLPxConnection initialization
ElFantasma Jun 18, 2025
a35930f
Removed Arc<Mutex<...>>> for listening stream
ElFantasma Jun 19, 2025
d5b0131
Updated to newest spawned version
ElFantasma Jun 24, 2025
b5a3389
Updated to newest spawned version
ElFantasma Jun 24, 2025
0c72703
Merge branch 'main' into spawned_p2p
ElFantasma Jun 24, 2025
377e47b
Fixed merge problems
ElFantasma Jun 25, 2025
bb5b9f3
Merge branch 'main' into spawned_p2p
ElFantasma Jun 25, 2025
e897781
Fixed linter issues
ElFantasma Jun 25, 2025
1a0942b
Fixed linter issues
ElFantasma Jun 25, 2025
30fc9c4
Fixed failing test on discv4
ElFantasma Jun 25, 2025
0c0b7ef
Breaking listen loop when stream is finished
ElFantasma Jun 26, 2025
95f2ac7
Merge branch 'main' into spawned_p2p
ElFantasma Jun 26, 2025
a084f8e
Improved listen loop syntax
ElFantasma Jun 26, 2025
46d5406
Improved error handling on connection initialization
ElFantasma Jun 26, 2025
2c6269b
Merge branch 'main' into spawned_p2p
ElFantasma Jun 26, 2025
4003852
updated spawned use in metrics module
ElFantasma Jun 26, 2025
0dcd1ab
updated spawned use in metrics module
ElFantasma Jun 26, 2025
99db63e
emptying service.nix spawned sha to obtain a new one from CI
ElFantasma Jun 26, 2025
a292833
updated spawned in Cargo.lock
ElFantasma Jun 26, 2025
937373c
updated service.nix with proper sha
ElFantasma Jun 26, 2025
7632394
Merge branch 'main' into spawned_p2p
ElFantasma Jun 26, 2025
d866f74
Merge branch 'main' into spawned_p2p
ElFantasma Jun 26, 2025
e98aa41
Corrected potential problem on tcp stream handling
ElFantasma Jun 27, 2025
8477e54
Merge branch 'main' into spawned_p2p
ElFantasma Jun 27, 2025
fb4a959
Added TODO comments and links to issues
ElFantasma Jun 27, 2025
1d0ff65
Merge branch 'main' into spawned_p2p
ElFantasma Jun 27, 2025
075b31e
Merge branch 'main' into spawned_p2p
ElFantasma Jun 27, 2025
3221778
Merge branch 'main' into spawned_p2p
ElFantasma Jun 30, 2025
3291f4f
Added TODO comments and links to issues
ElFantasma Jun 30, 2025
9abbd8f
Merge branch 'main' into spawned_p2p
ElFantasma Jun 30, 2025
3112459
Merge branch 'main' into spawned_p2p
ElFantasma Jun 30, 2025
010ce63
Fixed linter issues
ElFantasma Jun 30, 2025
a9f7ccf
Merge branch 'main' into spawned_p2p
ElFantasma Jun 30, 2025
20fa919
Merge branch 'main' into spawned_p2p
ElFantasma Jun 30, 2025
d05f77e
Updated to newest spawned version
ElFantasma Jul 1, 2025
8b9d4b8
feat(l2): implement ERC20 bridge (#3241)
iovoid Jun 30, 2025
014b690
feat(l2): add instance info to Grafana alerts (#3333)
ManuelBilbao Jun 30, 2025
fde712a
feat(l2): exchange commit hash in node-prover communication (#3339)
avilagaston9 Jun 30, 2025
cc27f5f
refactor(l1, l2, levm): remove `l2` feature flag from crates `ethrex-…
ilitteri Jun 30, 2025
b05bfe0
fix(core): more accurate throughput (#3412)
Oppen Jun 30, 2025
385642d
perf(levm): refactor `CacheDB` to use more efficient APIs (#3259)
azteca1998 Jul 1, 2025
b8fea06
perf(levm): add fib recursive bench (#3391)
edg-l Jul 1, 2025
0fbe568
feat(l2): replace custom merkle tree with `OpenZeppelin` + `lambdawor…
LeanSerra Jul 1, 2025
aac658c
feat(l2): burn gas when sending privileged transactions (#3407)
iovoid Jul 1, 2025
9f8c49a
Merge branch 'main' into spawned_p2p
ElFantasma Jul 1, 2025
48e8bbb
Fixed quote-gen Cargo.lock and reseted spawned sha
ElFantasma Jul 1, 2025
ef9864d
Updated spawned sha
ElFantasma Jul 1, 2025
fae2eac
Merge branch 'main' into spawned_p2p
ElFantasma Jul 2, 2025
992b61e
Merge branch 'main' into spawned_p2p
ElFantasma Jul 2, 2025
4fca73f
Addressed PR review comments
ElFantasma Jul 2, 2025
4979b29
Merge branch 'main' into spawned_p2p
ElFantasma Jul 2, 2025
0085d47
Fixed wronged merge
ElFantasma Jul 2, 2025
14bb43b
Merge branch 'main' into spawned_p2p
ElFantasma Jul 2, 2025
aaf300e
Merge branch 'main' into spawned_p2p
ElFantasma Jul 2, 2025
81929b5
Merge branch 'main' into spawned_p2p
ElFantasma Jul 3, 2025
973e0b9
Merge branch 'main' into spawned_p2p
ElFantasma Jul 3, 2025
5884bc2
Removed unused struct
ElFantasma Jul 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
282 changes: 196 additions & 86 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ crc32fast = "1.4.2"
lazy_static = "1.5.0"
sha2 = "0.10.8"
sha3 = "0.10.8"
tokio-util = { version = "0.7.12", features = ["rt"] }
tokio-util = { version = "0.7.15", features = ["rt"] }
jsonwebtoken = "9.3.0"
rand = "0.8.5"
cfg-if = "1.0.0"
Expand All @@ -95,6 +95,10 @@ eyre = "0.6.12"
kzg-rs = "0.2.6"
libsql = "0.9.10"
futures = "0.3.31"
# Changing the tag for spawned will break the TDX image build
# When updating it try to build the TDX image and update service.nix with the new hash
spawned-concurrency = {git = "https://github.com/lambdaclass/spawned.git", tag = "v0.1.2-alpha"}
spawned-rt = {git = "https://github.com/lambdaclass/spawned.git", tag = "v0.1.2-alpha"}
lambdaworks-crypto = "0.11.0"

[patch.crates-io]
Expand Down
6 changes: 2 additions & 4 deletions crates/l2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,11 @@ keccak-hash.workspace = true
envy = "0.4.2"
rand.workspace = true
thiserror.workspace = true
spawned-rt.workspace = true
spawned-concurrency.workspace = true
directories = "5.0.1"
bincode = "1.3.3"
serde_with = "3.11.0"
# Changing the tag for spawned will break the TDX image build
# When updating it try to build the TDX image and update service.nix with the new hash
spawned-concurrency = { git = "https://github.com/lambdaclass/spawned.git", tag = "v0.1.0-alpha" }
spawned-rt = { git = "https://github.com/lambdaclass/spawned.git", tag = "v0.1.0-alpha" }
lazy_static.workspace = true
aligned-sdk = { git = "https://github.com/yetanotherco/aligned_layer", tag = "v0.16.1" }
ethers = "2.0"
Expand Down
40 changes: 19 additions & 21 deletions crates/l2/based/block_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ use ethrex_rpc::{EthClient, types::receipt::RpcLog};
use ethrex_storage::Store;
use ethrex_storage_rollup::{RollupStoreError, StoreRollup};
use keccak_hash::keccak;
use spawned_concurrency::{CallResponse, CastResponse, GenServer, send_after};
use spawned_concurrency::{
error::GenServerError,
messages::Unused,
tasks::{CastResponse, GenServer, GenServerHandle, send_after},
};
use tracing::{debug, error, info};

use crate::{
Expand Down Expand Up @@ -52,12 +56,14 @@ pub enum BlockFetcherError {
EvmError(#[from] ethrex_vm::EvmError),
#[error("Failed to produce the blob bundle")]
BlobBundleError,
#[error("Failed to compute privileged transactions hash: {0}")]
#[error("Failed to compute deposit logs hash: {0}")]
PrivilegedTransactionError(
#[from] ethrex_l2_common::privileged_transactions::PrivilegedTransactionError,
),
// TODO: Avoid propagating GenServerErrors outside GenServer modules
// See https://github.com/lambdaclass/ethrex/issues/3376
#[error("Spawned GenServer Error")]
GenServerError(spawned_concurrency::GenServerError),
GenServerError(GenServerError),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following @mpaulucci feedback, we should be just using internal errors for the GenServerErrors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment and an issue to fix it

}

#[derive(Clone)]
Expand Down Expand Up @@ -131,7 +137,8 @@ impl BlockFetcher {
}

impl GenServer for BlockFetcher {
type InMsg = InMessage;
type CallMsg = Unused;
type CastMsg = InMessage;
type OutMsg = OutMessage;
type State = BlockFetcherState;
type Error = BlockFetcherError;
Expand All @@ -140,32 +147,23 @@ impl GenServer for BlockFetcher {
Self {}
}

async fn handle_call(
&mut self,
_message: Self::InMsg,
_tx: &spawned_rt::mpsc::Sender<spawned_concurrency::GenServerInMsg<Self>>,
_state: &mut Self::State,
) -> spawned_concurrency::CallResponse<Self::OutMsg> {
CallResponse::Reply(OutMessage::Done)
}

async fn handle_cast(
&mut self,
_message: Self::InMsg,
_tx: &spawned_rt::mpsc::Sender<spawned_concurrency::GenServerInMsg<Self>>,
state: &mut Self::State,
) -> spawned_concurrency::CastResponse {
_message: Self::CastMsg,
handle: &GenServerHandle<Self>,
mut state: Self::State,
) -> CastResponse<Self> {
if let SequencerStatus::Following = state.sequencer_state.status().await {
let _ = fetch(state).await.inspect_err(|err| {
let _ = fetch(&mut state).await.inspect_err(|err| {
error!("Block Fetcher Error: {err}");
});
}
send_after(
Duration::from_millis(state.fetch_interval_ms),
_tx.clone(),
Self::InMsg::Fetch,
handle.clone(),
Self::CastMsg::Fetch,
);
CastResponse::NoReply
CastResponse::NoReply(state)
}
}

Expand Down
36 changes: 17 additions & 19 deletions crates/l2/based/state_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ use ethrex_l2_sdk::calldata::encode_calldata;
use ethrex_rpc::{EthClient, clients::Overrides};
use ethrex_storage::Store;
use ethrex_storage_rollup::{RollupStoreError, StoreRollup};
use spawned_concurrency::{CallResponse, CastResponse, GenServer, GenServerError, send_after};
use spawned_concurrency::{
error::GenServerError,
messages::Unused,
tasks::{CastResponse, GenServer, GenServerHandle, send_after},
};
use tracing::{debug, error, info, warn};

use crate::{
Expand All @@ -32,6 +36,8 @@ pub enum StateUpdaterError {
InvalidForkChoice(#[from] ethrex_blockchain::error::InvalidForkChoice),
#[error("Internal Error: {0}")]
InternalError(String),
// TODO: Avoid propagating GenServerErrors outside GenServer modules
// See https://github.com/lambdaclass/ethrex/issues/3376
#[error("Spawned GenServer Error")]
GenServerError(GenServerError),
}
Expand Down Expand Up @@ -99,7 +105,8 @@ impl StateUpdater {
}

impl GenServer for StateUpdater {
type InMsg = InMessage;
type CallMsg = Unused;
type CastMsg = InMessage;
type OutMsg = OutMessage;
type State = StateUpdaterState;
type Error = StateUpdaterError;
Expand All @@ -108,30 +115,21 @@ impl GenServer for StateUpdater {
Self {}
}

async fn handle_call(
&mut self,
_message: Self::InMsg,
_tx: &spawned_rt::mpsc::Sender<spawned_concurrency::GenServerInMsg<Self>>,
_state: &mut Self::State,
) -> spawned_concurrency::CallResponse<Self::OutMsg> {
CallResponse::Reply(OutMessage::Done)
}

async fn handle_cast(
&mut self,
_message: Self::InMsg,
tx: &spawned_rt::mpsc::Sender<spawned_concurrency::GenServerInMsg<Self>>,
state: &mut Self::State,
) -> spawned_concurrency::CastResponse {
let _ = update_state(state)
_message: Self::CastMsg,
handle: &GenServerHandle<Self>,
mut state: Self::State,
) -> CastResponse<Self> {
let _ = update_state(&mut state)
.await
.inspect_err(|err| error!("State Updater Error: {err}"));
send_after(
Duration::from_millis(state.check_interval_ms),
tx.clone(),
Self::InMsg::UpdateState,
handle.clone(),
Self::CastMsg::UpdateState,
);
CastResponse::NoReply
CastResponse::NoReply(state)
}
}

Expand Down
34 changes: 14 additions & 20 deletions crates/l2/sequencer/block_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ use ethrex_storage_rollup::StoreRollup;
use ethrex_vm::BlockExecutionResult;
use keccak_hash::H256;
use payload_builder::build_payload;
use spawned_concurrency::{CallResponse, CastResponse, GenServer, GenServerInMsg, send_after};
use spawned_rt::mpsc::Sender;
use spawned_concurrency::{
messages::Unused,
tasks::{CastResponse, GenServer, GenServerHandle, send_after},
};
use tracing::{debug, error, info};

use crate::{
Expand Down Expand Up @@ -104,7 +106,8 @@ impl BlockProducer {
}

impl GenServer for BlockProducer {
type InMsg = InMessage;
type CallMsg = Unused;
type CastMsg = InMessage;
type OutMsg = OutMessage;
type State = BlockProducerState;

Expand All @@ -114,33 +117,24 @@ impl GenServer for BlockProducer {
Self {}
}

async fn handle_call(
&mut self,
_message: Self::InMsg,
_tx: &Sender<GenServerInMsg<Self>>,
_state: &mut Self::State,
) -> CallResponse<Self::OutMsg> {
CallResponse::Reply(OutMessage::Done)
}

async fn handle_cast(
&mut self,
_message: Self::InMsg,
tx: &Sender<GenServerInMsg<Self>>,
state: &mut Self::State,
) -> CastResponse {
_message: Self::CastMsg,
handle: &GenServerHandle<Self>,
state: Self::State,
) -> CastResponse<Self> {
// Right now we only have the Produce message, so we ignore the message
if let SequencerStatus::Sequencing = state.sequencer_state.status().await {
let _ = produce_block(state)
let _ = produce_block(&state)
.await
.inspect_err(|e| error!("Block Producer Error: {e}"));
}
send_after(
Duration::from_millis(state.block_time_ms),
tx.clone(),
Self::InMsg::Produce,
handle.clone(),
Self::CastMsg::Produce,
);
CastResponse::NoReply
CastResponse::NoReply(state)
}
}

Expand Down
14 changes: 13 additions & 1 deletion crates/l2/sequencer/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use ethrex_rpc::clients::eth::errors::{CalldataEncodeError, EthClientError};
use ethrex_storage::error::StoreError;
use ethrex_storage_rollup::RollupStoreError;
use ethrex_vm::{EvmError, ProverDBError};
use spawned_concurrency::GenServerError;
use spawned_concurrency::error::GenServerError;
use tokio::task::JoinError;

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -61,6 +61,8 @@ pub enum L1WatcherError {
FailedAccessingRollUpStore(#[from] RollupStoreError),
#[error("{0}")]
Custom(String),
// TODO: Avoid propagating GenServerErrors outside GenServer modules
// See https://github.com/lambdaclass/ethrex/issues/3376
#[error("Spawned GenServer Error")]
GenServerError(GenServerError),
}
Expand Down Expand Up @@ -123,6 +125,8 @@ pub enum ProofSenderError {
InternalError(String),
#[error("Failed to parse OnChainProposer response: {0}")]
FailedToParseOnChainProposerResponse(String),
// TODO: Avoid propagating GenServerErrors outside GenServer modules
// See https://github.com/lambdaclass/ethrex/issues/3376
#[error("Spawned GenServer Error")]
GenServerError(GenServerError),
#[error("Proof Sender failed because of a rollup store error: {0}")]
Expand Down Expand Up @@ -189,6 +193,8 @@ pub enum BlockProducerError {
FailedToEncodeAccountStateDiff(#[from] StateDiffError),
#[error("Failed to get data from: {0}")]
FailedToGetDataFrom(String),
// TODO: Avoid propagating GenServerErrors outside GenServer modules
// See https://github.com/lambdaclass/ethrex/issues/3376
#[error("Spawned GenServer Error")]
GenServerError(GenServerError),
}
Expand Down Expand Up @@ -235,6 +241,8 @@ pub enum CommitterError {
FailedToGetWithdrawals(#[from] UtilsError),
#[error("Privileged Transaction error: {0}")]
PrivilegedTransactionError(#[from] PrivilegedTransactionError),
// TODO: Avoid propagating GenServerErrors outside GenServer modules
// See https://github.com/lambdaclass/ethrex/issues/3376
#[error("Spawned GenServer Error")]
GenServerError(GenServerError),
}
Expand All @@ -261,6 +269,8 @@ pub enum MetricsGathererError {
EthClientError(#[from] EthClientError),
#[error("MetricsGatherer: {0}")]
TryInto(String),
// TODO: Avoid propagating GenServerErrors outside GenServer modules
// See https://github.com/lambdaclass/ethrex/issues/3376
#[error("Spawned GenServer Error")]
GenServerError(GenServerError),
}
Expand All @@ -275,6 +285,8 @@ pub enum ExecutionCacheError {

#[derive(Debug, thiserror::Error)]
pub enum ConnectionHandlerError {
// TODO: Avoid propagating GenServerErrors outside GenServer modules
// See https://github.com/lambdaclass/ethrex/issues/3376
#[error("Spawned GenServer Error")]
GenServerError(GenServerError),
}
32 changes: 13 additions & 19 deletions crates/l2/sequencer/l1_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ use std::{collections::HashMap, sync::Arc};
use tracing::{debug, error, info, warn};

use super::{errors::BlobEstimationError, utils::random_duration};
use spawned_concurrency::{CallResponse, CastResponse, GenServer, GenServerInMsg, send_after};
use spawned_rt::mpsc::Sender;
use spawned_concurrency::{
messages::Unused,
tasks::{CastResponse, GenServer, GenServerHandle, send_after},
};

const COMMIT_FUNCTION_SIGNATURE_BASED: &str =
"commitBatch(uint256,bytes32,bytes32,bytes32,bytes32,bytes[])";
Expand Down Expand Up @@ -136,7 +138,8 @@ impl L1Committer {
}

impl GenServer for L1Committer {
type InMsg = InMessage;
type CallMsg = Unused;
type CastMsg = InMessage;
type OutMsg = OutMessage;
type State = CommitterState;

Expand All @@ -146,30 +149,21 @@ impl GenServer for L1Committer {
Self {}
}

async fn handle_call(
&mut self,
_message: Self::InMsg,
_tx: &Sender<GenServerInMsg<Self>>,
_state: &mut Self::State,
) -> CallResponse<Self::OutMsg> {
CallResponse::Reply(OutMessage::Done)
}

async fn handle_cast(
&mut self,
_message: Self::InMsg,
tx: &Sender<GenServerInMsg<Self>>,
state: &mut Self::State,
) -> CastResponse {
_message: Self::CastMsg,
handle: &GenServerHandle<Self>,
mut state: Self::State,
) -> CastResponse<Self> {
// Right now we only have the Commit message, so we ignore the message
if let SequencerStatus::Sequencing = state.sequencer_state.status().await {
let _ = commit_next_batch_to_l1(state)
let _ = commit_next_batch_to_l1(&mut state)
.await
.inspect_err(|err| error!("L1 Committer Error: {err}"));
}
let check_interval = random_duration(state.commit_time_ms);
send_after(check_interval, tx.clone(), Self::InMsg::Commit);
CastResponse::NoReply
send_after(check_interval, handle.clone(), Self::CastMsg::Commit);
CastResponse::NoReply(state)
}
}

Expand Down
Loading
Loading