Skip to content

Commit 1527e67

Browse files
committed
chore(pegboard): allow configuring base_retry_timeout, actor_start_threshold, actor_stop_threshold, and retry_reset_duration
1 parent 7547022 commit 1527e67

File tree

4 files changed

+92
-28
lines changed

4 files changed

+92
-28
lines changed

engine/packages/config/src/config/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ pub mod clickhouse;
1111
pub mod db;
1212
pub mod guard;
1313
pub mod logs;
14+
pub mod pegboard;
1415
pub mod pubsub;
1516
pub mod telemetry;
1617
pub mod topology;
@@ -24,6 +25,7 @@ pub use clickhouse::*;
2425
pub use db::Database;
2526
pub use guard::*;
2627
pub use logs::*;
28+
pub use pegboard::*;
2729
pub use pubsub::PubSub;
2830
pub use telemetry::*;
2931
pub use topology::*;
@@ -72,6 +74,9 @@ pub struct Root {
7274
#[serde(default)]
7375
pub api_peer: Option<ApiPeer>,
7476

77+
#[serde(default)]
78+
pub pegboard: Option<Pegboard>,
79+
7580
#[serde(default)]
7681
pub logs: Option<Logs>,
7782

@@ -107,6 +112,7 @@ impl Default for Root {
107112
guard: None,
108113
api_public: None,
109114
api_peer: None,
115+
pegboard: None,
110116
logs: None,
111117
topology: None,
112118
database: None,
@@ -136,6 +142,11 @@ impl Root {
136142
self.api_peer.as_ref().unwrap_or(&DEFAULT)
137143
}
138144

145+
pub fn pegboard(&self) -> &Pegboard {
146+
static DEFAULT: LazyLock<Pegboard> = LazyLock::new(Pegboard::default);
147+
self.pegboard.as_ref().unwrap_or(&DEFAULT)
148+
}
149+
139150
pub fn logs(&self) -> &Logs {
140151
static DEFAULT: LazyLock<Logs> = LazyLock::new(Logs::default);
141152
self.logs.as_ref().unwrap_or(&DEFAULT)
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
use schemars::JsonSchema;
2+
use serde::{Deserialize, Serialize};
3+
4+
#[derive(Debug, Serialize, Deserialize, Clone, Default, JsonSchema)]
5+
#[serde(deny_unknown_fields)]
6+
pub struct Pegboard {
7+
/// Time to delay an actor from rescheduling after a rescheduling failure.
8+
///
9+
/// Unit is in milliseconds.
10+
///
11+
/// **Experimental**
12+
pub base_retry_timeout: Option<usize>,
13+
/// How long to wait after creating and not receiving a starting state before setting actor as lost.
14+
///
15+
/// Unit is in milliseconds.
16+
///
17+
/// **Experimental**
18+
pub actor_start_threshold: Option<i64>,
19+
/// How long to wait after stopping and not receiving a stop state before setting actor as lost.
20+
///
21+
/// Unit is in milliseconds.
22+
///
23+
/// **Experimental**
24+
pub actor_stop_threshold: Option<i64>,
25+
/// How long an actor goes without retries before it's retry count is reset to 0, effectively resetting its
26+
/// backoff to 0.
27+
///
28+
/// Unit is in milliseconds.
29+
///
30+
/// **Experimental**
31+
pub retry_reset_duration: Option<i64>,
32+
}
33+
34+
impl Pegboard {
35+
pub fn base_retry_timeout(&self) -> usize {
36+
self.base_retry_timeout.unwrap_or(2000)
37+
}
38+
39+
pub fn actor_start_threshold(&self) -> i64 {
40+
self.actor_start_threshold.unwrap_or(30_000)
41+
}
42+
43+
pub fn actor_stop_threshold(&self) -> i64 {
44+
self.actor_stop_threshold.unwrap_or(30_000)
45+
}
46+
47+
pub fn retry_reset_duration(&self) -> i64 {
48+
self.retry_reset_duration.unwrap_or(10 * 60 * 1000)
49+
}
50+
}

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,6 @@ mod keys;
1010
mod runtime;
1111
mod setup;
1212

13-
/// Time to delay an actor from rescheduling after a rescheduling failure.
14-
const BASE_RETRY_TIMEOUT_MS: usize = 2000;
15-
/// How long to wait after creating and not receiving a starting state before setting actor as lost.
16-
const ACTOR_START_THRESHOLD_MS: i64 = util::duration::seconds(30);
17-
/// How long to wait after stopping and not receiving a stop state before setting actor as lost.
18-
const ACTOR_STOP_THRESHOLD_MS: i64 = util::duration::seconds(30);
19-
/// How long an actor goes without retries before it's retry count is reset to 0, effectively resetting its
20-
/// backoff to 0.
21-
const RETRY_RESET_DURATION_MS: i64 = util::duration::minutes(10);
22-
2313
#[derive(Clone, Debug, Serialize, Deserialize, Hash)]
2414
pub struct Input {
2515
pub actor_id: Id,
@@ -227,7 +217,11 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
227217
runtime::SpawnActorOutput::Allocated {
228218
runner_id,
229219
runner_workflow_id,
230-
} => runtime::LifecycleState::new(runner_id, runner_workflow_id),
220+
} => runtime::LifecycleState::new(
221+
runner_id,
222+
runner_workflow_id,
223+
ctx.config().pegboard().actor_start_threshold(),
224+
),
231225
runtime::SpawnActorOutput::Sleep => {
232226
ctx.activity(runtime::SetSleepingInput {
233227
actor_id: input.actor_id,
@@ -311,8 +305,10 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
311305
}) => match intent {
312306
protocol::ActorIntent::ActorIntentSleep => {
313307
if !state.sleeping {
314-
state.gc_timeout_ts =
315-
Some(util::timestamp::now() + ACTOR_STOP_THRESHOLD_MS);
308+
state.gc_timeout_ts = Some(
309+
util::timestamp::now()
310+
+ ctx.config().pegboard().actor_stop_threshold(),
311+
);
316312
state.sleeping = true;
317313

318314
ctx.activity(runtime::SetSleepingInput {
@@ -333,8 +329,10 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
333329
}
334330
}
335331
protocol::ActorIntent::ActorIntentStop => {
336-
state.gc_timeout_ts =
337-
Some(util::timestamp::now() + ACTOR_STOP_THRESHOLD_MS);
332+
state.gc_timeout_ts = Some(
333+
util::timestamp::now()
334+
+ ctx.config().pegboard().actor_stop_threshold(),
335+
);
338336

339337
ctx.activity(runtime::SetNotConnectableInput {
340338
actor_id: input.actor_id,

engine/packages/pegboard/src/workflows/actor/runtime.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use base64::Engine;
21
use base64::prelude::BASE64_STANDARD;
2+
use base64::Engine;
33
use futures_util::StreamExt;
44
use futures_util::TryStreamExt;
55
use gas::prelude::*;
@@ -15,10 +15,7 @@ use universaldb::utils::{FormalKey, IsolationLevel::*};
1515

1616
use crate::{keys, metrics, workflows::runner::RUNNER_ELIGIBLE_THRESHOLD_MS};
1717

18-
use super::{
19-
ACTOR_START_THRESHOLD_MS, Allocate, BASE_RETRY_TIMEOUT_MS, Destroy, Input, PendingAllocation,
20-
RETRY_RESET_DURATION_MS, State, destroy,
21-
};
18+
use super::{destroy, Allocate, Destroy, Input, PendingAllocation, State};
2219

2320
#[derive(Deserialize, Serialize)]
2421
pub struct LifecycleState {
@@ -42,7 +39,7 @@ pub struct LifecycleState {
4239
}
4340

4441
impl LifecycleState {
45-
pub fn new(runner_id: Id, runner_workflow_id: Id) -> Self {
42+
pub fn new(runner_id: Id, runner_workflow_id: Id, actor_start_threshold: i64) -> Self {
4643
LifecycleState {
4744
generation: 0,
4845
runner_id: Some(runner_id),
@@ -51,7 +48,7 @@ impl LifecycleState {
5148
will_wake: false,
5249
wake_for_alarm: false,
5350
alarm_ts: None,
54-
gc_timeout_ts: Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS),
51+
gc_timeout_ts: Some(util::timestamp::now() + actor_start_threshold),
5552
reschedule_state: RescheduleState::default(),
5653
}
5754
}
@@ -619,7 +616,10 @@ pub async fn reschedule_actor(
619616
tracing::debug!(actor_id=?input.actor_id, "rescheduling actor");
620617

621618
// Determine next backoff sleep duration
622-
let mut backoff = reschedule_backoff(state.reschedule_state.retry_count);
619+
let mut backoff = reschedule_backoff(
620+
state.reschedule_state.retry_count,
621+
ctx.config().pegboard().base_retry_timeout(),
622+
);
623623

624624
let (now, reset) = ctx
625625
.v(2)
@@ -670,7 +670,8 @@ pub async fn reschedule_actor(
670670
state.runner_workflow_id = Some(*runner_workflow_id);
671671

672672
// Reset gc timeout once allocated
673-
state.gc_timeout_ts = Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS);
673+
state.gc_timeout_ts =
674+
Some(util::timestamp::now() + ctx.config().pegboard().actor_start_threshold());
674675
}
675676

676677
Ok(spawn_res)
@@ -725,13 +726,17 @@ async fn compare_retry(ctx: &ActivityCtx, input: &CompareRetryInput) -> Result<(
725726
let mut state = ctx.state::<State>()?;
726727

727728
let now = util::timestamp::now();
729+
728730
// If the last retry ts is more than RETRY_RESET_DURATION_MS ago, reset retry count
729-
let reset = input.last_retry_ts < now - RETRY_RESET_DURATION_MS;
731+
let reset = input.last_retry_ts < now - ctx.config().pegboard().retry_reset_duration();
730732

731733
if reset {
732734
state.reschedule_ts = None;
733735
} else {
734-
let backoff = reschedule_backoff(input.retry_count);
736+
let backoff = reschedule_backoff(
737+
input.retry_count,
738+
ctx.config().pegboard().base_retry_timeout(),
739+
);
735740
state.reschedule_ts = Some(now + i64::try_from(backoff.current_duration())?);
736741
}
737742

@@ -810,6 +815,6 @@ pub async fn set_complete(ctx: &ActivityCtx, input: &SetCompleteInput) -> Result
810815
Ok(())
811816
}
812817

813-
fn reschedule_backoff(retry_count: usize) -> util::backoff::Backoff {
814-
util::backoff::Backoff::new_at(8, None, BASE_RETRY_TIMEOUT_MS, 500, retry_count)
818+
fn reschedule_backoff(retry_count: usize, base_retry_timeout: usize) -> util::backoff::Backoff {
819+
util::backoff::Backoff::new_at(8, None, base_retry_timeout, 500, retry_count)
815820
}

0 commit comments

Comments
 (0)