diff --git a/engine/artifacts/openapi.json b/engine/artifacts/openapi.json index 517c5fa80b..091deb365a 100644 --- a/engine/artifacts/openapi.json +++ b/engine/artifacts/openapi.json @@ -830,14 +830,16 @@ "integer", "null" ], - "format": "int64" + "format": "int64", + "description": "Denotes when the actor was last connectable. Null if actor is not running." }, "crash_policy": { "$ref": "#/components/schemas/CrashPolicy" }, "create_ts": { "type": "integer", - "format": "int64" + "format": "int64", + "description": "Denotes when the actor was first created." }, "datacenter": { "type": "string" @@ -847,7 +849,8 @@ "integer", "null" ], - "format": "int64" + "format": "int64", + "description": "Denotes when the actor was destroyed." }, "key": { "type": [ @@ -866,7 +869,16 @@ "integer", "null" ], - "format": "int64" + "format": "int64", + "description": "Denotes when the actor started waiting for an allocation." + }, + "reschedule_ts": { + "type": [ + "integer", + "null" + ], + "format": "int64", + "description": "Denotes when the actor will try to allocate again. If this is set, the actor will not attempt to\nallocate until the given timestamp." }, "runner_name_selector": { "type": "string" @@ -876,14 +888,16 @@ "integer", "null" ], - "format": "int64" + "format": "int64", + "description": "Denotes when the actor entered a sleeping state." }, "start_ts": { "type": [ "integer", "null" ], - "format": "int64" + "format": "int64", + "description": "Denotes when the actor was first made connectable. Null if never." } } }, diff --git a/engine/packages/pegboard/src/ops/actor/get.rs b/engine/packages/pegboard/src/ops/actor/get.rs index 237453c19f..fbf0977d5d 100644 --- a/engine/packages/pegboard/src/ops/actor/get.rs +++ b/engine/packages/pegboard/src/ops/actor/get.rs @@ -87,10 +87,11 @@ pub async fn pegboard_actor_get(ctx: &OperationCtx, input: &Input) -> Result Re start_ts: actor_state.start_ts, sleep_ts: actor_state.sleep_ts, connectable_ts: actor_state.connectable_ts, + reschedule_ts: actor_state.reschedule_ts, destroy_ts: actor_state.destroy_ts, }); } diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index 8659a6a523..42fecfa634 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -58,6 +58,8 @@ pub struct State { pub complete_ts: Option, pub connectable_ts: Option, pub pending_allocation_ts: Option, + #[serde(default)] + pub reschedule_ts: Option, pub destroy_ts: Option, // Null if not allocated @@ -93,6 +95,7 @@ impl State { sleep_ts: None, connectable_ts: None, complete_ts: None, + reschedule_ts: None, destroy_ts: None, runner_id: None, diff --git a/engine/packages/pegboard/src/workflows/actor/runtime.rs b/engine/packages/pegboard/src/workflows/actor/runtime.rs index f4d03e80a5..f6c23457aa 100644 --- a/engine/packages/pegboard/src/workflows/actor/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor/runtime.rs @@ -619,17 +619,12 @@ pub async fn reschedule_actor( tracing::debug!(actor_id=?input.actor_id, "rescheduling actor"); // Determine next backoff sleep duration - let mut backoff = util::backoff::Backoff::new_at( - 8, - None, - BASE_RETRY_TIMEOUT_MS, - 500, - state.reschedule_state.retry_count, - ); + let mut backoff = reschedule_backoff(state.reschedule_state.retry_count); let (now, reset) = ctx .v(2) .activity(CompareRetryInput { + retry_count: state.reschedule_state.retry_count, last_retry_ts: state.reschedule_state.last_retry_ts, }) .await?; @@ -720,15 +715,27 @@ pub async fn clear_pending_allocation( #[derive(Debug, Serialize, Deserialize, Hash)] struct CompareRetryInput { + #[serde(default)] + retry_count: usize, last_retry_ts: i64, } #[activity(CompareRetry)] async fn compare_retry(ctx: &ActivityCtx, input: &CompareRetryInput) -> Result<(i64, bool)> { - let now = util::timestamp::now(); + let mut state = ctx.state::()?; + let now = util::timestamp::now(); // If the last retry ts is more than RETRY_RESET_DURATION_MS ago, reset retry count - Ok((now, input.last_retry_ts < now - RETRY_RESET_DURATION_MS)) + let reset = input.last_retry_ts < now - RETRY_RESET_DURATION_MS; + + if reset { + state.reschedule_ts = None; + } else { + let backoff = reschedule_backoff(input.retry_count); + state.reschedule_ts = Some(now + i64::try_from(backoff.current_duration())?); + } + + Ok((now, reset)) } #[derive(Debug, Serialize, Deserialize, Hash)] @@ -740,7 +747,9 @@ pub struct SetStartedInput { pub async fn set_started(ctx: &ActivityCtx, input: &SetStartedInput) -> Result<()> { let mut state = ctx.state::()?; - state.start_ts = Some(util::timestamp::now()); + if state.start_ts.is_none() { + state.start_ts = Some(util::timestamp::now()); + } state.connectable_ts = Some(util::timestamp::now()); ctx.udb()? @@ -800,3 +809,7 @@ pub async fn set_complete(ctx: &ActivityCtx, input: &SetCompleteInput) -> Result Ok(()) } + +fn reschedule_backoff(retry_count: usize) -> util::backoff::Backoff { + util::backoff::Backoff::new_at(8, None, BASE_RETRY_TIMEOUT_MS, 500, retry_count) +} diff --git a/engine/packages/types/src/actors.rs b/engine/packages/types/src/actors.rs index ed22c8e553..0b7bde45a9 100644 --- a/engine/packages/types/src/actors.rs +++ b/engine/packages/types/src/actors.rs @@ -15,11 +15,20 @@ pub struct Actor { pub runner_name_selector: String, pub crash_policy: CrashPolicy, + /// Denotes when the actor was first created. pub create_ts: i64, + /// Denotes when the actor was first made connectable. Null if never. pub start_ts: Option, + /// Denotes when the actor started waiting for an allocation. pub pending_allocation_ts: Option, + /// Denotes when the actor was last connectable. Null if actor is not running. pub connectable_ts: Option, + /// Denotes when the actor entered a sleeping state. pub sleep_ts: Option, + /// Denotes when the actor will try to allocate again. If this is set, the actor will not attempt to + /// allocate until the given timestamp. + pub reschedule_ts: Option, + /// Denotes when the actor was destroyed. pub destroy_ts: Option, } diff --git a/engine/packages/util/src/backoff.rs b/engine/packages/util/src/backoff.rs index 617826d2af..183f25042e 100644 --- a/engine/packages/util/src/backoff.rs +++ b/engine/packages/util/src/backoff.rs @@ -85,8 +85,7 @@ impl Backoff { return None; } - let next_wait = self.wait * 2usize.pow(self.i.min(self.max_exponent) as u32) - + rand::thread_rng().gen_range(0..self.randomness); + let next_wait = self.current_duration() + rand::thread_rng().gen_range(0..self.randomness); self.sleep_until += Duration::from_millis(next_wait as u64); self.i += 1;