File tree Expand file tree Collapse file tree 4 files changed +41
-14
lines changed
magicblock-replicator/src Expand file tree Collapse file tree 4 files changed +41
-14
lines changed Original file line number Diff line number Diff line change @@ -4,6 +4,7 @@ use async_nats::jetstream::consumer::{
44 pull:: { Config as PullConfig , Stream as MessageStream } ,
55 AckPolicy , DeliverPolicy , PullConsumer ,
66} ;
7+ use tokio_util:: sync:: CancellationToken ;
78use tracing:: warn;
89
910use super :: cfg;
@@ -58,18 +59,27 @@ impl Consumer {
5859 ///
5960 /// Use this in a `tokio::select!` loop to process messages as they arrive.
6061 /// Messages are fetched in batches for efficiency.
61- pub async fn messages ( & self ) -> MessageStream {
62+ pub async fn messages (
63+ & self ,
64+ cancel : & CancellationToken ,
65+ ) -> Option < MessageStream > {
6266 loop {
63- let result = self
67+ let messages = self
6468 . inner
6569 . stream ( )
6670 . max_messages_per_batch ( cfg:: BATCH_SIZE )
67- . messages ( )
68- . await ;
69- match result {
70- Ok ( s) => break s,
71- Err ( error) => {
72- warn ! ( %error, "failed to create message stream" )
71+ . messages ( ) ;
72+ tokio:: select! {
73+ result = messages => {
74+ match result {
75+ Ok ( s) => break Some ( s) ,
76+ Err ( error) => {
77+ warn!( %error, "failed to create message stream" )
78+ }
79+ }
80+ }
81+ _ = cancel. cancelled( ) => {
82+ break None ;
7383 }
7484 }
7585 }
Original file line number Diff line number Diff line change 22
33use async_nats:: jetstream:: kv:: { Operation , Watch } ;
44use futures:: StreamExt ;
5+ use tokio_util:: sync:: CancellationToken ;
56use tracing:: warn;
67
78use super :: cfg;
@@ -17,8 +18,14 @@ pub struct LockWatcher {
1718
1819impl LockWatcher {
1920 /// Creates a new lock watcher.
20- pub ( crate ) async fn new ( broker : & Broker ) -> Self {
21+ pub ( crate ) async fn new (
22+ broker : & Broker ,
23+ cancel : & CancellationToken ,
24+ ) -> Option < Self > {
2125 let watch = loop {
26+ if cancel. is_cancelled ( ) {
27+ return None ;
28+ }
2229 let store = match broker. ctx . get_key_value ( cfg:: PRODUCER_LOCK ) . await
2330 {
2431 Ok ( s) => s,
@@ -35,7 +42,7 @@ impl LockWatcher {
3542 }
3643 }
3744 } ;
38- Self { watch }
45+ Some ( Self { watch } )
3946 }
4047
4148 /// Waits for the lock to be deleted or expire.
Original file line number Diff line number Diff line change @@ -186,7 +186,10 @@ impl ReplicationContext {
186186 let Some ( consumer) = self . create_consumer ( reset) . await else {
187187 return Ok ( None ) ;
188188 } ;
189- let watcher = LockWatcher :: new ( & self . broker ) . await ;
189+ let Some ( watcher) = LockWatcher :: new ( & self . broker , & self . cancel ) . await
190+ else {
191+ return Ok ( None ) ;
192+ } ;
190193 self . enter_replica_mode ( ) . await ;
191194 Ok ( Some ( Standby :: new (
192195 self ,
Original file line number Diff line number Diff line change @@ -49,7 +49,10 @@ impl Standby {
4949 /// Returns `Some(Primary)` on promotion, `None` on shutdown.
5050 pub async fn run ( mut self ) -> Result < Option < Primary > > {
5151 let mut timeout_check = tokio:: time:: interval ( Duration :: from_secs ( 1 ) ) ;
52- let mut stream = self . consumer . messages ( ) . await ;
52+ let Some ( mut stream) = self . consumer . messages ( & self . ctx . cancel ) . await
53+ else {
54+ return Ok ( None ) ;
55+ } ;
5356
5457 loop {
5558 tokio:: select! {
@@ -63,8 +66,12 @@ impl Standby {
6366 }
6467 result = stream. next( ) => {
6568 let Some ( result) = result else {
66- stream = self . consumer. messages( ) . await ;
67- continue ;
69+ if let Some ( s) = self . consumer. messages( & self . ctx. cancel) . await {
70+ stream = s;
71+ continue ;
72+ } else {
73+ return Ok ( None ) ;
74+ } ;
6875 } ;
6976 match result {
7077 Ok ( msg) => {
You can’t perform that action at this time.
0 commit comments