diff --git a/src/config.rs b/src/config.rs index 69d31b2..30e7788 100644 --- a/src/config.rs +++ b/src/config.rs @@ -138,7 +138,10 @@ pub struct BuilderConfig { )] pub oauth_token_url: String, /// The oauth token refresh interval in seconds. - #[from_env(var = "CONCURRENCY_LIMIT", desc = "The oauth token refresh interval in seconds")] + #[from_env( + var = "AUTH_TOKEN_REFRESH_INTERVAL", + desc = "The oauth token refresh interval in seconds" + )] pub oauth_token_refresh_interval: u64, /// The max number of simultaneous block simulations to run. #[from_env( diff --git a/src/tasks/bundler.rs b/src/tasks/bundler.rs index 192b8ad..4aeffa1 100644 --- a/src/tasks/bundler.rs +++ b/src/tasks/bundler.rs @@ -5,9 +5,11 @@ use oauth2::TokenResponse; use reqwest::{Client, Url}; use serde::{Deserialize, Serialize}; use signet_bundle::SignetEthBundle; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}; -use tokio::task::JoinHandle; -use tokio::time; +use tokio::{ + sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, + task::JoinHandle, + time::{self, Duration}, +}; /// Holds a bundle from the cache with a unique ID and a Zenith bundle. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -62,18 +64,21 @@ impl BundlePoller { return Ok(vec![]); }; - let result = self - .client + self.client .get(bundle_url) .bearer_auth(token.access_token().secret()) .send() .await? - .error_for_status()?; - - let body = result.bytes().await?; - let resp: TxPoolBundleResponse = serde_json::from_slice(&body)?; + .error_for_status()? + .json() + .await + .map(|resp: TxPoolBundleResponse| resp.bundles) + .map_err(Into::into) + } - Ok(resp.bundles) + /// Returns the poll duration as a [`Duration`]. + const fn poll_duration(&self) -> Duration { + Duration::from_millis(self.poll_interval_ms) } async fn task_future(mut self, outbound: UnboundedSender) { @@ -92,22 +97,21 @@ impl BundlePoller { // exit the span after the check. drop(_guard); - match self.check_bundle_cache().instrument(span.clone()).await { - Ok(bundles) => { - debug!(count = ?bundles.len(), "found bundles"); - for bundle in bundles.into_iter() { - if let Err(err) = outbound.send(bundle) { - error!(err = ?err, "Failed to send bundle - channel is dropped"); - } + if let Ok(bundles) = self + .check_bundle_cache() + .instrument(span.clone()) + .await + .inspect_err(|err| debug!(%err, "Error fetching bundles")) + { + debug!(count = ?bundles.len(), "found bundles"); + for bundle in bundles.into_iter() { + if let Err(err) = outbound.send(bundle) { + error!(err = ?err, "Failed to send bundle - channel is dropped"); } } - // If fetching was an error, we log and continue. We expect - // these to be transient network issues. - Err(e) => { - debug!(error = %e, "Error fetching bundles"); - } } - time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await; + + time::sleep(self.poll_duration()).await; } } diff --git a/src/tasks/tx_poller.rs b/src/tasks/tx_poller.rs index a4655b5..e9df5b0 100644 --- a/src/tasks/tx_poller.rs +++ b/src/tasks/tx_poller.rs @@ -5,7 +5,7 @@ use eyre::Error; use init4_bin_base::deps::tracing::{Instrument, debug, debug_span, trace}; use reqwest::{Client, Url}; use serde::{Deserialize, Serialize}; -use serde_json::from_slice; +use std::time::Duration; use tokio::{sync::mpsc, task::JoinHandle, time}; /// Models a response from the transaction pool. @@ -40,12 +40,22 @@ impl TxPoller { Self { config: config.clone(), client: Client::new(), poll_interval_ms } } + /// Returns the poll duration as a [`Duration`]. + const fn poll_duration(&self) -> Duration { + Duration::from_millis(self.poll_interval_ms) + } + /// Polls the transaction cache for transactions. pub async fn check_tx_cache(&mut self) -> Result, Error> { let url: Url = Url::parse(&self.config.tx_pool_url)?.join("transactions")?; - let result = self.client.get(url).send().await?; - let response: TxPoolResponse = from_slice(result.text().await?.as_bytes())?; - Ok(response.transactions) + self.client + .get(url) + .send() + .await? + .json() + .await + .map(|resp: TxPoolResponse| resp.transactions) + .map_err(Into::into) } async fn task_future(mut self, outbound: mpsc::UnboundedSender) { @@ -64,25 +74,23 @@ impl TxPoller { // exit the span after the check. drop(_guard); - match self.check_tx_cache().instrument(span.clone()).await { - Ok(transactions) => { - let _guard = span.entered(); - debug!(count = ?transactions.len(), "found transactions"); - for tx in transactions.into_iter() { - if outbound.send(tx).is_err() { - // If there are no receivers, we can shut down - trace!("No receivers left, shutting down"); - break; - } + if let Ok(transactions) = + self.check_tx_cache().instrument(span.clone()).await.inspect_err(|err| { + debug!(%err, "Error fetching transactions"); + }) + { + let _guard = span.entered(); + debug!(count = ?transactions.len(), "found transactions"); + for tx in transactions.into_iter() { + if outbound.send(tx).is_err() { + // If there are no receivers, we can shut down + trace!("No receivers left, shutting down"); + break; } } - // If fetching was an error, we log and continue. We expect - // these to be transient network issues. - Err(e) => { - debug!(error = %e, "Error fetching transactions"); - } } - time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await; + + time::sleep(self.poll_duration()).await; } }