Skip to content

Commit aa49dc4

Browse files
committed
fix: metrics previously did not wait for confirmation
1 parent fb9f608 commit aa49dc4

File tree

1 file changed

+52
-44
lines changed

1 file changed

+52
-44
lines changed

src/tasks/metrics.rs

+52-44
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
use crate::config::HostProvider;
2-
use alloy::{primitives::TxHash, providers::Provider as _};
2+
use alloy::{
3+
primitives::TxHash,
4+
providers::{PendingTransactionBuilder, PendingTransactionError, Provider as _, WatchTxError},
5+
};
36
use init4_bin_base::deps::{
47
metrics::{counter, histogram},
5-
tracing::{debug, error},
8+
tracing::{Instrument, debug, error, info_span},
69
};
7-
use std::time::Instant;
10+
use std::time::{Duration, Instant};
811
use tokio::{sync::mpsc, task::JoinHandle};
912

1013
/// Collects metrics on transactions sent by the Builder
@@ -15,64 +18,69 @@ pub struct MetricsTask {
1518
}
1619

1720
impl MetricsTask {
18-
/// Given a transaction hash, record metrics on the result of the transaction mining
19-
pub async fn log_tx(&self, pending_tx_hash: TxHash) {
20-
// start timer when tx hash is received
21-
let start: Instant = Instant::now();
21+
/// Given a transaction hash, record metrics on the result of the
22+
/// transaction mining
23+
pub fn log_tx(&self, tx_hash: TxHash) -> impl Future<Output = ()> + use<> {
24+
let provider = self.host_provider.clone();
2225

23-
// wait for the tx to mine, get its receipt
24-
let receipt_result =
25-
self.host_provider.clone().get_transaction_receipt(pending_tx_hash).await;
26+
async move {
27+
// start timer when tx hash is received
28+
let start: Instant = Instant::now();
2629

27-
match receipt_result {
28-
Ok(maybe_receipt) => {
29-
match maybe_receipt {
30-
Some(receipt) => {
31-
// record how long it took to mine the transaction
32-
// potential improvement: use the block timestamp to calculate the time elapsed
33-
histogram!("metrics.tx_mine_time")
34-
.record(start.elapsed().as_millis() as f64);
30+
let span = info_span!("metrics_submission", %tx_hash);
3531

36-
// log whether the transaction reverted
37-
if receipt.status() {
38-
counter!("metrics.tx_reverted").increment(1);
39-
debug!(tx_hash = %pending_tx_hash, "tx reverted");
40-
} else {
41-
counter!("metrics.tx_succeeded").increment(1);
42-
debug!(tx_hash = %pending_tx_hash, "tx succeeded");
43-
}
44-
}
45-
None => {
46-
counter!("metrics.no_receipt").increment(1);
47-
error!("no receipt found for tx hash");
32+
// wait for the tx to mine, get its receipt
33+
let receipt = PendingTransactionBuilder::new(provider.root().clone(), tx_hash)
34+
.with_required_confirmations(1)
35+
.with_timeout(Some(Duration::from_secs(60)))
36+
.get_receipt()
37+
.instrument(span.clone())
38+
.await;
39+
40+
// enter the span to log the result
41+
let _guard = span.entered();
42+
43+
match receipt {
44+
Ok(receipt) => {
45+
// record how long it took to mine the transaction
46+
// potential improvement: use the block timestamp to calculate the time elapsed
47+
histogram!("metrics.tx_mine_time").record(start.elapsed().as_millis() as f64);
48+
49+
// log whether the transaction reverted
50+
if receipt.status() {
51+
counter!("metrics.tx_reverted").increment(1);
52+
debug!("tx reverted");
53+
} else {
54+
counter!("metrics.tx_succeeded").increment(1);
55+
debug!("tx succeeded");
4856
}
4957
}
50-
}
51-
Err(e) => {
52-
counter!("metrics.rpc_error").increment(1);
53-
error!(error = ?e, "rpc error");
58+
Err(PendingTransactionError::TxWatcher(WatchTxError::Timeout)) => {
59+
// log that the transaction timed out
60+
counter!("metrics.tx_not_mined").increment(1);
61+
debug!("tx not mined");
62+
}
63+
Err(e) => {
64+
counter!("metrics.rpc_error").increment(1);
65+
error!(error = ?e, "rpc error");
66+
}
5467
}
5568
}
5669
}
5770

5871
/// Spawns the task which collects metrics on pending transactions
5972
pub fn spawn(self) -> (mpsc::UnboundedSender<TxHash>, JoinHandle<()>) {
6073
let (sender, mut inbound) = mpsc::unbounded_channel();
74+
6175
let handle = tokio::spawn(async move {
6276
debug!("metrics task spawned");
6377
loop {
64-
if let Some(pending_tx_hash) = inbound.recv().await {
65-
let this = self.clone();
66-
tokio::spawn(async move {
67-
debug!("received tx hash");
68-
let that = this.clone();
69-
that.log_tx(pending_tx_hash).await;
70-
debug!("logged tx metrics");
71-
});
72-
} else {
78+
let Some(tx_hash) = inbound.recv().await else {
7379
debug!("upstream task gone");
7480
break;
75-
}
81+
};
82+
let fut = self.log_tx(tx_hash);
83+
tokio::spawn(fut);
7684
}
7785
});
7886

0 commit comments

Comments
 (0)