Skip to content

Commit b8fface

Browse files
committed
feat(pb): add reschedule_ts to actors
1 parent d3a8681 commit b8fface

File tree

7 files changed

+60
-20
lines changed

7 files changed

+60
-20
lines changed

engine/artifacts/openapi.json

Lines changed: 20 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/pegboard/src/ops/actor/get.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,11 @@ pub async fn pegboard_actor_get(ctx: &OperationCtx, input: &Input) -> Result<Out
8787
crash_policy: actor_state.crash_policy,
8888

8989
create_ts: actor_state.create_ts,
90-
pending_allocation_ts: actor_state.pending_allocation_ts,
9190
start_ts: actor_state.start_ts,
92-
sleep_ts: actor_state.sleep_ts,
91+
pending_allocation_ts: actor_state.pending_allocation_ts,
9392
connectable_ts: actor_state.connectable_ts,
93+
sleep_ts: actor_state.sleep_ts,
94+
reschedule_ts: actor_state.reschedule_ts,
9495
destroy_ts: actor_state.destroy_ts,
9596
});
9697
}

engine/packages/pegboard/src/ops/actor/list_for_ns.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ pub async fn pegboard_actor_list_for_ns(ctx: &OperationCtx, input: &Input) -> Re
198198
start_ts: actor_state.start_ts,
199199
sleep_ts: actor_state.sleep_ts,
200200
connectable_ts: actor_state.connectable_ts,
201+
reschedule_ts: actor_state.reschedule_ts,
201202
destroy_ts: actor_state.destroy_ts,
202203
});
203204
}

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ pub struct State {
5858
pub complete_ts: Option<i64>,
5959
pub connectable_ts: Option<i64>,
6060
pub pending_allocation_ts: Option<i64>,
61+
#[serde(default)]
62+
pub reschedule_ts: Option<i64>,
6163
pub destroy_ts: Option<i64>,
6264

6365
// Null if not allocated
@@ -93,6 +95,7 @@ impl State {
9395
sleep_ts: None,
9496
connectable_ts: None,
9597
complete_ts: None,
98+
reschedule_ts: None,
9699
destroy_ts: None,
97100

98101
runner_id: None,

engine/packages/pegboard/src/workflows/actor/runtime.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -619,17 +619,12 @@ pub async fn reschedule_actor(
619619
tracing::debug!(actor_id=?input.actor_id, "rescheduling actor");
620620

621621
// Determine next backoff sleep duration
622-
let mut backoff = util::backoff::Backoff::new_at(
623-
8,
624-
None,
625-
BASE_RETRY_TIMEOUT_MS,
626-
500,
627-
state.reschedule_state.retry_count,
628-
);
622+
let mut backoff = reschedule_backoff(state.reschedule_state.retry_count);
629623

630624
let (now, reset) = ctx
631625
.v(2)
632626
.activity(CompareRetryInput {
627+
retry_count: state.reschedule_state.retry_count,
633628
last_retry_ts: state.reschedule_state.last_retry_ts,
634629
})
635630
.await?;
@@ -720,15 +715,27 @@ pub async fn clear_pending_allocation(
720715

721716
#[derive(Debug, Serialize, Deserialize, Hash)]
722717
struct CompareRetryInput {
718+
#[serde(default)]
719+
retry_count: usize,
723720
last_retry_ts: i64,
724721
}
725722

726723
#[activity(CompareRetry)]
727724
async fn compare_retry(ctx: &ActivityCtx, input: &CompareRetryInput) -> Result<(i64, bool)> {
728-
let now = util::timestamp::now();
725+
let mut state = ctx.state::<State>()?;
729726

727+
let now = util::timestamp::now();
730728
// If the last retry ts is more than RETRY_RESET_DURATION_MS ago, reset retry count
731-
Ok((now, input.last_retry_ts < now - RETRY_RESET_DURATION_MS))
729+
let reset = input.last_retry_ts < now - RETRY_RESET_DURATION_MS;
730+
731+
if reset {
732+
state.reschedule_ts = None;
733+
} else {
734+
let backoff = reschedule_backoff(input.retry_count);
735+
state.reschedule_ts = Some(now + i64::try_from(backoff.current_duration())?);
736+
}
737+
738+
Ok((now, reset))
732739
}
733740

734741
#[derive(Debug, Serialize, Deserialize, Hash)]
@@ -740,7 +747,9 @@ pub struct SetStartedInput {
740747
pub async fn set_started(ctx: &ActivityCtx, input: &SetStartedInput) -> Result<()> {
741748
let mut state = ctx.state::<State>()?;
742749

743-
state.start_ts = Some(util::timestamp::now());
750+
if state.start_ts.is_none() {
751+
state.start_ts = Some(util::timestamp::now());
752+
}
744753
state.connectable_ts = Some(util::timestamp::now());
745754

746755
ctx.udb()?
@@ -800,3 +809,7 @@ pub async fn set_complete(ctx: &ActivityCtx, input: &SetCompleteInput) -> Result
800809

801810
Ok(())
802811
}
812+
813+
fn reschedule_backoff(retry_count: usize) -> util::backoff::Backoff {
814+
util::backoff::Backoff::new_at(8, None, BASE_RETRY_TIMEOUT_MS, 500, retry_count)
815+
}

engine/packages/types/src/actors.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,20 @@ pub struct Actor {
1515
pub runner_name_selector: String,
1616
pub crash_policy: CrashPolicy,
1717

18+
/// Denotes when the actor was first created.
1819
pub create_ts: i64,
20+
/// Denotes when the actor was first made connectable. Null if never.
1921
pub start_ts: Option<i64>,
22+
/// Denotes when the actor started waiting for an allocation.
2023
pub pending_allocation_ts: Option<i64>,
24+
/// Denotes when the actor was last connectable. Null if actor is not running.
2125
pub connectable_ts: Option<i64>,
26+
/// Denotes when the actor entered a sleeping state.
2227
pub sleep_ts: Option<i64>,
28+
/// Denotes when the actor will try to allocate again. If this is set, the actor will not attempt to
29+
/// allocate until the given timestamp.
30+
pub reschedule_ts: Option<i64>,
31+
/// Denotes when the actor was destroyed.
2332
pub destroy_ts: Option<i64>,
2433
}
2534

engine/packages/util/src/backoff.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,7 @@ impl Backoff {
8585
return None;
8686
}
8787

88-
let next_wait = self.wait * 2usize.pow(self.i.min(self.max_exponent) as u32)
89-
+ rand::thread_rng().gen_range(0..self.randomness);
88+
let next_wait = self.current_duration() + rand::thread_rng().gen_range(0..self.randomness);
9089
self.sleep_until += Duration::from_millis(next_wait as u64);
9190

9291
self.i += 1;

0 commit comments

Comments
 (0)