Skip to content

Commit 51e428c

Browse files
committed
refactor: cleaning up poller logic
1 parent db4fdbd commit 51e428c

File tree

2 files changed

+55
-43
lines changed

2 files changed

+55
-43
lines changed

src/tasks/bundler.rs

+27-23
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@ use oauth2::TokenResponse;
55
use reqwest::{Client, Url};
66
use serde::{Deserialize, Serialize};
77
use signet_bundle::SignetEthBundle;
8-
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
9-
use tokio::task::JoinHandle;
10-
use tokio::time;
8+
use tokio::{
9+
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
10+
task::JoinHandle,
11+
time::{self, Duration},
12+
};
1113

1214
/// Holds a bundle from the cache with a unique ID and a Zenith bundle.
1315
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -62,18 +64,21 @@ impl BundlePoller {
6264
return Ok(vec![]);
6365
};
6466

65-
let result = self
66-
.client
67+
self.client
6768
.get(bundle_url)
6869
.bearer_auth(token.access_token().secret())
6970
.send()
7071
.await?
71-
.error_for_status()?;
72-
73-
let body = result.bytes().await?;
74-
let resp: TxPoolBundleResponse = serde_json::from_slice(&body)?;
72+
.error_for_status()?
73+
.json()
74+
.await
75+
.map(|resp: TxPoolBundleResponse| resp.bundles)
76+
.map_err(Into::into)
77+
}
7578

76-
Ok(resp.bundles)
79+
/// Returns the poll duration as a [`Duration`].
80+
const fn poll_duration(&self) -> Duration {
81+
Duration::from_millis(self.poll_interval_ms)
7782
}
7883

7984
async fn task_future(mut self, outbound: UnboundedSender<Bundle>) {
@@ -92,22 +97,21 @@ impl BundlePoller {
9297
// exit the span after the check.
9398
drop(_guard);
9499

95-
match self.check_bundle_cache().instrument(span.clone()).await {
96-
Ok(bundles) => {
97-
debug!(count = ?bundles.len(), "found bundles");
98-
for bundle in bundles.into_iter() {
99-
if let Err(err) = outbound.send(bundle) {
100-
error!(err = ?err, "Failed to send bundle - channel is dropped");
101-
}
100+
if let Ok(bundles) = self
101+
.check_bundle_cache()
102+
.instrument(span.clone())
103+
.await
104+
.inspect_err(|err| debug!(%err, "Error fetching bundles"))
105+
{
106+
debug!(count = ?bundles.len(), "found bundles");
107+
for bundle in bundles.into_iter() {
108+
if let Err(err) = outbound.send(bundle) {
109+
error!(err = ?err, "Failed to send bundle - channel is dropped");
102110
}
103111
}
104-
// If fetching was an error, we log and continue. We expect
105-
// these to be transient network issues.
106-
Err(e) => {
107-
debug!(error = %e, "Error fetching bundles");
108-
}
109112
}
110-
time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await;
113+
114+
time::sleep(self.poll_duration()).await;
111115
}
112116
}
113117

src/tasks/tx_poller.rs

+28-20
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use eyre::Error;
55
use init4_bin_base::deps::tracing::{Instrument, debug, debug_span, trace};
66
use reqwest::{Client, Url};
77
use serde::{Deserialize, Serialize};
8-
use serde_json::from_slice;
8+
use std::time::Duration;
99
use tokio::{sync::mpsc, task::JoinHandle, time};
1010

1111
/// Models a response from the transaction pool.
@@ -40,12 +40,22 @@ impl TxPoller {
4040
Self { config: config.clone(), client: Client::new(), poll_interval_ms }
4141
}
4242

43+
/// Returns the poll duration as a [`Duration`].
44+
const fn poll_duration(&self) -> Duration {
45+
Duration::from_millis(self.poll_interval_ms)
46+
}
47+
4348
/// Polls the transaction cache for transactions.
4449
pub async fn check_tx_cache(&mut self) -> Result<Vec<TxEnvelope>, Error> {
4550
let url: Url = Url::parse(&self.config.tx_pool_url)?.join("transactions")?;
46-
let result = self.client.get(url).send().await?;
47-
let response: TxPoolResponse = from_slice(result.text().await?.as_bytes())?;
48-
Ok(response.transactions)
51+
self.client
52+
.get(url)
53+
.send()
54+
.await?
55+
.json()
56+
.await
57+
.map(|resp: TxPoolResponse| resp.transactions)
58+
.map_err(Into::into)
4959
}
5060

5161
async fn task_future(mut self, outbound: mpsc::UnboundedSender<TxEnvelope>) {
@@ -64,25 +74,23 @@ impl TxPoller {
6474
// exit the span after the check.
6575
drop(_guard);
6676

67-
match self.check_tx_cache().instrument(span.clone()).await {
68-
Ok(transactions) => {
69-
let _guard = span.entered();
70-
debug!(count = ?transactions.len(), "found transactions");
71-
for tx in transactions.into_iter() {
72-
if outbound.send(tx).is_err() {
73-
// If there are no receivers, we can shut down
74-
trace!("No receivers left, shutting down");
75-
break;
76-
}
77+
if let Ok(transactions) =
78+
self.check_tx_cache().instrument(span.clone()).await.inspect_err(|err| {
79+
debug!(%err, "Error fetching transactions");
80+
})
81+
{
82+
let _guard = span.entered();
83+
debug!(count = ?transactions.len(), "found transactions");
84+
for tx in transactions.into_iter() {
85+
if outbound.send(tx).is_err() {
86+
// If there are no receivers, we can shut down
87+
trace!("No receivers left, shutting down");
88+
break;
7789
}
7890
}
79-
// If fetching was an error, we log and continue. We expect
80-
// these to be transient network issues.
81-
Err(e) => {
82-
debug!(error = %e, "Error fetching transactions");
83-
}
8491
}
85-
time::sleep(time::Duration::from_millis(self.poll_interval_ms)).await;
92+
93+
time::sleep(self.poll_duration()).await;
8694
}
8795
}
8896

0 commit comments

Comments
 (0)