Skip to content

refactor: cleaning up poller logic #80

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ pub struct BuilderConfig {
)]
pub oauth_token_url: String,
/// The oauth token refresh interval in seconds.
#[from_env(var = "CONCURRENCY_LIMIT", desc = "The oauth token refresh interval in seconds")]
#[from_env(
var = "AUTH_TOKEN_REFRESH_INTERVAL",
desc = "The oauth token refresh interval in seconds"
)]
pub oauth_token_refresh_interval: u64,
/// The max number of simultaneous block simulations to run.
#[from_env(
Expand Down
50 changes: 27 additions & 23 deletions src/tasks/bundler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use oauth2::TokenResponse;
use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
use signet_bundle::SignetEthBundle;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::task::JoinHandle;
use tokio::time;
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
task::JoinHandle,
time::{self, Duration},
};

/// Holds a bundle from the cache with a unique ID and a Zenith bundle.
#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -62,18 +64,21 @@ impl BundlePoller {
return Ok(vec![]);
};

let result = self
.client
self.client
.get(bundle_url)
.bearer_auth(token.access_token().secret())
.send()
.await?
.error_for_status()?;

let body = result.bytes().await?;
let resp: TxPoolBundleResponse = serde_json::from_slice(&body)?;
.error_for_status()?
.json()
.await
.map(|resp: TxPoolBundleResponse| resp.bundles)
.map_err(Into::into)
}

Ok(resp.bundles)
/// Converts the configured `poll_interval_ms` field into a [`Duration`].
const fn poll_duration(&self) -> Duration {
    let millis = self.poll_interval_ms;
    Duration::from_millis(millis)
}

async fn task_future(mut self, outbound: UnboundedSender<Bundle>) {
Expand All @@ -92,22 +97,21 @@ impl BundlePoller {
// exit the span after the check.
drop(_guard);

match self.check_bundle_cache().instrument(span.clone()).await {
Ok(bundles) => {
debug!(count = ?bundles.len(), "found bundles");
for bundle in bundles.into_iter() {
if let Err(err) = outbound.send(bundle) {
error!(err = ?err, "Failed to send bundle - channel is dropped");
}
if let Ok(bundles) = self
.check_bundle_cache()
.instrument(span.clone())
.await
.inspect_err(|err| debug!(%err, "Error fetching bundles"))
{
debug!(count = ?bundles.len(), "found bundles");
for bundle in bundles.into_iter() {
if let Err(err) = outbound.send(bundle) {
error!(err = ?err, "Failed to send bundle - channel is dropped");
}
}
// If fetching was an error, we log and continue. We expect
// these to be transient network issues.
Err(e) => {
debug!(error = %e, "Error fetching bundles");
}
}
time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await;

time::sleep(self.poll_duration()).await;
}
}

Expand Down
48 changes: 28 additions & 20 deletions src/tasks/tx_poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use eyre::Error;
use init4_bin_base::deps::tracing::{Instrument, debug, debug_span, trace};
use reqwest::{Client, Url};
use serde::{Deserialize, Serialize};
use serde_json::from_slice;
use std::time::Duration;
use tokio::{sync::mpsc, task::JoinHandle, time};

/// Models a response from the transaction pool.
Expand Down Expand Up @@ -40,12 +40,22 @@ impl TxPoller {
Self { config: config.clone(), client: Client::new(), poll_interval_ms }
}

/// Expresses the poller's interval (`poll_interval_ms`) as a [`Duration`].
const fn poll_duration(&self) -> Duration {
    let interval_ms = self.poll_interval_ms;
    Duration::from_millis(interval_ms)
}

/// Polls the transaction cache for transactions.
///
/// Issues a GET to `{tx_pool_url}/transactions` and deserializes the JSON
/// body as a [`TxPoolResponse`], returning the contained transactions.
///
/// # Errors
///
/// Returns an error if the configured URL fails to parse, the request
/// fails to send, or the response body is not valid JSON for
/// [`TxPoolResponse`].
pub async fn check_tx_cache(&mut self) -> Result<Vec<TxEnvelope>, Error> {
    let url: Url = Url::parse(&self.config.tx_pool_url)?.join("transactions")?;
    self.client
        .get(url)
        .send()
        .await?
        // Deserialize the response body directly, avoiding an
        // intermediate buffered-bytes step.
        .json()
        .await
        .map(|resp: TxPoolResponse| resp.transactions)
        .map_err(Into::into)
}

async fn task_future(mut self, outbound: mpsc::UnboundedSender<TxEnvelope>) {
Expand All @@ -64,25 +74,23 @@ impl TxPoller {
// exit the span after the check.
drop(_guard);

match self.check_tx_cache().instrument(span.clone()).await {
Ok(transactions) => {
let _guard = span.entered();
debug!(count = ?transactions.len(), "found transactions");
for tx in transactions.into_iter() {
if outbound.send(tx).is_err() {
// If there are no receivers, we can shut down
trace!("No receivers left, shutting down");
break;
}
if let Ok(transactions) =
self.check_tx_cache().instrument(span.clone()).await.inspect_err(|err| {
debug!(%err, "Error fetching transactions");
})
{
let _guard = span.entered();
debug!(count = ?transactions.len(), "found transactions");
for tx in transactions.into_iter() {
if outbound.send(tx).is_err() {
// If there are no receivers, we can shut down
trace!("No receivers left, shutting down");
break;
}
}
// If fetching was an error, we log and continue. We expect
// these to be transient network issues.
Err(e) => {
debug!(error = %e, "Error fetching transactions");
}
}
time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await;

time::sleep(self.poll_duration()).await;
}
}

Expand Down
Loading