@@ -45,7 +45,7 @@ impl EnvTask {
45
45
}
46
46
47
47
/// Construct the BlockEnv and send it to the sender.
48
- pub async fn task_fut ( self , sender : watch:: Sender < BlockEnv > ) {
48
+ pub async fn task_fut ( self , sender : watch:: Sender < Option < BlockEnv > > ) {
49
49
let span = info_span ! ( "EnvTask::task_fut::init" ) ;
50
50
let mut poller = match self . provider . watch_blocks ( ) . instrument ( span. clone ( ) ) . await {
51
51
Ok ( poller) => poller,
@@ -64,7 +64,10 @@ impl EnvTask {
64
64
blocks. next ( ) . instrument ( info_span ! ( "EnvTask::task_fut::stream" ) ) . await
65
65
{
66
66
let Some ( block) = blocks. last ( ) else {
67
+ // Clear the env, as the block has changed but we can't
68
+ // get a new one
67
69
debug ! ( "empty filter changes" ) ;
70
+ let _ = sender. send ( None ) ;
68
71
continue ;
69
72
} ;
70
73
let span = info_span ! ( "EnvTask::task_fut::loop" , hash = %block, number = tracing:: field:: Empty ) ;
@@ -79,11 +82,13 @@ impl EnvTask {
79
82
Ok ( Some ( block) ) => block. header . inner ,
80
83
Ok ( None ) => {
81
84
let _span = span. enter ( ) ;
85
+ let _ = sender. send ( None ) ;
82
86
debug ! ( "block not found" ) ;
83
87
continue ;
84
88
}
85
89
Err ( err) => {
86
90
let _span = span. enter ( ) ;
91
+ let _ = sender. send ( None ) ;
87
92
error ! ( %err, "Failed to get latest block" ) ;
88
93
break ;
89
94
}
@@ -92,16 +97,16 @@ impl EnvTask {
92
97
93
98
let env = self . construct_block_env ( & previous) ;
94
99
debug ! ( ?env, "constructed block env" ) ;
95
- if let Err ( _) = sender. send ( env) {
100
+ if let Err ( _) = sender. send ( Some ( env) ) {
96
101
// The receiver has been dropped, so we can stop the task.
97
102
break ;
98
103
}
99
104
}
100
105
}
101
106
102
107
/// Spawn the task and return a watch::Receiver for the BlockEnv.
103
- pub fn spawn ( self ) -> watch:: Receiver < BlockEnv > {
104
- let ( sender, receiver) = watch:: channel ( BlockEnv :: default ( ) ) ;
108
+ pub fn spawn ( self ) -> watch:: Receiver < Option < BlockEnv > > {
109
+ let ( sender, receiver) = watch:: channel ( None ) ;
105
110
let fut = self . task_fut ( sender) ;
106
111
tokio:: spawn ( fut) ;
107
112
0 commit comments