1
1
use crate :: config:: HostProvider ;
2
- use alloy:: { primitives:: TxHash , providers:: Provider as _} ;
2
+ use alloy:: {
3
+ primitives:: TxHash ,
4
+ providers:: { PendingTransactionBuilder , PendingTransactionError , Provider as _, WatchTxError } ,
5
+ } ;
3
6
use init4_bin_base:: deps:: {
4
7
metrics:: { counter, histogram} ,
5
- tracing:: { debug, error} ,
8
+ tracing:: { Instrument , debug, error, info_span } ,
6
9
} ;
7
- use std:: time:: Instant ;
10
+ use std:: time:: { Duration , Instant } ;
8
11
use tokio:: { sync:: mpsc, task:: JoinHandle } ;
9
12
10
13
/// Collects metrics on transactions sent by the Builder
@@ -15,64 +18,69 @@ pub struct MetricsTask {
15
18
}
16
19
17
20
impl MetricsTask {
18
- /// Given a transaction hash, record metrics on the result of the transaction mining
19
- pub async fn log_tx ( & self , pending_tx_hash : TxHash ) {
20
- // start timer when tx hash is received
21
- let start : Instant = Instant :: now ( ) ;
21
+ /// Given a transaction hash, record metrics on the result of the
22
+ /// transaction mining
23
+ pub fn log_tx ( & self , tx_hash : TxHash ) -> impl Future < Output = ( ) > + use<> {
24
+ let provider = self . host_provider . clone ( ) ;
22
25
23
- // wait for the tx to mine, get its receipt
24
- let receipt_result =
25
- self . host_provider . clone ( ) . get_transaction_receipt ( pending_tx_hash ) . await ;
26
+ async move {
27
+ // start timer when tx hash is received
28
+ let start : Instant = Instant :: now ( ) ;
26
29
27
- match receipt_result {
28
- Ok ( maybe_receipt) => {
29
- match maybe_receipt {
30
- Some ( receipt) => {
31
- // record how long it took to mine the transaction
32
- // potential improvement: use the block timestamp to calculate the time elapsed
33
- histogram ! ( "metrics.tx_mine_time" )
34
- . record ( start. elapsed ( ) . as_millis ( ) as f64 ) ;
30
+ let span = info_span ! ( "metrics_submission" , %tx_hash) ;
35
31
36
- // log whether the transaction reverted
37
- if receipt. status ( ) {
38
- counter ! ( "metrics.tx_reverted" ) . increment ( 1 ) ;
39
- debug ! ( tx_hash = %pending_tx_hash, "tx reverted" ) ;
40
- } else {
41
- counter ! ( "metrics.tx_succeeded" ) . increment ( 1 ) ;
42
- debug ! ( tx_hash = %pending_tx_hash, "tx succeeded" ) ;
43
- }
44
- }
45
- None => {
46
- counter ! ( "metrics.no_receipt" ) . increment ( 1 ) ;
47
- error ! ( "no receipt found for tx hash" ) ;
32
+ // wait for the tx to mine, get its receipt
33
+ let receipt = PendingTransactionBuilder :: new ( provider. root ( ) . clone ( ) , tx_hash)
34
+ . with_required_confirmations ( 1 )
35
+ . with_timeout ( Some ( Duration :: from_secs ( 60 ) ) )
36
+ . get_receipt ( )
37
+ . instrument ( span. clone ( ) )
38
+ . await ;
39
+
40
+ // enter the span to log the result
41
+ let _guard = span. entered ( ) ;
42
+
43
+ match receipt {
44
+ Ok ( receipt) => {
45
+ // record how long it took to mine the transaction
46
+ // potential improvement: use the block timestamp to calculate the time elapsed
47
+ histogram ! ( "metrics.tx_mine_time" ) . record ( start. elapsed ( ) . as_millis ( ) as f64 ) ;
48
+
49
+ // log whether the transaction reverted
50
+ if receipt. status ( ) {
51
+ counter ! ( "metrics.tx_reverted" ) . increment ( 1 ) ;
52
+ debug ! ( "tx reverted" ) ;
53
+ } else {
54
+ counter ! ( "metrics.tx_succeeded" ) . increment ( 1 ) ;
55
+ debug ! ( "tx succeeded" ) ;
48
56
}
49
57
}
50
- }
51
- Err ( e) => {
52
- counter ! ( "metrics.rpc_error" ) . increment ( 1 ) ;
53
- error ! ( error = ?e, "rpc error" ) ;
58
+ Err ( PendingTransactionError :: TxWatcher ( WatchTxError :: Timeout ) ) => {
59
+ // log that the transaction timed out
60
+ counter ! ( "metrics.tx_not_mined" ) . increment ( 1 ) ;
61
+ debug ! ( "tx not mined" ) ;
62
+ }
63
+ Err ( e) => {
64
+ counter ! ( "metrics.rpc_error" ) . increment ( 1 ) ;
65
+ error ! ( error = ?e, "rpc error" ) ;
66
+ }
54
67
}
55
68
}
56
69
}
57
70
58
71
/// Spawns the task which collects metrics on pending transactions
59
72
pub fn spawn ( self ) -> ( mpsc:: UnboundedSender < TxHash > , JoinHandle < ( ) > ) {
60
73
let ( sender, mut inbound) = mpsc:: unbounded_channel ( ) ;
74
+
61
75
let handle = tokio:: spawn ( async move {
62
76
debug ! ( "metrics task spawned" ) ;
63
77
loop {
64
- if let Some ( pending_tx_hash) = inbound. recv ( ) . await {
65
- let this = self . clone ( ) ;
66
- tokio:: spawn ( async move {
67
- debug ! ( "received tx hash" ) ;
68
- let that = this. clone ( ) ;
69
- that. log_tx ( pending_tx_hash) . await ;
70
- debug ! ( "logged tx metrics" ) ;
71
- } ) ;
72
- } else {
78
+ let Some ( tx_hash) = inbound. recv ( ) . await else {
73
79
debug ! ( "upstream task gone" ) ;
74
80
break ;
75
- }
81
+ } ;
82
+ let fut = self . log_tx ( tx_hash) ;
83
+ tokio:: spawn ( fut) ;
76
84
}
77
85
} ) ;
78
86
0 commit comments