Skip to content

Commit caf680e

Browse files
committed
refactor: trim some dead code, simplify auth
1 parent 31d62d6 commit caf680e

File tree

5 files changed

+91
-141
lines changed

5 files changed

+91
-141
lines changed

bin/builder.rs

+11-8
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use builder::{
22
config::BuilderConfig,
3-
service::serve_builder_with_span,
3+
service::serve_builder,
44
tasks::{
55
block::Simulator, bundler, metrics::MetricsTask, oauth::Authenticator, submit::SubmitTask,
66
tx_poller,
77
},
88
};
9+
use init4_bin_base::deps::tracing;
910
use signet_sim::SimCache;
1011
use signet_types::SlotCalculator;
1112
use std::sync::Arc;
@@ -14,12 +15,11 @@ use tokio::select;
1415
#[tokio::main]
1516
async fn main() -> eyre::Result<()> {
1617
let _guard = init4_bin_base::init4();
17-
18-
let span = tracing::info_span!("zenith-builder");
18+
let init_span_guard = tracing::info_span!("builder initialization");
1919

2020
let config = BuilderConfig::load_from_env()?.clone();
2121
let constants = config.load_pecorino_constants();
22-
let authenticator = Authenticator::new(&config);
22+
let authenticator = Authenticator::new(&config)?;
2323

2424
let (host_provider, ru_provider, sequencer_signer) = tokio::try_join!(
2525
config.connect_host_provider(),
@@ -33,7 +33,7 @@ async fn main() -> eyre::Result<()> {
3333
let (tx_channel, metrics_jh) = metrics.spawn();
3434

3535
let submit = SubmitTask {
36-
authenticator: authenticator.clone(),
36+
token: authenticator.token(),
3737
host_provider,
3838
zenith,
3939
client: reqwest::Client::new(),
@@ -45,7 +45,7 @@ async fn main() -> eyre::Result<()> {
4545
let tx_poller = tx_poller::TxPoller::new(&config);
4646
let (tx_receiver, tx_poller_jh) = tx_poller.spawn();
4747

48-
let bundle_poller = bundler::BundlePoller::new(&config, authenticator.clone());
48+
let bundle_poller = bundler::BundlePoller::new(&config, authenticator.token());
4949
let (bundle_receiver, bundle_poller_jh) = bundle_poller.spawn();
5050

5151
let authenticator_jh = authenticator.spawn();
@@ -63,8 +63,11 @@ async fn main() -> eyre::Result<()> {
6363

6464
let build_jh = sim.clone().spawn_simulator_task(constants, sim_items.clone(), submit_channel);
6565

66-
let port = config.builder_port;
67-
let server = serve_builder_with_span(([0, 0, 0, 0], port), span);
66+
let server = serve_builder(([0, 0, 0, 0], config.builder_port));
67+
68+
// We have finished initializing the builder, so we can drop the init span
69+
// guard.
70+
drop(init_span_guard);
6871

6972
select! {
7073
_ = tx_poller_jh => {

src/service.rs

+13-57
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,10 @@
1-
use std::{fmt::Debug, net::SocketAddr};
2-
31
use axum::{
42
Router,
53
http::StatusCode,
64
response::{IntoResponse, Response},
75
routing::get,
86
};
9-
use tracing::{Instrument, Span};
10-
11-
/// App result
12-
pub type AppResult<T, E = AppError> = Result<T, E>;
13-
14-
/// App error. This is a wrapper around eyre::Report that also includes an HTTP
15-
/// status code. It implements [`IntoResponse`] so that it can be returned as an
16-
/// error type from [`axum::handler::Handler`]s.
17-
#[derive(Debug)]
18-
pub struct AppError {
19-
code: StatusCode,
20-
eyre: eyre::Report,
21-
}
22-
23-
impl AppError {
24-
/// Instantiate a new error with the bad request status code.
25-
pub fn bad_req<E: std::error::Error + Send + Sync + 'static>(e: E) -> Self {
26-
Self { code: StatusCode::BAD_REQUEST, eyre: e.into() }
27-
}
28-
29-
/// Instantiate a new error with the bad request status code and an error
30-
/// string.
31-
pub fn bad_req_str(e: &str) -> Self {
32-
Self { code: StatusCode::BAD_REQUEST, eyre: eyre::eyre!(e.to_owned()) }
33-
}
34-
35-
/// Instantiate a new error with the internal server error status code.
36-
pub fn server_err<E: std::error::Error + Send + Sync + 'static>(e: E) -> Self {
37-
Self { code: StatusCode::INTERNAL_SERVER_ERROR, eyre: e.into() }
38-
}
39-
}
40-
41-
impl IntoResponse for AppError {
42-
fn into_response(self) -> Response {
43-
(self.code, format!("{}", self.eyre)).into_response()
44-
}
45-
}
7+
use std::net::SocketAddr;
468

479
/// Return a 404 Not Found response
4810
pub async fn return_404() -> Response {
@@ -55,26 +17,20 @@ pub async fn return_200() -> Response {
5517
}
5618

5719
/// Serve a builder service on the given socket address.
58-
pub fn serve_builder_with_span(
59-
socket: impl Into<SocketAddr>,
60-
span: Span,
61-
) -> tokio::task::JoinHandle<()> {
20+
pub fn serve_builder(socket: impl Into<SocketAddr>) -> tokio::task::JoinHandle<()> {
6221
let router = Router::new().route("/healthcheck", get(return_200)).fallback(return_404);
6322

6423
let addr = socket.into();
65-
tokio::spawn(
66-
async move {
67-
match tokio::net::TcpListener::bind(&addr).await {
68-
Ok(listener) => {
69-
if let Err(err) = axum::serve(listener, router).await {
70-
tracing::error!(%err, "serve failed");
71-
}
72-
}
73-
Err(err) => {
74-
tracing::error!(%err, "failed to bind to the address");
24+
tokio::spawn(async move {
25+
match tokio::net::TcpListener::bind(&addr).await {
26+
Ok(listener) => {
27+
if let Err(err) = axum::serve(listener, router).await {
28+
tracing::error!(%err, "serve failed");
7529
}
76-
};
77-
}
78-
.instrument(span),
79-
)
30+
}
31+
Err(err) => {
32+
tracing::error!(%err, "failed to bind to the address");
33+
}
34+
};
35+
})
8036
}

src/tasks/bundler.rs

+14-14
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
//! Bundler service responsible for fetching bundles and sending them to the simulator.
2-
pub use crate::config::BuilderConfig;
3-
use crate::tasks::oauth::Authenticator;
2+
use crate::tasks::oauth::SharedToken;
43
use oauth2::TokenResponse;
54
use reqwest::{Client, Url};
65
use serde::{Deserialize, Serialize};
@@ -9,6 +8,9 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
98
use tokio::task::JoinHandle;
109
use tokio::time;
1110
use tracing::{Instrument, debug, trace};
11+
12+
pub use crate::config::BuilderConfig;
13+
1214
/// Holds a bundle from the cache with a unique ID and a Zenith bundle.
1315
#[derive(Debug, Clone, Serialize, Deserialize)]
1416
pub struct Bundle {
@@ -26,12 +28,12 @@ pub struct TxPoolBundleResponse {
2628
}
2729

2830
/// The BundlePoller polls the tx-pool for bundles.
29-
#[derive(Debug, Clone)]
31+
#[derive(Debug)]
3032
pub struct BundlePoller {
3133
/// The builder configuration values.
3234
pub config: BuilderConfig,
3335
/// Authentication module that periodically fetches and stores auth tokens.
34-
pub authenticator: Authenticator,
36+
pub token: SharedToken,
3537
/// Holds a Reqwest client
3638
pub client: Client,
3739
/// Defines the interval at which the bundler polls the tx-pool for bundles.
@@ -41,28 +43,26 @@ pub struct BundlePoller {
4143
/// Implements a poller for the block builder to pull bundles from the tx-pool.
4244
impl BundlePoller {
4345
/// Creates a new BundlePoller from the provided builder config.
44-
pub fn new(config: &BuilderConfig, authenticator: Authenticator) -> Self {
45-
Self {
46-
config: config.clone(),
47-
authenticator,
48-
client: Client::new(),
49-
poll_interval_ms: 1000,
50-
}
46+
pub fn new(config: &BuilderConfig, token: SharedToken) -> Self {
47+
Self { config: config.clone(), token, client: Client::new(), poll_interval_ms: 1000 }
5148
}
5249

5350
/// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms.
5451
pub fn new_with_poll_interval_ms(
5552
config: &BuilderConfig,
56-
authenticator: Authenticator,
53+
token: SharedToken,
5754
poll_interval_ms: u64,
5855
) -> Self {
59-
Self { config: config.clone(), authenticator, client: Client::new(), poll_interval_ms }
56+
Self { config: config.clone(), token, client: Client::new(), poll_interval_ms }
6057
}
6158

6259
/// Fetches bundles from the transaction cache and returns them.
6360
pub async fn check_bundle_cache(&mut self) -> eyre::Result<Vec<Bundle>> {
6461
let bundle_url: Url = Url::parse(&self.config.tx_pool_url)?.join("bundles")?;
65-
let token = self.authenticator.fetch_oauth_token().await?;
62+
let Some(token) = self.token.read() else {
63+
tracing::warn!("No token available, skipping bundle fetch");
64+
return Ok(vec![]);
65+
};
6666

6767
let result = self
6868
.client

src/tasks/oauth.rs

+46-57
Original file line numberDiff line numberDiff line change
@@ -1,99 +1,88 @@
11
//! Service responsible for authenticating with the cache with Oauth tokens.
22
//! This authenticator periodically fetches a new token every set amount of seconds.
3-
use std::sync::Arc;
4-
53
use crate::config::BuilderConfig;
64
use oauth2::{
75
AuthUrl, ClientId, ClientSecret, EmptyExtraTokenFields, StandardTokenResponse, TokenUrl,
86
basic::{BasicClient, BasicTokenType},
97
reqwest::async_http_client,
108
};
11-
use tokio::{sync::RwLock, task::JoinHandle};
9+
use std::sync::{Arc, Mutex};
10+
use tokio::task::JoinHandle;
1211

1312
type Token = StandardTokenResponse<EmptyExtraTokenFields, BasicTokenType>;
1413

15-
/// A self-refreshing, periodically fetching authenticator for the block builder.
16-
/// It is architected as a shareable struct that can be used across all the multiple builder tasks.
17-
/// It fetches a new token every set amount of seconds, configured through the general builder config.
18-
/// Readers are guaranteed to not read stale tokens as the [RwLock] guarantees that write tasks (refreshing the token) will claim priority over read access.
19-
#[derive(Debug, Clone)]
20-
pub struct Authenticator {
21-
/// Configuration
22-
pub config: BuilderConfig,
23-
inner: Arc<RwLock<AuthenticatorInner>>,
24-
}
14+
/// A shared token that can be read and written to by multiple threads.
15+
#[derive(Debug, Clone, Default)]
16+
pub struct SharedToken(Arc<Mutex<Option<Token>>>);
2517

26-
/// Inner state of the Authenticator.
27-
/// Contains the token that is being used for authentication.
28-
#[derive(Debug)]
29-
pub struct AuthenticatorInner {
30-
/// The token
31-
pub token: Option<Token>,
32-
}
18+
impl SharedToken {
19+
/// Read the token from the shared token.
20+
pub fn read(&self) -> Option<Token> {
21+
self.0.lock().unwrap().clone()
22+
}
3323

34-
impl Default for AuthenticatorInner {
35-
fn default() -> Self {
36-
Self::new()
24+
/// Write a new token to the shared token.
25+
pub fn write(&self, token: Token) {
26+
let mut lock = self.0.lock().unwrap();
27+
*lock = Some(token);
3728
}
38-
}
3929

40-
impl AuthenticatorInner {
41-
/// Creates a new AuthenticatorInner with no token set.
42-
pub const fn new() -> Self {
43-
Self { token: None }
30+
/// Check if the token is authenticated.
31+
pub fn is_authenticated(&self) -> bool {
32+
self.0.lock().unwrap().is_some()
4433
}
4534
}
4635

36+
/// A self-refreshing, periodically fetching authenticator for the block
37+
/// builder. This task periodically fetches a new token, and stores it in a
38+
/// [`SharedToken`].
39+
#[derive(Debug)]
40+
pub struct Authenticator {
41+
/// Configuration
42+
pub config: BuilderConfig,
43+
client: BasicClient,
44+
token: SharedToken,
45+
}
46+
4747
impl Authenticator {
4848
/// Creates a new Authenticator from the provided builder config.
49-
pub fn new(config: &BuilderConfig) -> Self {
50-
Self { config: config.clone(), inner: Arc::new(RwLock::new(AuthenticatorInner::new())) }
49+
pub fn new(config: &BuilderConfig) -> eyre::Result<Self> {
50+
let client = BasicClient::new(
51+
ClientId::new(config.oauth_client_id.clone()),
52+
Some(ClientSecret::new(config.oauth_client_secret.clone())),
53+
AuthUrl::new(config.oauth_authenticate_url.clone())?,
54+
Some(TokenUrl::new(config.oauth_token_url.clone())?),
55+
);
56+
57+
Ok(Self { config: config.clone(), client, token: Default::default() })
5158
}
5259

5360
/// Requests a new authentication token and, if successful, sets it to as the token
5461
pub async fn authenticate(&self) -> eyre::Result<()> {
5562
let token = self.fetch_oauth_token().await?;
56-
self.set_token(token).await;
63+
self.set_token(token);
5764
Ok(())
5865
}
5966

6067
/// Returns true if there is Some token set
61-
pub async fn is_authenticated(&self) -> bool {
62-
let lock = self.inner.read().await;
63-
64-
lock.token.is_some()
68+
pub fn is_authenticated(&self) -> bool {
69+
self.token.is_authenticated()
6570
}
6671

6772
/// Sets the Authenticator's token to the provided value
68-
pub async fn set_token(
69-
&self,
70-
token: StandardTokenResponse<EmptyExtraTokenFields, BasicTokenType>,
71-
) {
72-
let mut lock = self.inner.write().await;
73-
lock.token = Some(token);
73+
fn set_token(&self, token: StandardTokenResponse<EmptyExtraTokenFields, BasicTokenType>) {
74+
self.token.write(token);
7475
}
7576

7677
/// Returns the currently set token
77-
pub async fn token(&self) -> Option<Token> {
78-
let lock = self.inner.read().await;
79-
lock.token.clone()
78+
pub fn token(&self) -> SharedToken {
79+
self.token.clone()
8080
}
8181

8282
/// Fetches an oauth token
83-
pub async fn fetch_oauth_token(
84-
&self,
85-
) -> eyre::Result<StandardTokenResponse<EmptyExtraTokenFields, BasicTokenType>> {
86-
let config = self.config.clone();
87-
88-
let client = BasicClient::new(
89-
ClientId::new(config.oauth_client_id.clone()),
90-
Some(ClientSecret::new(config.oauth_client_secret.clone())),
91-
AuthUrl::new(config.oauth_authenticate_url.clone())?,
92-
Some(TokenUrl::new(config.oauth_token_url.clone())?),
93-
);
94-
83+
async fn fetch_oauth_token(&self) -> eyre::Result<Token> {
9584
let token_result =
96-
client.exchange_client_credentials().request_async(async_http_client).await?;
85+
self.client.exchange_client_credentials().request_async(async_http_client).await?;
9786

9887
Ok(token_result)
9988
}

0 commit comments

Comments
 (0)