11use crate :: config:: HostProvider ;
2- use alloy:: { primitives:: TxHash , providers:: Provider as _} ;
2+ use alloy:: {
3+ primitives:: TxHash ,
4+ providers:: { PendingTransactionBuilder , PendingTransactionError , Provider as _, WatchTxError } ,
5+ } ;
36use init4_bin_base:: deps:: {
47 metrics:: { counter, histogram} ,
5- tracing:: { debug, error} ,
8+ tracing:: { Instrument , debug, error, info_span } ,
69} ;
7- use std:: time:: Instant ;
10+ use std:: time:: { Duration , Instant } ;
811use tokio:: { sync:: mpsc, task:: JoinHandle } ;
912
1013/// Collects metrics on transactions sent by the Builder
@@ -15,64 +18,69 @@ pub struct MetricsTask {
1518}
1619
1720impl MetricsTask {
18- /// Given a transaction hash, record metrics on the result of the transaction mining
19- pub async fn log_tx ( & self , pending_tx_hash : TxHash ) {
20- // start timer when tx hash is received
21- let start : Instant = Instant :: now ( ) ;
21+ /// Given a transaction hash, record metrics on the result of the
22+ /// transaction mining
23+ pub fn log_tx ( & self , tx_hash : TxHash ) -> impl Future < Output = ( ) > + use<> {
24+ let provider = self . host_provider . clone ( ) ;
2225
23- // wait for the tx to mine, get its receipt
24- let receipt_result =
25- self . host_provider . clone ( ) . get_transaction_receipt ( pending_tx_hash ) . await ;
26+ async move {
27+ // start timer when tx hash is received
28+ let start : Instant = Instant :: now ( ) ;
2629
27- match receipt_result {
28- Ok ( maybe_receipt) => {
29- match maybe_receipt {
30- Some ( receipt) => {
31- // record how long it took to mine the transaction
32- // potential improvement: use the block timestamp to calculate the time elapsed
33- histogram ! ( "metrics.tx_mine_time" )
34- . record ( start. elapsed ( ) . as_millis ( ) as f64 ) ;
30+ let span = info_span ! ( "metrics_submission" , %tx_hash) ;
3531
36- // log whether the transaction reverted
37- if receipt. status ( ) {
38- counter ! ( "metrics.tx_reverted" ) . increment ( 1 ) ;
39- debug ! ( tx_hash = %pending_tx_hash, "tx reverted" ) ;
40- } else {
41- counter ! ( "metrics.tx_succeeded" ) . increment ( 1 ) ;
42- debug ! ( tx_hash = %pending_tx_hash, "tx succeeded" ) ;
43- }
44- }
45- None => {
46- counter ! ( "metrics.no_receipt" ) . increment ( 1 ) ;
47- error ! ( "no receipt found for tx hash" ) ;
32+ // wait for the tx to mine, get its receipt
33+ let receipt = PendingTransactionBuilder :: new ( provider. root ( ) . clone ( ) , tx_hash)
34+ . with_required_confirmations ( 1 )
35+ . with_timeout ( Some ( Duration :: from_secs ( 60 ) ) )
36+ . get_receipt ( )
37+ . instrument ( span. clone ( ) )
38+ . await ;
39+
40+ // enter the span to log the result
41+ let _guard = span. entered ( ) ;
42+
43+ match receipt {
44+ Ok ( receipt) => {
45+ // record how long it took to mine the transaction
46+ // potential improvement: use the block timestamp to calculate the time elapsed
47+ histogram ! ( "metrics.tx_mine_time" ) . record ( start. elapsed ( ) . as_millis ( ) as f64 ) ;
48+
49+ // log whether the transaction reverted
50+ if receipt. status ( ) {
51+ counter ! ( "metrics.tx_reverted" ) . increment ( 1 ) ;
52+ debug ! ( "tx reverted" ) ;
53+ } else {
54+ counter ! ( "metrics.tx_succeeded" ) . increment ( 1 ) ;
55+ debug ! ( "tx succeeded" ) ;
4856 }
4957 }
50- }
51- Err ( e) => {
52- counter ! ( "metrics.rpc_error" ) . increment ( 1 ) ;
53- error ! ( error = ?e, "rpc error" ) ;
58+ Err ( PendingTransactionError :: TxWatcher ( WatchTxError :: Timeout ) ) => {
59+ // log that the transaction timed out
60+ counter ! ( "metrics.tx_not_mined" ) . increment ( 1 ) ;
61+ debug ! ( "tx not mined" ) ;
62+ }
63+ Err ( e) => {
64+ counter ! ( "metrics.rpc_error" ) . increment ( 1 ) ;
65+ error ! ( error = ?e, "rpc error" ) ;
66+ }
5467 }
5568 }
5669 }
5770
5871 /// Spawns the task which collects metrics on pending transactions
5972 pub fn spawn ( self ) -> ( mpsc:: UnboundedSender < TxHash > , JoinHandle < ( ) > ) {
6073 let ( sender, mut inbound) = mpsc:: unbounded_channel ( ) ;
74+
6175 let handle = tokio:: spawn ( async move {
6276 debug ! ( "metrics task spawned" ) ;
6377 loop {
64- if let Some ( pending_tx_hash) = inbound. recv ( ) . await {
65- let this = self . clone ( ) ;
66- tokio:: spawn ( async move {
67- debug ! ( "received tx hash" ) ;
68- let that = this. clone ( ) ;
69- that. log_tx ( pending_tx_hash) . await ;
70- debug ! ( "logged tx metrics" ) ;
71- } ) ;
72- } else {
78+ let Some ( tx_hash) = inbound. recv ( ) . await else {
7379 debug ! ( "upstream task gone" ) ;
7480 break ;
75- }
81+ } ;
82+ let fut = self . log_tx ( tx_hash) ;
83+ tokio:: spawn ( fut) ;
7684 }
7785 } ) ;
7886
0 commit comments