Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 10 additions & 6 deletions crates/derivation-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl<P> Stream for DerivationPipeline<P>
where
P: L1Provider + Clone + Unpin + Send + Sync + 'static,
{
type Item = ScrollPayloadAttributesWithBatchInfo;
type Item = WithBlockNumber<ScrollPayloadAttributesWithBatchInfo>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
Expand All @@ -191,7 +191,7 @@ where

// return attributes from the queue if any.
if let Some(attribute) = this.attributes_queue.pop_front() {
return Poll::Ready(Some(attribute.inner))
return Poll::Ready(Some(attribute))
}

// if future is None and the batch queue is empty, store the waker and return.
Expand Down Expand Up @@ -465,8 +465,10 @@ mod tests {

// check the correctness of the last attribute.
let mut attribute = ScrollPayloadAttributes::default();
while let Some(ScrollPayloadAttributesWithBatchInfo { payload_attributes: a, .. }) =
pipeline.next().await
while let Some(WithBlockNumber {
inner: ScrollPayloadAttributesWithBatchInfo { payload_attributes: a, .. },
..
}) = pipeline.next().await
{
if a.payload_attributes.timestamp == 1696935657 {
attribute = a;
Expand Down Expand Up @@ -523,8 +525,10 @@ mod tests {

// check the correctness of the last attribute.
let mut attribute = ScrollPayloadAttributes::default();
while let Some(ScrollPayloadAttributesWithBatchInfo { payload_attributes: a, .. }) =
pipeline.next().await
while let Some(WithBlockNumber {
inner: ScrollPayloadAttributesWithBatchInfo { payload_attributes: a, .. },
..
}) = pipeline.next().await
{
if a.payload_attributes.timestamp == 1696935657 {
attribute = a;
Expand Down
68 changes: 62 additions & 6 deletions crates/engine/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use crate::{
use alloy_provider::Provider;
use futures::{ready, task::AtomicWaker, FutureExt, Stream};
use rollup_node_primitives::{
BlockInfo, ChainImport, MeteredFuture, ScrollPayloadAttributesWithBatchInfo,

BlockInfo, ChainImport, MeteredFuture, ScrollPayloadAttributesWithBatchInfo, WithBlockNumber,
,
};
use scroll_alloy_hardforks::ScrollHardforks;
use scroll_alloy_network::Scroll;
Expand Down Expand Up @@ -38,7 +40,7 @@ pub struct EngineDriver<EC, CS, P> {
/// Block building duration.
block_building_duration: Duration,
/// The pending payload attributes derived from batches on L1.
l1_payload_attributes: VecDeque<ScrollPayloadAttributesWithBatchInfo>,
l1_payload_attributes: VecDeque<WithBlockNumber<ScrollPayloadAttributesWithBatchInfo>>,
/// The pending block imports received over the network.
chain_imports: VecDeque<ChainImport>,
/// The latest optimistic sync target.
Expand Down Expand Up @@ -121,6 +123,57 @@ where
}
}

/// Handle L1 reorg, with the L1 block number reorged to, and whether this reorged the head or
/// batches.
pub fn handle_l1_reorg(
&mut self,
l1_block_number: u64,
reorged_unsafe_head: Option<BlockInfo>,
reorged_safe_head: Option<BlockInfo>,
) {
// On an unsafe head reorg: clear the payload building future, reset the unsafe head and
// drop the engine future if it's a `NewPayload` or `BlockImport` with block number > L2
// reorged number.
if let Some(l2_head_block_info) = reorged_unsafe_head {
self.payload_building_future = None;
self.set_head_block_info(l2_head_block_info);
if let Some(MeteredFuture { fut, .. }) = self.engine_future.as_ref() {
match fut {
EngineFuture::BlockImport(WithBlockNumber { number, .. })
if number > &l2_head_block_info.number =>
{
self.engine_future = None
}
// `NewPayload` future is ONLY instantiated when the payload building future is
// done, and we want to issue the payload to the EN. Thus, we also clear it on a
// L2 reorg.
EngineFuture::NewPayload(_) => self.engine_future = None,
_ => {}
}
}
}

// On a safe head reorg: reset the safe head.
if let Some(safe_block_info) = reorged_safe_head {
self.set_safe_block_info(safe_block_info);
}

// drop the engine future if it's a `L1Consolidation` future associated with a L1 block
// number > l1_block_number.
if matches!(
self.engine_future.as_ref(),
Some(MeteredFuture {
fut: EngineFuture::L1Consolidation(WithBlockNumber { number, .. }),
..
}) if number > &l1_block_number
) {
self.engine_future = None;
}

// retain the L1 payload attributes with block number <= L1 block.
self.l1_payload_attributes.retain(|attribute| attribute.number <= l1_block_number);
}

/// Handles a block import request by adding it to the queue and waking up the driver.
pub fn handle_chain_import(&mut self, chain_import: ChainImport) {
tracing::trace!(target: "scroll::engine", head = %chain_import.chain.last().unwrap().hash_slow(), "new chain import request received");
Expand All @@ -145,7 +198,10 @@ where

/// Handles a [`ScrollPayloadAttributes`] sourced from L1 by initiating a task sending the
/// attribute to the EN via the [`EngineDriver`].
pub fn handle_l1_consolidation(&mut self, attributes: ScrollPayloadAttributesWithBatchInfo) {
pub fn handle_l1_consolidation(
&mut self,
attributes: WithBlockNumber<ScrollPayloadAttributesWithBatchInfo>,
) {
self.l1_payload_attributes.push_back(attributes);
self.waker.wake();
}
Expand Down Expand Up @@ -193,7 +249,7 @@ where
self.metrics.block_import_duration.record(duration.as_secs_f64());

// Return the block import outcome
return block_import_outcome.map(EngineDriverEvent::BlockImportOutcome)
return block_import_outcome.map(EngineDriverEvent::BlockImportOutcome);
}
Err(err) => {
tracing::error!(target: "scroll::engine", ?err, "failed to import block");
Expand All @@ -220,7 +276,7 @@ where
// record the metric.
self.metrics.l1_consolidation_duration.record(duration.as_secs_f64());

return Some(EngineDriverEvent::L1BlockConsolidated(consolidation_outcome))
return Some(EngineDriverEvent::L1BlockConsolidated(consolidation_outcome));
}
Err(err) => {
tracing::error!(target: "scroll::engine", ?err, "failed to consolidate block derived from L1");
Expand All @@ -247,7 +303,7 @@ where
self.metrics.build_new_payload_duration.record(duration.as_secs_f64());
self.metrics.gas_per_block.record(block.gas_used as f64);

return Some(EngineDriverEvent::NewPayload(block))
return Some(EngineDriverEvent::NewPayload(block));
}
Err(err) => {
tracing::error!(target: "scroll::engine", ?err, "failed to build new payload");
Expand Down
4 changes: 2 additions & 2 deletions crates/engine/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use alloy_rpc_types_engine::PayloadError;
use rollup_node_primitives::ScrollPayloadAttributesWithBatchInfo;
use rollup_node_primitives::{ScrollPayloadAttributesWithBatchInfo, WithBlockNumber};
use scroll_alloy_provider::ScrollEngineApiError;
use scroll_alloy_rpc_types_engine::ScrollPayloadAttributes;

Expand All @@ -21,7 +21,7 @@ pub enum EngineDriverError {
/// The payload id field is missing in the forkchoice update response for an L1 consolidation
/// job.
#[error("Forkchoice update response missing payload id for L1 consolidation job")]
L1ConsolidationMissingPayloadId(ScrollPayloadAttributesWithBatchInfo),
L1ConsolidationMissingPayloadId(WithBlockNumber<ScrollPayloadAttributesWithBatchInfo>),
/// The payload id field is missing in the forkchoice update response for a payload building
/// job.
#[error("Forkchoice update response missing payload id for payload building job")]
Expand Down
31 changes: 17 additions & 14 deletions crates/engine/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use reth_scroll_engine_primitives::try_into_block;
use reth_scroll_primitives::ScrollBlock;
use rollup_node_primitives::{
BatchInfo, BlockInfo, ChainImport, L2BlockInfoWithL1Messages, MeteredFuture,
ScrollPayloadAttributesWithBatchInfo,
ScrollPayloadAttributesWithBatchInfo, WithBlockNumber,
};
use scroll_alloy_hardforks::ScrollHardforks;
use scroll_alloy_network::Scroll;
Expand Down Expand Up @@ -47,7 +47,7 @@ type L1ConsolidationFuture =
Pin<Box<dyn Future<Output = Result<ConsolidationOutcome, EngineDriverError>> + Send>>;

/// An enum that represents the different outcomes of an L1 consolidation job.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConsolidationOutcome {
/// Represents a successful consolidation outcome with the consolidated block info and batch
/// info.
Expand Down Expand Up @@ -98,7 +98,7 @@ pub(crate) type OptimisticSyncFuture =
/// It can be a block import job, an L1 consolidation job, or a new payload processing.
pub(crate) enum EngineFuture {
ChainImport(ChainImportFuture),
L1Consolidation(L1ConsolidationFuture),
L1Consolidation(WithBlockNumber<L1ConsolidationFuture>),
NewPayload(NewPayloadFuture),
OptimisticSync(OptimisticSyncFuture),
}
Expand Down Expand Up @@ -127,18 +127,21 @@ impl EngineFuture {
client: Arc<EC>,
execution_payload_provider: P,
fcs: ForkchoiceState,
payload_attributes: ScrollPayloadAttributesWithBatchInfo,
payload_attributes: WithBlockNumber<ScrollPayloadAttributesWithBatchInfo>,
) -> Self
where
EC: ScrollEngineApi + Unpin + Send + Sync + 'static,
P: Provider<Scroll> + Unpin + Send + Sync + 'static,
{
Self::L1Consolidation(Box::pin(handle_payload_attributes(
client,
execution_payload_provider,
fcs,
payload_attributes,
)))
Self::L1Consolidation(WithBlockNumber::new(
payload_attributes.number,
Box::pin(handle_payload_attributes(
client,
execution_payload_provider,
fcs,
payload_attributes,
)),
))
}

/// Creates a new [`EngineFuture::NewPayload`] future from the provided parameters.
Expand All @@ -162,8 +165,8 @@ impl Future for EngineFuture {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<EngineDriverFutureResult> {
let this = self.get_mut();
match this {
Self::ChainImport(fut) => fut.as_mut().poll(cx).map(Into::into),
Self::L1Consolidation(fut) => fut.as_mut().poll(cx).map(Into::into),
Self::ChainImport(fut) => fut.inner.as_mut().poll(cx).map(Into::into),
Self::L1Consolidation(fut) => fut.inner.as_mut().poll(cx).map(Into::into),
Self::NewPayload(fut) => fut.as_mut().poll(cx).map(Into::into),
Self::OptimisticSync(fut) => fut.as_mut().poll(cx).map(Into::into),
}
Expand Down Expand Up @@ -258,7 +261,7 @@ async fn handle_payload_attributes<EC, P>(
client: Arc<EC>,
provider: P,
fcs: ForkchoiceState,
payload_attributes_with_batch_info: ScrollPayloadAttributesWithBatchInfo,
payload_attributes_with_batch_info: WithBlockNumber<ScrollPayloadAttributesWithBatchInfo>,
) -> Result<ConsolidationOutcome, EngineDriverError>
where
EC: ScrollEngineApi + Unpin + Send + Sync + 'static,
Expand All @@ -267,7 +270,7 @@ where
tracing::trace!(target: "scroll::engine::future", ?fcs, ?payload_attributes_with_batch_info, "handling payload attributes");

let ScrollPayloadAttributesWithBatchInfo { mut payload_attributes, batch_info } =
payload_attributes_with_batch_info.clone();
payload_attributes_with_batch_info.inner.clone();

let maybe_execution_payload = provider
.get_block((fcs.safe_block_info().number + 1).into())
Expand Down
2 changes: 1 addition & 1 deletion crates/manager/src/manager/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use scroll_engine::ConsolidationOutcome;
use scroll_network::NewBlockWithPeer;

/// An event that can be emitted by the rollup node manager.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub enum RollupManagerEvent {
/// A new block has been received from the network.
NewBlockReceived(NewBlockWithPeer),
Expand Down
15 changes: 6 additions & 9 deletions crates/manager/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,15 +276,12 @@ where
l2_head_block_info,
l2_safe_block_info,
} => {
// Update the [`EngineDriver`] fork choice state with the new L2 head info.
if let Some(l2_head_block_info) = l2_head_block_info {
self.engine.set_head_block_info(l2_head_block_info);
}

// Update the [`EngineDriver`] fork choice state with the new L2 safe info.
if let Some(safe_block_info) = l2_safe_block_info {
self.engine.set_safe_block_info(safe_block_info);
}
// Handle the reorg in the engine driver.
self.engine.handle_l1_reorg(
l1_block_number,
l2_head_block_info,
l2_safe_block_info,
);

// Update the [`Sequencer`] with the new L1 head info and queue index.
if let Some(sequencer) = self.sequencer.as_mut() {
Expand Down
2 changes: 1 addition & 1 deletion crates/network/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use reth_network_api::PeerId;
use reth_scroll_primitives::ScrollBlock;

/// A new block with the peer id that it was received from.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub struct NewBlockWithPeer {
pub peer_id: PeerId,
pub block: ScrollBlock,
Expand Down
Loading
Loading