diff --git a/Cargo.toml b/Cargo.toml index df68f0d..6689b57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] -name = "zenith-builder-example" +name = "builder" version = "0.1.1" -description = "Zenith Builder Example" +description = "signet builder example" edition = "2024" rust-version = "1.85" diff --git a/bin/builder.rs b/bin/builder.rs index 8a7cc0e..a6a2f33 100644 --- a/bin/builder.rs +++ b/bin/builder.rs @@ -1,27 +1,27 @@ use builder::{ config::BuilderConfig, - service::serve_builder_with_span, + service::serve_builder, tasks::{ block::Simulator, bundler, metrics::MetricsTask, oauth::Authenticator, submit::SubmitTask, tx_poller, }, }; +use init4_bin_base::{deps::tracing, utils::calc::SlotCalculator}; use signet_sim::SimCache; -use signet_types::SlotCalculator; use std::sync::Arc; use tokio::select; +use tracing::info_span; // Note: Must be set to `multi_thread` to support async tasks. // See: https://docs.rs/tokio/latest/tokio/attr.main.html #[tokio::main(flavor = "multi_thread")] async fn main() -> eyre::Result<()> { let _guard = init4_bin_base::init4(); - - let span = tracing::info_span!("zenith-builder"); + let init_span_guard = info_span!("builder initialization"); let config = BuilderConfig::load_from_env()?.clone(); let constants = config.load_pecorino_constants(); - let authenticator = Authenticator::new(&config); + let authenticator = Authenticator::new(&config)?; let (host_provider, ru_provider, sequencer_signer) = tokio::try_join!( config.connect_host_provider(), @@ -35,7 +35,7 @@ async fn main() -> eyre::Result<()> { let (tx_channel, metrics_jh) = metrics.spawn(); let submit = SubmitTask { - authenticator: authenticator.clone(), + token: authenticator.token(), host_provider, zenith, client: reqwest::Client::new(), @@ -47,7 +47,7 @@ async fn main() -> eyre::Result<()> { let tx_poller = tx_poller::TxPoller::new(&config); let (tx_receiver, tx_poller_jh) = tx_poller.spawn(); - let bundle_poller = bundler::BundlePoller::new(&config, authenticator.clone()); + let bundle_poller = bundler::BundlePoller::new(&config, authenticator.token()); let (bundle_receiver, bundle_poller_jh) = bundle_poller.spawn(); let authenticator_jh = authenticator.spawn(); @@ -65,8 +65,11 @@ async fn main() -> eyre::Result<()> { let build_jh = sim.clone().spawn_simulator_task(constants, sim_items.clone(), submit_channel); - let port = config.builder_port; - let server = serve_builder_with_span(([0, 0, 0, 0], port), span); + let server = serve_builder(([0, 0, 0, 0], config.builder_port)); + + // We have finished initializing the builder, so we can drop the init span + // guard. + drop(init_span_guard); select! { _ = tx_poller_jh => { diff --git a/src/service.rs b/src/service.rs index df14133..446d737 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,48 +1,10 @@ -use std::{fmt::Debug, net::SocketAddr}; - use axum::{ Router, http::StatusCode, response::{IntoResponse, Response}, routing::get, }; -use tracing::{Instrument, Span}; - -/// App result -pub type AppResult = Result; - -/// App error. This is a wrapper around eyre::Report that also includes an HTTP -/// status code. It implements [`IntoResponse`] so that it can be returned as an -/// error type from [`axum::handler::Handler`]s. -#[derive(Debug)] -pub struct AppError { - code: StatusCode, - eyre: eyre::Report, -} - -impl AppError { - /// Instantiate a new error with the bad request status code. - pub fn bad_req(e: E) -> Self { - Self { code: StatusCode::BAD_REQUEST, eyre: e.into() } - } - - /// Instantiate a new error with the bad request status code and an error - /// string. - pub fn bad_req_str(e: &str) -> Self { - Self { code: StatusCode::BAD_REQUEST, eyre: eyre::eyre!(e.to_owned()) } - } - - /// Instantiate a new error with the internal server error status code. - pub fn server_err(e: E) -> Self { - Self { code: StatusCode::INTERNAL_SERVER_ERROR, eyre: e.into() } - } -} - -impl IntoResponse for AppError { - fn into_response(self) -> Response { - (self.code, format!("{}", self.eyre)).into_response() - } -} +use std::net::SocketAddr; /// Return a 404 Not Found response pub async fn return_404() -> Response { @@ -55,26 +17,20 @@ pub async fn return_200() -> Response { } /// Serve a builder service on the given socket address. -pub fn serve_builder_with_span( - socket: impl Into, - span: Span, -) -> tokio::task::JoinHandle<()> { +pub fn serve_builder(socket: impl Into) -> tokio::task::JoinHandle<()> { let router = Router::new().route("/healthcheck", get(return_200)).fallback(return_404); let addr = socket.into(); - tokio::spawn( - async move { - match tokio::net::TcpListener::bind(&addr).await { - Ok(listener) => { - if let Err(err) = axum::serve(listener, router).await { - tracing::error!(%err, "serve failed"); - } - } - Err(err) => { - tracing::error!(%err, "failed to bind to the address"); + tokio::spawn(async move { + match tokio::net::TcpListener::bind(&addr).await { + Ok(listener) => { + if let Err(err) = axum::serve(listener, router).await { + tracing::error!(%err, "serve failed"); } - }; - } - .instrument(span), - ) + } + Err(err) => { + tracing::error!(%err, "failed to bind to the address"); + } + }; + }) } diff --git a/src/tasks/block.rs b/src/tasks/block.rs index 345dd68..fd5b9ea 100644 --- a/src/tasks/block.rs +++ b/src/tasks/block.rs @@ -15,8 +15,9 @@ use alloy::{ }; use chrono::{DateTime, Utc}; use eyre::Report; +use init4_bin_base::utils::calc::SlotCalculator; use signet_sim::{BlockBuild, BuiltBlock, SimCache}; -use signet_types::{SlotCalculator, config::SignetSystemConstants}; +use signet_types::config::SignetSystemConstants; use std::{ sync::{ Arc, @@ -210,7 +211,7 @@ impl Simulator { } } - /// Spawns the simulator task, which handles the setup and sets the deadline + /// Spawns the simulator task, which handles the setup and sets the deadline /// for the each round of simulation. /// /// # Arguments diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index 8664874..54f8828 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -1,6 +1,5 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. -pub use crate::config::BuilderConfig; -use crate::tasks::oauth::Authenticator; +use crate::tasks::oauth::SharedToken; use oauth2::TokenResponse; use reqwest::{Client, Url}; use serde::{Deserialize, Serialize}; @@ -8,7 +7,10 @@ use signet_bundle::SignetEthBundle; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; use tokio::task::JoinHandle; use tokio::time; -use tracing::{Instrument, debug, trace}; +use tracing::{Instrument, debug, trace, warn}; + +pub use crate::config::BuilderConfig; + /// Holds a bundle from the cache with a unique ID and a Zenith bundle. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Bundle { @@ -26,12 +28,12 @@ pub struct TxPoolBundleResponse { } /// The BundlePoller polls the tx-pool for bundles. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct BundlePoller { /// The builder configuration values. pub config: BuilderConfig, /// Authentication module that periodically fetches and stores auth tokens. - pub authenticator: Authenticator, + pub token: SharedToken, /// Holds a Reqwest client pub client: Client, /// Defines the interval at which the bundler polls the tx-pool for bundles. @@ -41,28 +43,26 @@ pub struct BundlePoller { /// Implements a poller for the block builder to pull bundles from the tx-pool. impl BundlePoller { /// Creates a new BundlePoller from the provided builder config. - pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self { - Self { - config: config.clone(), - authenticator, - client: Client::new(), - poll_interval_ms: 1000, - } + pub fn new(config: &BuilderConfig, token: SharedToken) -> Self { + Self { config: config.clone(), token, client: Client::new(), poll_interval_ms: 1000 } } /// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms. pub fn new_with_poll_interval_ms( config: &BuilderConfig, - authenticator: Authenticator, + token: SharedToken, poll_interval_ms: u64, ) -> Self { - Self { config: config.clone(), authenticator, client: Client::new(), poll_interval_ms } + Self { config: config.clone(), token, client: Client::new(), poll_interval_ms } } /// Fetches bundles from the transaction cache and returns them. pub async fn check_bundle_cache(&mut self) -> eyre::Result> { let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?; - let token = self.authenticator.fetch_oauth_token().await?; + let Some(token) = self.token.read() else { + warn!("No token available, skipping bundle fetch"); + return Ok(vec![]); + }; let result = self .client diff --git a/src/tasks/oauth.rs b/src/tasks/oauth.rs index 69d3049..b402a88 100644 --- a/src/tasks/oauth.rs +++ b/src/tasks/oauth.rs @@ -1,99 +1,88 @@ //! Service responsible for authenticating with the cache with Oauth tokens. //! This authenticator periodically fetches a new token every set amount of seconds. -use std::sync::Arc; - use crate::config::BuilderConfig; use oauth2::{ AuthUrl, ClientId, ClientSecret, EmptyExtraTokenFields, StandardTokenResponse, TokenUrl, basic::{BasicClient, BasicTokenType}, reqwest::async_http_client, }; -use tokio::{sync::RwLock, task::JoinHandle}; +use std::sync::{Arc, Mutex}; +use tokio::task::JoinHandle; type Token = StandardTokenResponse; -/// A self-refreshing, periodically fetching authenticator for the block builder. -/// It is architected as a shareable struct that can be used across all the multiple builder tasks. -/// It fetches a new token every set amount of seconds, configured through the general builder config. -/// Readers are guaranteed to not read stale tokens as the [RwLock] guarantees that write tasks (refreshing the token) will claim priority over read access. -#[derive(Debug, Clone)] -pub struct Authenticator { - /// Configuration - pub config: BuilderConfig, - inner: Arc>, -} +/// A shared token that can be read and written to by multiple threads. +#[derive(Debug, Clone, Default)] +pub struct SharedToken(Arc>>); -/// Inner state of the Authenticator. -/// Contains the token that is being used for authentication. -#[derive(Debug)] -pub struct AuthenticatorInner { - /// The token - pub token: Option, -} +impl SharedToken { + /// Read the token from the shared token. + pub fn read(&self) -> Option { + self.0.lock().unwrap().clone() + } -impl Default for AuthenticatorInner { - fn default() -> Self { - Self::new() + /// Write a new token to the shared token. + pub fn write(&self, token: Token) { + let mut lock = self.0.lock().unwrap(); + *lock = Some(token); } -} -impl AuthenticatorInner { - /// Creates a new AuthenticatorInner with no token set. - pub const fn new() -> Self { - Self { token: None } + /// Check if the token is authenticated. + pub fn is_authenticated(&self) -> bool { + self.0.lock().unwrap().is_some() } } +/// A self-refreshing, periodically fetching authenticator for the block +/// builder. This task periodically fetches a new token, and stores it in a +/// [`SharedToken`]. +#[derive(Debug)] +pub struct Authenticator { + /// Configuration + pub config: BuilderConfig, + client: BasicClient, + token: SharedToken, +} + impl Authenticator { /// Creates a new Authenticator from the provided builder config. - pub fn new(config: &BuilderConfig) -> Self { - Self { config: config.clone(), inner: Arc::new(RwLock::new(AuthenticatorInner::new())) } + pub fn new(config: &BuilderConfig) -> eyre::Result { + let client = BasicClient::new( + ClientId::new(config.oauth_client_id.clone()), + Some(ClientSecret::new(config.oauth_client_secret.clone())), + AuthUrl::new(config.oauth_authenticate_url.clone())?, + Some(TokenUrl::new(config.oauth_token_url.clone())?), + ); + + Ok(Self { config: config.clone(), client, token: Default::default() }) } /// Requests a new authentication token and, if successful, sets it to as the token pub async fn authenticate(&self) -> eyre::Result<()> { let token = self.fetch_oauth_token().await?; - self.set_token(token).await; + self.set_token(token); Ok(()) } /// Returns true if there is Some token set - pub async fn is_authenticated(&self) -> bool { - let lock = self.inner.read().await; - - lock.token.is_some() + pub fn is_authenticated(&self) -> bool { + self.token.is_authenticated() } /// Sets the Authenticator's token to the provided value - pub async fn set_token( - &self, - token: StandardTokenResponse, - ) { - let mut lock = self.inner.write().await; - lock.token = Some(token); + fn set_token(&self, token: StandardTokenResponse) { + self.token.write(token); } /// Returns the currently set token - pub async fn token(&self) -> Option { - let lock = self.inner.read().await; - lock.token.clone() + pub fn token(&self) -> SharedToken { + self.token.clone() } /// Fetches an oauth token - pub async fn fetch_oauth_token( - &self, - ) -> eyre::Result> { - let config = self.config.clone(); - - let client = BasicClient::new( - ClientId::new(config.oauth_client_id.clone()), - Some(ClientSecret::new(config.oauth_client_secret.clone())), - AuthUrl::new(config.oauth_authenticate_url.clone())?, - Some(TokenUrl::new(config.oauth_token_url.clone())?), - ); - + pub async fn fetch_oauth_token(&self) -> eyre::Result { let token_result = - client.exchange_client_credentials().request_async(async_http_client).await?; + self.client.exchange_client_credentials().request_async(async_http_client).await?; Ok(token_result) } @@ -120,23 +109,3 @@ impl Authenticator { handle } } - -mod tests { - #[ignore = "integration test"] - #[tokio::test] - async fn test_authenticator() -> eyre::Result<()> { - use super::*; - use crate::test_utils::setup_test_config; - use oauth2::TokenResponse; - - let config = setup_test_config()?; - let auth = Authenticator::new(&config); - - let _ = auth.fetch_oauth_token().await?; - - let token = auth.token().await.unwrap(); - - assert!(!token.access_token().secret().is_empty()); - Ok(()) - } -} diff --git a/src/tasks/submit.rs b/src/tasks/submit.rs index 925ebb1..3d3c2db 100644 --- a/src/tasks/submit.rs +++ b/src/tasks/submit.rs @@ -1,6 +1,7 @@ use crate::{ config::{HostProvider, ZenithInstance}, signer::LocalOrAws, + tasks::oauth::SharedToken, utils::extract_signature_components, }; use alloy::{ @@ -41,8 +42,6 @@ macro_rules! spawn_provider_send { }; } -use super::oauth::Authenticator; - /// Control flow for transaction submission. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum ControlFlow { @@ -55,7 +54,7 @@ pub enum ControlFlow { } /// Submits sidecars in ethereum txns to mainnet ethereum -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct SubmitTask { /// Ethereum Provider pub host_provider: HostProvider, @@ -68,12 +67,13 @@ pub struct SubmitTask { /// Config pub config: crate::config::BuilderConfig, /// Authenticator - pub authenticator: Authenticator, + pub token: SharedToken, /// Channel over which to send pending transactions pub outbound_tx_channel: mpsc::UnboundedSender, } impl SubmitTask { + #[instrument(skip(self))] async fn sup_quincey(&self, sig_request: &SignRequest) -> eyre::Result { tracing::info!( host_block_number = %sig_request.host_block_number, @@ -81,7 +81,7 @@ impl SubmitTask { "pinging quincey for signature" ); - let token = self.authenticator.fetch_oauth_token().await?; + let Some(token) = self.token.read() else { bail!("no token available") }; let resp: reqwest::Response = self .client diff --git a/tests/authenticator.rs b/tests/authenticator.rs new file mode 100644 index 0000000..6899787 --- /dev/null +++ b/tests/authenticator.rs @@ -0,0 +1,15 @@ +use builder::{tasks::oauth::Authenticator, test_utils::setup_test_config}; + +#[ignore = "integration test"] +#[tokio::test] +async fn test_authenticator() -> eyre::Result<()> { + let config = setup_test_config()?; + let auth = Authenticator::new(&config)?; + + let _ = auth.fetch_oauth_token().await?; + + let token = auth.token(); + + assert!(token.is_authenticated()); + Ok(()) +} diff --git a/tests/block_builder_test.rs b/tests/block_builder_test.rs index f1126f9..8d2e65a 100644 --- a/tests/block_builder_test.rs +++ b/tests/block_builder_test.rs @@ -13,8 +13,8 @@ mod tests { tasks::block::Simulator, test_utils::{new_signed_tx, setup_logging, setup_test_config, test_block_env}, }; + use init4_bin_base::utils::calc::SlotCalculator; use signet_sim::{SimCache, SimItem}; - use signet_types::SlotCalculator; use std::{ sync::Arc, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, @@ -138,6 +138,6 @@ mod tests { // Assert on the block let block = result.unwrap(); assert!(block.is_some(), "Block channel closed without receiving a block"); - assert!(block.unwrap().tx_count() == 2); // TODO: Why is this failing? I'm seeing EVM errors but haven't tracked them down yet. + assert!(block.unwrap().tx_count() == 2); // TODO: Why is this failing? I'm seeing EVM errors but haven't tracked them down yet. } } diff --git a/tests/bundle_poller_test.rs b/tests/bundle_poller_test.rs index 995419d..3876541 100644 --- a/tests/bundle_poller_test.rs +++ b/tests/bundle_poller_test.rs @@ -6,9 +6,9 @@ mod tests { #[tokio::test] async fn test_bundle_poller_roundtrip() -> Result<()> { let config = test_utils::setup_test_config().unwrap(); - let auth = Authenticator::new(&config); + let auth = Authenticator::new(&config)?; - let mut bundle_poller = builder::tasks::bundler::BundlePoller::new(&config, auth); + let mut bundle_poller = builder::tasks::bundler::BundlePoller::new(&config, auth.token()); let _ = bundle_poller.check_bundle_cache().await?;