@@ -619,17 +619,12 @@ pub async fn reschedule_actor(
619619 tracing:: debug!( actor_id=?input. actor_id, "rescheduling actor" ) ;
620620
621621 // Determine next backoff sleep duration
622- let mut backoff = util:: backoff:: Backoff :: new_at (
623- 8 ,
624- None ,
625- BASE_RETRY_TIMEOUT_MS ,
626- 500 ,
627- state. reschedule_state . retry_count ,
628- ) ;
622+ let mut backoff = reschedule_backoff ( state. reschedule_state . retry_count ) ;
629623
630624 let ( now, reset) = ctx
631625 . v ( 2 )
632626 . activity ( CompareRetryInput {
627+ retry_count : state. reschedule_state . retry_count ,
633628 last_retry_ts : state. reschedule_state . last_retry_ts ,
634629 } )
635630 . await ?;
@@ -720,15 +715,27 @@ pub async fn clear_pending_allocation(
720715
721716#[ derive( Debug , Serialize , Deserialize , Hash ) ]
722717struct CompareRetryInput {
718+ #[ serde( default ) ]
719+ retry_count : usize ,
723720 last_retry_ts : i64 ,
724721}
725722
726723#[ activity( CompareRetry ) ]
727724async fn compare_retry ( ctx : & ActivityCtx , input : & CompareRetryInput ) -> Result < ( i64 , bool ) > {
728- let now = util :: timestamp :: now ( ) ;
725+ let mut state = ctx . state :: < State > ( ) ? ;
729726
727+ let now = util:: timestamp:: now ( ) ;
730728 // If the last retry ts is more than RETRY_RESET_DURATION_MS ago, reset retry count
731- Ok ( ( now, input. last_retry_ts < now - RETRY_RESET_DURATION_MS ) )
729+ let reset = input. last_retry_ts < now - RETRY_RESET_DURATION_MS ;
730+
731+ if reset {
732+ state. reschedule_ts = None ;
733+ } else {
734+ let backoff = reschedule_backoff ( input. retry_count ) ;
735+ state. reschedule_ts = Some ( now + i64:: try_from ( backoff. current_duration ( ) ) ?) ;
736+ }
737+
738+ Ok ( ( now, reset) )
732739}
733740
734741#[ derive( Debug , Serialize , Deserialize , Hash ) ]
@@ -740,7 +747,9 @@ pub struct SetStartedInput {
740747pub async fn set_started ( ctx : & ActivityCtx , input : & SetStartedInput ) -> Result < ( ) > {
741748 let mut state = ctx. state :: < State > ( ) ?;
742749
743- state. start_ts = Some ( util:: timestamp:: now ( ) ) ;
750+ if state. start_ts . is_none ( ) {
751+ state. start_ts = Some ( util:: timestamp:: now ( ) ) ;
752+ }
744753 state. connectable_ts = Some ( util:: timestamp:: now ( ) ) ;
745754
746755 ctx. udb ( ) ?
@@ -800,3 +809,7 @@ pub async fn set_complete(ctx: &ActivityCtx, input: &SetCompleteInput) -> Result
800809
801810 Ok ( ( ) )
802811}
812+
813+ fn reschedule_backoff ( retry_count : usize ) -> util:: backoff:: Backoff {
814+ util:: backoff:: Backoff :: new_at ( 8 , None , BASE_RETRY_TIMEOUT_MS , 500 , retry_count)
815+ }
0 commit comments