Skip to content

Commit 8d487db

Browse files
feat(orchestrator): parallelize consolidation pattern (#396)
* feat(orchestrator): parallelize consolidation pattern * fix: lint * fix: use buffered chunks --------- Co-authored-by: frisitano <[email protected]>
1 parent 765264e commit 8d487db

File tree

1 file changed

+44
-22
lines changed
  • crates/chain-orchestrator/src

1 file changed

+44
-22
lines changed

crates/chain-orchestrator/src/lib.rs

Lines changed: 44 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use alloy_eips::Encodable2718;
55
use alloy_primitives::{b256, bytes::Bytes, keccak256, B256};
66
use alloy_provider::Provider;
77
use alloy_rpc_types_engine::ExecutionPayloadV1;
8-
use futures::StreamExt;
8+
use futures::{stream, StreamExt, TryStreamExt};
99
use reth_chainspec::EthChainSpec;
1010
use reth_network_api::{BlockDownloaderProvider, FullNetwork};
1111
use reth_network_p2p::{sync::SyncState as RethSyncState, FullBlockClient};
@@ -87,6 +87,12 @@ const HEADER_FETCH_COUNT: u64 = 100;
8787
/// The size of the event channel used to broadcast events to listeners.
8888
const EVENT_CHANNEL_SIZE: usize = 5000;
8989

90+
/// The batch size for batch validation.
91+
#[cfg(not(any(test, feature = "test-utils")))]
92+
const BATCH_SIZE: usize = 100;
93+
#[cfg(any(test, feature = "test-utils"))]
94+
const BATCH_SIZE: usize = 1;
95+
9096
/// The [`ChainOrchestrator`] is responsible for orchestrating the progression of the L2 chain
9197
/// based on data consolidated from L1 and the data received over the p2p network.
9298
#[derive(Debug)]
@@ -1082,28 +1088,44 @@ impl<
10821088
if head_block_number == safe_block_number {
10831089
tracing::trace!(target: "scroll::chain_orchestrator", "No unsafe blocks to consolidate");
10841090
} else {
1085-
let start_block_number = safe_block_number + 1;
1086-
// TODO: Make fetching parallel but ensure concurrency limits are respected.
1087-
let mut blocks_to_validate = vec![];
1088-
for block_number in start_block_number..=head_block_number {
1089-
let block = self
1090-
.l2_client
1091-
.get_block_by_number(block_number.into())
1092-
.full()
1093-
.await?
1094-
.ok_or(ChainOrchestratorError::L2BlockNotFoundInL2Client(block_number))?
1095-
.into_consensus()
1096-
.map_transactions(|tx| tx.inner.into_inner());
1097-
blocks_to_validate.push(block);
1098-
}
1091+
let block_stream = stream::iter(safe_block_number + 1..=head_block_number)
1092+
.map(|block_number| {
1093+
let client = self.l2_client.clone();
10991094

1100-
self.validate_l1_messages(&blocks_to_validate).await?;
1101-
1102-
self.database
1103-
.update_l1_messages_from_l2_blocks(
1104-
blocks_to_validate.into_iter().map(|b| (&b).into()).collect(),
1105-
)
1106-
.await?;
1095+
async move {
1096+
client
1097+
.get_block_by_number(block_number.into())
1098+
.full()
1099+
.await?
1100+
.ok_or(ChainOrchestratorError::L2BlockNotFoundInL2Client(block_number))
1101+
.map(|b| {
1102+
b.into_consensus().map_transactions(|tx| tx.inner.into_inner())
1103+
})
1104+
}
1105+
})
1106+
.buffered(BATCH_SIZE);
1107+
1108+
let mut block_chunks = block_stream.try_chunks(BATCH_SIZE);
1109+
1110+
while let Some(blocks_result) = block_chunks.next().await {
1111+
let blocks_to_validate =
1112+
blocks_result.map_err(|_| ChainOrchestratorError::InvalidBlock)?;
1113+
1114+
if let Err(e) = self.validate_l1_messages(&blocks_to_validate).await {
1115+
tracing::error!(
1116+
target: "scroll::chain_orchestrator",
1117+
error = ?e,
1118+
"Validation failed — purging all L1→L2 message mappings"
1119+
);
1120+
self.database.purge_l1_message_to_l2_block_mappings(None).await?;
1121+
return Err(e);
1122+
}
1123+
self.database
1124+
.update_l1_messages_from_l2_blocks(
1125+
blocks_to_validate.iter().map(|b| b.into()).collect(),
1126+
)
1127+
.await?;
1128+
}
11071129
};
11081130

11091131
// send a notification to the network that the chain is synced such that it accepts

0 commit comments

Comments
 (0)