Skip to content
Merged
589 changes: 364 additions & 225 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions crates/rbuilder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ crossbeam = "0.8.4"
parking_lot.workspace = true
dashmap = "6.1.0"

# IPC state provider deps
reipc = { git = "https://github.com/nethermindeth/reipc.git", rev = "8a9c31f7a4b7dfdd828020222ae1ccdff802cbc9" }
quick_cache = "0.6.11"



[build-dependencies]
built = { version = "0.7.1", features = ["git2", "chrono"] }
Expand Down
2 changes: 1 addition & 1 deletion crates/rbuilder/src/backtest/backtest_build_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ where
result
};

let provider_factory = config.base_config().create_provider_factory(true)?;
let provider_factory = config.base_config().create_reth_provider_factory(true)?;
let chain_spec = config.base_config().chain_spec()?;

let mut profits = Vec::new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl<ConfigType: LiveBuilderConfig>
&self,
) -> eyre::Result<ProviderFactoryReopener<NodeTypesWithDBAdapter<EthereumNode, Arc<DatabaseEnv>>>>
{
self.config.base_config().create_provider_factory(true)
self.config.base_config().create_reth_provider_factory(true)
}

fn create_block_building_context(&self) -> eyre::Result<BlockBuildingContext> {
Expand Down
2 changes: 1 addition & 1 deletion crates/rbuilder/src/backtest/redistribute/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ where
let mut historical_data_storage =
HistoricalDataStorage::new_from_path(&config.base_config().backtest_fetch_output_file)
.await?;
let provider = config.base_config().create_provider_factory(true)?;
let provider = config.base_config().create_reth_provider_factory(true)?;
let csv_writer = cli
.csv
.map(|path| -> io::Result<_> { CSVResultWriter::new(path) })
Expand Down
2 changes: 1 addition & 1 deletion crates/rbuilder/src/bin/debug-bench-machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async fn main() -> eyre::Result<()> {

let chain_spec = config.base_config().chain_spec()?;

let provider_factory = config.base_config().create_provider_factory(false)?;
let provider_factory = config.base_config().create_reth_provider_factory(false)?;

let last_block = provider_factory.last_block_number()?;

Expand Down
6 changes: 3 additions & 3 deletions crates/rbuilder/src/bin/dummy-builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use rbuilder::{
block_list_provider::NullBlockListProvider,
config::create_provider_factory,
order_input::{
OrderInputConfig, DEFAULT_INPUT_CHANNEL_BUFFER_SIZE, DEFAULT_RESULTS_CHANNEL_TIMEOUT,
DEFAULT_SERVE_MAX_CONNECTIONS,
MempoolSource, OrderInputConfig, DEFAULT_INPUT_CHANNEL_BUFFER_SIZE,
DEFAULT_RESULTS_CHANNEL_TIMEOUT, DEFAULT_SERVE_MAX_CONNECTIONS,
},
payload_events::{MevBoostSlotData, MevBoostSlotDataGenerator},
simulation::SimulatedOrderCommand,
Expand Down Expand Up @@ -76,7 +76,7 @@ async fn main() -> eyre::Result<()> {
let order_input_config = OrderInputConfig::new(
false,
true,
Some(PathBuf::from(DEFAULT_EL_NODE_IPC_PATH)),
Some(MempoolSource::Ipc(PathBuf::from(DEFAULT_EL_NODE_IPC_PATH))),
DEFAULT_INCOMING_BUNDLES_PORT,
default_ip(),
DEFAULT_SERVE_MAX_CONNECTIONS,
Expand Down
24 changes: 22 additions & 2 deletions crates/rbuilder/src/live_builder/base_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
use crate::{
building::builders::UnfinishedBlockBuildingSinkFactory,
live_builder::{order_input::OrderInputConfig, LiveBuilder},
provider::StateProviderFactory,
provider::{
ipc_state_provider::{IpcProviderConfig, IpcStateProviderFactory},
StateProviderFactory,
},
roothash::RootHashContext,
telemetry::{setup_reloadable_tracing_subscriber, LoggerConfig},
utils::{
Expand Down Expand Up @@ -132,6 +135,9 @@ pub struct BaseConfig {
/// List of `builders` to be used for live building
pub live_builders: Vec<String>,

/// Config for IPC state provider
pub ipc_provider: Option<IpcProviderConfig>,

// backtest config
backtest_fetch_mempool_data_dir: EnvOrValue<String>,
pub backtest_fetch_eth_rpc_url: String,
Expand Down Expand Up @@ -271,7 +277,7 @@ impl BaseConfig {

/// Open reth db and DB should be opened once per process but it can be cloned and moved to different threads.
/// skip_root_hash -> will create a mock roothasher. Used on backtesting since reth can't compute roothashes on the past.
pub fn create_provider_factory(
pub fn create_reth_provider_factory(
&self,
skip_root_hash: bool,
) -> eyre::Result<ProviderFactoryReopener<NodeTypesWithDBAdapter<EthereumNode, Arc<DatabaseEnv>>>>
Expand All @@ -290,6 +296,19 @@ impl BaseConfig {
)
}

/// Opens IPC connection to node that will provide the sate
pub fn create_ipc_provider_factory(&self) -> eyre::Result<IpcStateProviderFactory> {
let ipc_provider_config = self
.ipc_provider
.as_ref()
.ok_or_else(|| eyre::eyre!("IPC provider not configured"))?;

Ok(IpcStateProviderFactory::new(
&ipc_provider_config.ipc_path,
Duration::from_millis(ipc_provider_config.request_timeout_ms),
))
}

/// live_root_hash_config creates a root hash thread pool
/// so it should be called once on the startup and cloned if needed
pub fn live_root_hash_config(&self) -> eyre::Result<RootHashContext> {
Expand Down Expand Up @@ -554,6 +573,7 @@ impl Default for BaseConfig {
sbundle_mergeable_signers: None,
sbundle_mergeabe_signers: None,
require_non_empty_blocklist: Some(DEFAULT_REQUIRE_NON_EMPTY_BLOCKLIST),
ipc_provider: None,
}
}
}
Expand Down
23 changes: 19 additions & 4 deletions crates/rbuilder/src/live_builder/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,6 @@ where
let config: ConfigType = load_config_toml_and_env(cli.config)?;
config.base_config().setup_tracing_subscriber()?;

let cancel = CancellationToken::new();

// Spawn redacted server that is safe for tdx builders to expose
telemetry::servers::redacted::spawn(config.base_config().redacted_telemetry_server_address())
.await?;
Expand All @@ -120,7 +118,25 @@ where
config.base_config().log_enable_dynamic,
)
.await?;
let provider = config.base_config().create_provider_factory(false)?;
if config.base_config().ipc_provider.is_some() {
let provider = config.base_config().create_ipc_provider_factory()?;
run_builder(provider, config, on_run).await
} else {
let provider = config.base_config().create_reth_provider_factory(false)?;
run_builder(provider, config, on_run).await
}
}

async fn run_builder<P, ConfigType>(
provider: P,
config: ConfigType,
on_run: Option<fn()>,
) -> eyre::Result<()>
where
ConfigType: LiveBuilderConfig,
P: StateProviderFactory + Clone + 'static,
{
let cancel = CancellationToken::new();
let builder = config.new_builder(provider, cancel.clone()).await?;

let ctrlc = tokio::spawn(async move {
Expand All @@ -131,7 +147,6 @@ where
on_run();
}
builder.run().await?;

ctrlc.await.unwrap_or_default();
Ok(())
}
37 changes: 23 additions & 14 deletions crates/rbuilder/src/live_builder/order_input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,21 @@ impl Drop for AutoRemovingOrderPoolSubscriptionId {
}
}

#[derive(Debug, Clone)]
pub enum MempoolSource {
Ipc(PathBuf),
Ws(String),
}

/// All the info needed to start all the order related jobs (mempool, rcp, clean)
#[derive(Debug, Clone)]
pub struct OrderInputConfig {
/// if true - cancellations are disabled.
ignore_cancellable_orders: bool,
/// if true -- txs with blobs are ignored
ignore_blobs: bool,
/// Path to reth ipc
ipc_path: Option<PathBuf>,
/// Tx pool source
mempool_source: Option<MempoolSource>,
/// Input RPC port
server_port: u16,
/// Input RPC ip
Expand All @@ -103,7 +109,7 @@ impl OrderInputConfig {
pub fn new(
ignore_cancellable_orders: bool,
ignore_blobs: bool,
ipc_path: Option<PathBuf>,
mempool_source: Option<MempoolSource>,
server_port: u16,
server_ip: Ipv4Addr,
serve_max_connections: u32,
Expand All @@ -113,7 +119,7 @@ impl OrderInputConfig {
Self {
ignore_cancellable_orders,
ignore_blobs,
ipc_path,
mempool_source,
server_port,
server_ip,
serve_max_connections,
Expand All @@ -123,16 +129,19 @@ impl OrderInputConfig {
}

pub fn from_config(config: &BaseConfig) -> eyre::Result<Self> {
let el_node_ipc_path = config
.el_node_ipc_path
.as_ref()
.map(|p| expand_path(p.as_path()))
.transpose()?;
let mempool = if let Some(provider) = &config.ipc_provider {
Some(MempoolSource::Ws(provider.mempool_server_url.clone()))
} else if let Some(path) = &config.el_node_ipc_path {
let expanded_path = expand_path(path.as_path())?;
Some(MempoolSource::Ipc(expanded_path))
} else {
None
};

Ok(OrderInputConfig {
ignore_cancellable_orders: config.ignore_cancellable_orders,
ignore_blobs: config.ignore_blobs,
ipc_path: el_node_ipc_path,
mempool_source: mempool,
server_port: config.jsonrpc_server_port,
server_ip: config.jsonrpc_server_ip,
serve_max_connections: 4096,
Expand All @@ -143,7 +152,7 @@ impl OrderInputConfig {

pub fn default_e2e() -> Self {
Self {
ipc_path: Some(PathBuf::from("/tmp/anvil.ipc")),
mempool_source: Some(MempoolSource::Ipc(PathBuf::from("/tmp/anvil.ipc"))),
results_channel_timeout: Duration::new(5, 0),
ignore_cancellable_orders: false,
ignore_blobs: false,
Expand Down Expand Up @@ -222,8 +231,8 @@ where

let mut handles = vec![clean_job, rpc_server];

if config.ipc_path.is_some() {
info!("IPC path configured, starting txpool subscription");
if config.mempool_source.is_some() {
info!("Txpool source configured, starting txpool subscription");
let txpool_fetcher = txpool_fetcher::subscribe_to_txpool_with_blobs(
config.clone(),
order_sender.clone(),
Expand All @@ -232,7 +241,7 @@ where
.await?;
handles.push(txpool_fetcher);
} else {
info!("No IPC path configured, skipping txpool subscription");
info!("No Txpool source configured, skipping txpool subscription");
}

let handle = tokio::spawn(async move {
Expand Down
25 changes: 17 additions & 8 deletions crates/rbuilder/src/live_builder/order_input/txpool_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{OrderInputConfig, ReplaceableOrderPoolCommand};
use super::{MempoolSource, OrderInputConfig, ReplaceableOrderPoolCommand};
use crate::{
primitives::{MempoolTx, Order, TransactionSignedEcRecoveredWithBlobs},
telemetry::{add_txfetcher_time_to_query, mark_command_received},
Expand All @@ -24,11 +24,20 @@ pub async fn subscribe_to_txpool_with_blobs(
results: mpsc::Sender<ReplaceableOrderPoolCommand>,
global_cancel: CancellationToken,
) -> eyre::Result<JoinHandle<()>> {
let ipc_path = config
.ipc_path
.ok_or_else(|| eyre::eyre!("No IPC path configured"))?;
let ipc = IpcConnect::new(ipc_path);
let provider = ProviderBuilder::new().on_ipc(ipc).await?;
let mempool = config
.mempool_source
.ok_or_else(|| eyre::eyre!("No txpool source configured"))?;

let provider = match mempool {
MempoolSource::Ipc(path) => {
let ipc = IpcConnect::new(path);
ProviderBuilder::new().on_ipc(ipc).await?
}
MempoolSource::Ws(url) => {
let ws_conn = alloy_provider::WsConnect::new(url);
ProviderBuilder::new().on_ws(ws_conn).await?
}
};

let handle = tokio::spawn(async move {
info!("Subscribe to txpool with blobs: started");
Expand All @@ -55,7 +64,7 @@ pub async fn subscribe_to_txpool_with_blobs(
continue;
}
Err(err) => {
error!(?tx_hash, ?err, "Failed to get tx pool");
error!(?tx_hash, ?err, "Failed to get tx from pool");
continue;
}
};
Expand Down Expand Up @@ -127,7 +136,7 @@ mod test {
let (sender, mut receiver) = mpsc::channel(10);
subscribe_to_txpool_with_blobs(
OrderInputConfig {
ipc_path: Some(PathBuf::from("/tmp/anvil.ipc")),
mempool_source: Some(MempoolSource::Ipc(PathBuf::from("/tmp/anvil.ipc"))),
..OrderInputConfig::default_e2e()
},
sender,
Expand Down
Loading
Loading