Skip to content

feat: FlashblocksPubSubManager #369

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 68 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
b60eee0
chore: simplify logic
0xKitsune Jun 27, 2025
a73f64b
chore: clippy
0xKitsune Jun 27, 2025
5351ca7
feat: add flashblocks manager
0xKitsune Jun 28, 2025
f5aa80b
docs: comments
0xKitsune Jun 28, 2025
081e892
feat: spawn ping task
0xKitsune Jun 29, 2025
c5cfac0
chore: update spawn ping function
0xKitsune Jun 29, 2025
ac94115
feat: flashblocks provider
0xKitsune Jun 29, 2025
06eb3c7
feat: handle flashblocks stream
0xKitsune Jun 29, 2025
d0ce931
feat: flashblock stream reconnect logic
0xKitsune Jun 29, 2025
5285dfb
wip: fix associated methods
0xKitsune Jun 29, 2025
aaecd24
feat: reestablish flashblocks stream upon missed pong
0xKitsune Jun 30, 2025
3455df4
wip: flashblocks publisher
0xKitsune Jun 30, 2025
5788b4b
fix: broadcast channel lifetimes
0xKitsune Jun 30, 2025
82f2251
chore: update error type
0xKitsune Jun 30, 2025
983a189
wip: update publisher spawn function
0xKitsune Jun 30, 2025
2046c94
wip: flashblocks provider
0xKitsune Jun 30, 2025
65c60af
fix: fix flashblocks initilization
0xKitsune Jun 30, 2025
8ba1749
feat: engine api ext logic for flashblock provider
0xKitsune Jul 3, 2025
5814a54
chore: clippy
0xKitsune Jul 3, 2025
c740581
wip: update FlashblockProvider::new
0xKitsune Jul 3, 2025
468668d
chore: fix function visibility
0xKitsune Jul 3, 2025
b7112fb
wip: update stream handling logic
0xKitsune Jul 4, 2025
669d637
wip: publisher logic
0xKitsune Jul 4, 2025
22f8cc2
feat: simplify ws streaming logic
0xKitsune Jul 4, 2025
12982d5
docs: remove comment
0xKitsune Jul 4, 2025
c6e1d6e
feat: use parking_lot::Mutex
0xKitsune Jul 4, 2025
f51e67f
feat: add flashblock provider metrics
0xKitsune Jul 4, 2025
0c5f86d
feat: simplify pubsub manager and provider
0xKitsune Jul 5, 2025
5a717e0
feat: update builder stream handling
0xKitsune Jul 5, 2025
9652dd7
wip: tests
0xKitsune Jul 5, 2025
d8649e4
chore: remove unused mods
0xKitsune Jul 6, 2025
9312acc
chore: merge main
0xKitsune Jul 6, 2025
1ebf099
chore: update metrics
0xKitsune Jul 6, 2025
49518f5
test: subscriber ping pong
0xKitsune Jul 7, 2025
5d2daba
test: fix missing pong assertion
0xKitsune Jul 7, 2025
1a4395c
test: fix missing pong test
0xKitsune Jul 7, 2025
5225f2a
test: send flashblock, payload id mismatch
0xKitsune Jul 8, 2025
d3dde72
test: fix tests
0xKitsune Jul 8, 2025
5c245ff
fix: fix payload id mismatch test
0xKitsune Jul 8, 2025
6a07842
chore: update pubsub test importa
0xKitsune Jul 8, 2025
4d2e422
test: malformed payload
0xKitsune Jul 8, 2025
7fe063c
fix: malformed flashblocks test
0xKitsune Jul 8, 2025
cfe14a4
chore: remove unused flashblock payload
0xKitsune Jul 8, 2025
c66147b
chore: handle error when accepting listener
0xKitsune Jul 8, 2025
33e708d
test: publish flashblock
0xKitsune Jul 9, 2025
1e5a873
chore: update publish flashblock test
0xKitsune Jul 9, 2025
4d04633
test: update publish flashblock test
0xKitsune Jul 9, 2025
e9a7fe8
test: remove test
0xKitsune Jul 9, 2025
de4d520
chore: clippy
0xKitsune Jul 9, 2025
eeb7b57
test: provider tests
0xKitsune Jul 9, 2025
f44c747
fix: fix send flashblock test
0xKitsune Jul 9, 2025
a50c81f
chore: merge main
0xKitsune Jul 9, 2025
930b10d
feat: metrics
0xKitsune Jul 10, 2025
646d643
feat: make ws backoff configurable
0xKitsune Jul 10, 2025
b0c987b
chore: clippy
0xKitsune Jul 10, 2025
6fc7081
chore: clippy
0xKitsune Jul 10, 2025
ad7a5b9
chore: merge main
0xKitsune Jul 10, 2025
e9c1a8d
chore: fix nits, log errors
0xKitsune Jul 11, 2025
88f9b51
test: fix malformed payload test
0xKitsune Jul 11, 2025
5fb732d
test: fix test_payload_id_mismatch
0xKitsune Jul 11, 2025
985cd42
fix: wip send flashblocks test
0xKitsune Jul 11, 2025
c6bc7b5
test: test reconnect stream, fix missing pong test
0xKitsune Jul 16, 2025
faf9096
test: fix publish flashblocks
0xKitsune Jul 16, 2025
761ebd4
Merge branch 'main' into kit/flashblocks-manager
0xKitsune Jul 16, 2025
819c4b7
test: fix test_send_flashblock
0xKitsune Jul 16, 2025
d9ea023
chore: clippy
0xKitsune Jul 16, 2025
dcdb0c9
chore: fmt
0xKitsune Jul 16, 2025
65647e9
test: fix publish flashblock test
0xKitsune Jul 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions crates/flashblocks-rpc/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ use reth_primitives::Recovered;
use reth_primitives_traits::block::body::BlockBody;

use reth_rpc_eth_api::{RpcBlock, RpcReceipt};
use rollup_boost::{
FlashblockBuilder, FlashblocksPayloadV1, OpExecutionPayloadEnvelope, PayloadVersion,
};
use rollup_boost::provider::FlashblockBuilder;
use rollup_boost::{FlashblocksPayloadV1, OpExecutionPayloadEnvelope, PayloadVersion};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, str::FromStr, sync::Arc};

Expand Down
11 changes: 5 additions & 6 deletions crates/flashblocks-rpc/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ mod tests {
let tx2 = Bytes::from_str("0xf8cd82016d8316e5708302c01c94f39635f2adf40608255779ff742afe13de31f57780b8646e530e9700000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000001bc16d674ec8000000000000000000000000000000000000000000000000000156ddc81eed2a36d68302948ba0a608703e79b22164f74523d188a11f81c25a65dd59535bab1cd1d8b30d115f3ea07f4cfbbad77a139c9209d3bded89091867ff6b548dd714109c61d1f8e7a84d14").unwrap();

// Send another test flashblock payload
let payload = FlashblocksPayloadV1 {

FlashblocksPayloadV1 {
payload_id: PayloadId::new([0; 8]),
index: 1,
base: None,
Expand Down Expand Up @@ -222,9 +223,7 @@ mod tests {
},
})
.unwrap(),
};

payload
}
}

#[tokio::test]
Expand All @@ -243,7 +242,7 @@ mod tests {
let pending_block = provider
.get_block_by_number(alloy_eips::BlockNumberOrTag::Pending)
.await?;
assert_eq!(pending_block.is_none(), true);
assert!(pending_block.is_none());

let base_payload = create_first_payload();
node.send_payload(base_payload).await?;
Expand Down Expand Up @@ -296,7 +295,7 @@ mod tests {
let provider = node.provider().await?;

let receipt = provider.get_transaction_receipt(TX1_HASH).await?;
assert_eq!(receipt.is_none(), true);
assert!(receipt.is_none());

node.send_test_payloads().await?;

Expand Down
50 changes: 28 additions & 22 deletions crates/rollup-boost/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
use crate::{
BlockSelectionPolicy, FlashblocksArgs, ProxyLayer, RollupBoostServer, RpcClient,
client::rpc::{BuilderArgs, L2ClientArgs},
debug_api::ExecutionMode,
get_version, init_metrics,
payload::PayloadSource,
probe::ProbeLayer,
provider::FlashblocksProvider,
pubsub::FlashblocksPubSubManager,
};
use alloy_rpc_types_engine::JwtSecret;
use clap::Parser;
use eyre::bail;
Expand All @@ -8,18 +18,13 @@ use std::{
path::PathBuf,
str::FromStr,
sync::Arc,
time::Duration,
};
use tokio::signal::unix::{SignalKind, signal as unix_signal};
use tracing::{Level, info};

use crate::{
BlockSelectionPolicy, Flashblocks, FlashblocksArgs, ProxyLayer, RollupBoostServer, RpcClient,
client::rpc::{BuilderArgs, L2ClientArgs},
debug_api::ExecutionMode,
get_version, init_metrics,
payload::PayloadSource,
probe::ProbeLayer,
use tokio::{
net::TcpListener,
signal::unix::{SignalKind, signal as unix_signal},
};
use tracing::{Level, info};

#[derive(Clone, Parser, Debug)]
#[clap(author, version = get_version(), about)]
Expand Down Expand Up @@ -140,23 +145,24 @@ impl RollupBoostArgs {
let execution_mode = Arc::new(Mutex::new(self.execution_mode));

let (rpc_module, health_handle): (RpcModule<()>, _) = if self.flashblocks.flashblocks {
let flashblocks_args = self.flashblocks;
let inbound_url = flashblocks_args.flashblocks_builder_url;
let outbound_addr = SocketAddr::new(
IpAddr::from_str(&flashblocks_args.flashblocks_host)?,
flashblocks_args.flashblocks_port,
let builder_ws_url = self.flashblocks.flashblocks_builder_url;
let listener_addr = SocketAddr::new(
IpAddr::from_str(&self.flashblocks.flashblocks_host)?,
self.flashblocks.flashblocks_port,
);

let builder_client = Arc::new(Flashblocks::run(
builder_client.clone(),
inbound_url,
outbound_addr,
flashblocks_args.flashblock_builder_ws_reconnect_ms,
)?);
let listener = TcpListener::bind(listener_addr).await?;
let flashblocks_provider = Arc::new(FlashblocksProvider::new(builder_client));
FlashblocksPubSubManager::spawn(
builder_ws_url,
listener,
flashblocks_provider.clone(),
Duration::from_millis(self.flashblocks.flashblock_builder_ws_reconnect_ms),
);

let rollup_boost = RollupBoostServer::new(
l2_client,
builder_client,
flashblocks_provider,
execution_mode.clone(),
self.block_selection_policy,
probes.clone(),
Expand Down
Loading
Loading