@@ -3,7 +3,10 @@ use crate::{
3
3
primitives:: reth:: ExecutionInfo ,
4
4
tx_signer:: Signer ,
5
5
} ;
6
- use alloy_consensus:: { Eip658Value , Header , Transaction , Typed2718 , EMPTY_OMMER_ROOT_HASH } ;
6
+ use alloy_consensus:: {
7
+ constants:: EMPTY_WITHDRAWALS , Eip658Value , Header , Transaction , Typed2718 ,
8
+ EMPTY_OMMER_ROOT_HASH ,
9
+ } ;
7
10
use alloy_eips:: { merge:: BEACON_NONCE , Encodable2718 } ;
8
11
use alloy_op_evm:: block:: receipt_builder:: OpReceiptBuilder ;
9
12
use alloy_primitives:: { map:: HashMap , Address , Bytes , B256 , U256 } ;
@@ -29,7 +32,7 @@ use reth_evm::{
29
32
use reth_execution_types:: ExecutionOutcome ;
30
33
use reth_node_api:: { NodePrimitives , NodeTypesWithEngine , TxTy } ;
31
34
use reth_optimism_chainspec:: OpChainSpec ;
32
- use reth_optimism_consensus:: calculate_receipt_root_no_memo_optimism;
35
+ use reth_optimism_consensus:: { calculate_receipt_root_no_memo_optimism, isthmus } ;
33
36
use reth_optimism_evm:: { OpEvmConfig , OpNextBlockEnvAttributes } ;
34
37
use reth_optimism_forks:: OpHardforks ;
35
38
use reth_optimism_node:: OpEngineTypes ;
@@ -60,14 +63,25 @@ use rollup_boost::{
60
63
ExecutionPayloadBaseV1 , ExecutionPayloadFlashblockDeltaV1 , FlashblocksPayloadV1 ,
61
64
} ;
62
65
use serde:: { Deserialize , Serialize } ;
63
- use std:: sync:: { Arc , Mutex } ;
66
+ use std:: {
67
+ sync:: { Arc , Mutex } ,
68
+ time:: Duration ,
69
+ } ;
64
70
use tokio:: {
65
71
net:: { TcpListener , TcpStream } ,
66
72
sync:: mpsc,
67
73
} ;
68
74
use tokio_tungstenite:: { accept_async, WebSocketStream } ;
69
75
use tokio_util:: sync:: CancellationToken ;
70
- use tracing:: { debug, trace, warn} ;
76
+ use tracing:: { debug, error, trace, warn} ;
77
+
78
+ /// Flashblocks specific payload building errors.
79
+ #[ derive( Debug , thiserror:: Error ) ]
80
+ pub enum FlashblockPayloadBuilderError {
81
+ /// Thrown when the job was cancelled.
82
+ #[ error( "error sending build signal" ) ]
83
+ SendBuildSignalError ,
84
+ }
71
85
72
86
#[ derive( Debug , Serialize , Deserialize ) ]
73
87
struct FlashblocksMetadata < N : NodePrimitives > {
@@ -395,65 +409,115 @@ where
395
409
396
410
let mut flashblock_count = 0 ;
397
411
398
- // 2. loop every n time and try to build an increasing block
399
- loop {
400
- if ctx. cancel . is_cancelled ( ) {
401
- tracing:: info!(
402
- target: "payload_builder" ,
403
- "Job cancelled, stopping payload building" ,
404
- ) ;
405
- // if the job was cancelled, stop
406
- return Ok ( ( ) ) ;
407
- }
408
-
409
- println ! (
410
- "Building flashblock {} {}" ,
411
- ctx. payload_id( ) ,
412
- flashblock_count,
413
- ) ;
412
+ // Create a channel to coordinate flashblock building
413
+ let ( build_tx, mut build_rx) = mpsc:: channel ( 1 ) ;
414
414
415
- tracing:: info!(
416
- target: "payload_builder" ,
417
- "Building flashblock {}" ,
418
- flashblock_count,
419
- ) ;
420
-
421
- let state = StateProviderDatabase :: new ( & state_provider) ;
422
-
423
- let mut db = State :: builder ( )
424
- . with_database ( state)
425
- . with_bundle_update ( )
426
- . with_bundle_prestate ( bundle_state)
427
- . build ( ) ;
415
+ // Spawn the timer task that signals when to build a new flashblock
416
+ let cancel_clone = ctx. cancel . clone ( ) ;
417
+ let flashblock_block_time = self . flashblock_block_time ;
418
+ tokio:: spawn ( async move {
419
+ let mut interval = tokio:: time:: interval ( Duration :: from_millis ( flashblock_block_time) ) ;
420
+ loop {
421
+ tokio:: select! {
422
+ // Add a cancellation check that only runs every 10ms to avoid tight polling
423
+ _ = tokio:: time:: sleep( Duration :: from_millis( 10 ) ) => {
424
+ if cancel_clone. is_cancelled( ) {
425
+ tracing:: info!( target: "payload_builder" , "Job cancelled during sleep, stopping payload building" ) ;
426
+ drop( build_tx) ;
427
+ break ;
428
+ }
429
+ }
430
+ _ = interval. tick( ) => {
431
+ if let Err ( err) = build_tx. send( ( ) ) . await {
432
+ error!( target: "payload_builder" , "Error sending build signal: {}" , err) ;
433
+ break ;
434
+ }
435
+ }
436
+ }
437
+ }
438
+ } ) ;
428
439
429
- let best_txs = BestPayloadTransactions :: new (
430
- self . pool
431
- . best_transactions_with_attributes ( ctx. best_transaction_attributes ( ) ) ,
432
- ) ;
433
- ctx. execute_best_transactions ( & mut info, & mut db, best_txs, total_gas_per_batch) ?;
440
+ // Process flashblocks in a blocking loop
441
+ loop {
442
+ // Block on receiving a message, break on cancellation or closed channel
443
+ let received = tokio:: task:: block_in_place ( || {
444
+ // Get runtime handle
445
+ let rt = tokio:: runtime:: Handle :: current ( ) ;
446
+
447
+ // Run the async operation to completion, blocking the current thread
448
+ rt. block_on ( async {
449
+ // Check for cancellation first
450
+ if ctx. cancel . is_cancelled ( ) {
451
+ tracing:: info!(
452
+ target: "payload_builder" ,
453
+ "Job cancelled, stopping payload building" ,
454
+ ) ;
455
+ return None ;
456
+ }
434
457
435
- if ctx. cancel . is_cancelled ( ) {
436
- tracing:: info!(
437
- target: "payload_builder" ,
438
- "Job cancelled, stopping payload building" ,
439
- ) ;
440
- // if the job was cancelled, stop
441
- return Ok ( ( ) ) ;
442
- }
458
+ // Wait for next message
459
+ build_rx. recv ( ) . await
460
+ } )
461
+ } ) ;
443
462
444
- let ( payload, mut fb_payload, new_bundle_state) = build_block ( db, & ctx, & mut info) ?;
463
+ // Exit loop if channel closed or cancelled
464
+ match received {
465
+ Some ( ( ) ) => {
466
+ // Continue with flashblock building
467
+ tracing:: info!(
468
+ target: "payload_builder" ,
469
+ "Building flashblock {}" ,
470
+ flashblock_count,
471
+ ) ;
472
+
473
+ let state = StateProviderDatabase :: new ( & state_provider) ;
474
+
475
+ let mut db = State :: builder ( )
476
+ . with_database ( state)
477
+ . with_bundle_update ( )
478
+ . with_bundle_prestate ( bundle_state)
479
+ . build ( ) ;
480
+
481
+ let best_txs = BestPayloadTransactions :: new (
482
+ self . pool
483
+ . best_transactions_with_attributes ( ctx. best_transaction_attributes ( ) ) ,
484
+ ) ;
485
+ ctx. execute_best_transactions (
486
+ & mut info,
487
+ & mut db,
488
+ best_txs,
489
+ total_gas_per_batch,
490
+ ) ?;
491
+
492
+ if ctx. cancel . is_cancelled ( ) {
493
+ tracing:: info!(
494
+ target: "payload_builder" ,
495
+ "Job cancelled, stopping payload building" ,
496
+ ) ;
497
+ // if the job was cancelled, stop
498
+ return Ok ( ( ) ) ;
499
+ }
445
500
446
- best_payload. set ( payload. clone ( ) ) ;
501
+ let ( new_payload, mut fb_payload, new_bundle_state) =
502
+ build_block ( db, & ctx, & mut info) ?;
447
503
448
- fb_payload. index = flashblock_count + 1 ; // we do this because the fallback block is index 0
449
- fb_payload. base = None ;
450
- let _ = self . send_message ( serde_json:: to_string ( & fb_payload) . unwrap_or_default ( ) ) ;
504
+ fb_payload. index = flashblock_count + 1 ; // we do this because the fallback block is index 0
505
+ fb_payload. base = None ;
506
+ let _ =
507
+ self . send_message ( serde_json:: to_string ( & fb_payload) . unwrap_or_default ( ) ) ;
451
508
452
- bundle_state = new_bundle_state;
453
- total_gas_per_batch += gas_per_batch;
454
- flashblock_count += 1 ;
509
+ best_payload. set ( new_payload. clone ( ) ) ;
510
+ bundle_state = new_bundle_state;
511
+ total_gas_per_batch += gas_per_batch;
512
+ flashblock_count += 1 ;
455
513
456
- std:: thread:: sleep ( std:: time:: Duration :: from_millis ( self . flashblock_block_time ) ) ;
514
+ tracing:: info!( target: "payload_builder" , "Flashblock {} built" , flashblock_count) ;
515
+ }
516
+ None => {
517
+ // Exit loop if channel closed or cancelled
518
+ return Ok ( ( ) ) ;
519
+ }
520
+ }
457
521
}
458
522
}
459
523
}
@@ -501,6 +565,7 @@ where
501
565
block_number,
502
566
vec ! [ ] ,
503
567
) ;
568
+
504
569
let receipts_root = execution_outcome
505
570
. generic_receipts_root_slow ( block_number, |receipts| {
506
571
calculate_receipt_root_no_memo_optimism (
@@ -531,6 +596,25 @@ where
531
596
} ) ?
532
597
} ;
533
598
599
+ let withdrawals_root = if ctx
600
+ . chain_spec
601
+ . is_isthmus_active_at_timestamp ( ctx. attributes ( ) . timestamp ( ) )
602
+ {
603
+ // withdrawals root field in block header is used for storage root of L2 predeploy
604
+ // `l2tol1-message-passer`
605
+ Some (
606
+ isthmus:: withdrawals_root ( & execution_outcome. state ( ) , state. database . as_ref ( ) )
607
+ . map_err ( PayloadBuilderError :: other) ?,
608
+ )
609
+ } else if ctx
610
+ . chain_spec
611
+ . is_canyon_active_at_timestamp ( ctx. attributes ( ) . timestamp ( ) )
612
+ {
613
+ Some ( EMPTY_WITHDRAWALS )
614
+ } else {
615
+ None
616
+ } ;
617
+
534
618
// create the block header
535
619
let transactions_root = proofs:: calculate_transaction_root ( & info. executed_transactions ) ;
536
620
@@ -547,7 +631,7 @@ where
547
631
state_root,
548
632
transactions_root,
549
633
receipts_root,
550
- withdrawals_root : None ,
634
+ withdrawals_root,
551
635
logs_bloom,
552
636
timestamp : ctx. attributes ( ) . payload_attributes . timestamp ,
553
637
mix_hash : ctx. attributes ( ) . payload_attributes . prev_randao ,
@@ -961,30 +1045,19 @@ where
961
1045
where
962
1046
DB : Database < Error = ProviderError > ,
963
1047
{
964
- println ! ( "Executing best transactions" ) ;
965
1048
let base_fee = self . base_fee ( ) ;
966
1049
967
1050
let mut evm = self . evm_config . evm_with_env ( & mut * db, self . evm_env . clone ( ) ) ;
968
1051
969
1052
while let Some ( tx) = best_txs. next ( ( ) ) {
970
1053
let tx = tx. into_consensus ( ) ;
971
- println ! ( "tx: {:?}" , tx) ;
972
- println ! (
973
- "gas limit: {:?}, batch gas limit: {:?} cummulative gas used: {:?}" ,
974
- tx. gas_limit( ) ,
975
- batch_gas_limit,
976
- info. cumulative_gas_used
977
- ) ;
978
- // gas limit: 100816112, batch gas limit: 2500000000 cummulative gas used: 100062216
979
-
980
1054
// check in info if the txn has been executed already
981
1055
if info. executed_transactions . contains ( & tx) {
982
1056
continue ;
983
1057
}
984
1058
985
1059
// ensure we still have capacity for this transaction
986
1060
if info. is_tx_over_limits ( tx. inner ( ) , batch_gas_limit, None , None ) {
987
- println ! ( "A" ) ;
988
1061
// we can't fit this transaction into the block, so we need to mark it as
989
1062
// invalid which also removes all dependent transaction from
990
1063
// the iterator before we can continue
@@ -1001,11 +1074,9 @@ where
1001
1074
1002
1075
// check if the job was cancelled, if so we can exit early
1003
1076
if self . cancel . is_cancelled ( ) {
1004
- println ! ( "C" ) ;
1005
1077
return Ok ( Some ( ( ) ) ) ;
1006
1078
}
1007
1079
1008
- println ! ( "Start transaction" ) ;
1009
1080
let ResultAndState { result, state } = match evm. transact ( & tx) {
1010
1081
Ok ( res) => res,
1011
1082
Err ( err) => {
@@ -1026,7 +1097,6 @@ where
1026
1097
return Err ( PayloadBuilderError :: EvmExecutionError ( Box :: new ( err) ) ) ;
1027
1098
}
1028
1099
} ;
1029
- println ! ( "Finish transaction" ) ;
1030
1100
1031
1101
// add gas used by the transaction to cumulative gas used, before creating the
1032
1102
// receipt
0 commit comments