diff --git a/engine/packages/gasoline/src/ctx/operation.rs b/engine/packages/gasoline/src/ctx/operation.rs index 99fbd6071f..167b0f9af4 100644 --- a/engine/packages/gasoline/src/ctx/operation.rs +++ b/engine/packages/gasoline/src/ctx/operation.rs @@ -29,7 +29,7 @@ pub struct OperationCtx { pools: rivet_pools::Pools, cache: rivet_cache::Cache, msg_ctx: MessageCtx, - from_workflow: bool, + pub(crate) from_workflow: bool, } impl OperationCtx { diff --git a/engine/packages/pegboard-gateway/src/lib.rs b/engine/packages/pegboard-gateway/src/lib.rs index 7542fdfe96..39bad2a7c2 100644 --- a/engine/packages/pegboard-gateway/src/lib.rs +++ b/engine/packages/pegboard-gateway/src/lib.rs @@ -364,7 +364,6 @@ impl CustomServeTrait for PegboardGateway { ) => { tracing::debug!(?close, "server closed websocket"); - if open_msg.can_hibernate && close.retry { // Successful closure return Err(WebSocketServiceRetry.build()); diff --git a/engine/packages/pegboard-serverless/src/lib.rs b/engine/packages/pegboard-serverless/src/lib.rs index a5125a90a1..19d9c9386c 100644 --- a/engine/packages/pegboard-serverless/src/lib.rs +++ b/engine/packages/pegboard-serverless/src/lib.rs @@ -387,6 +387,16 @@ async fn outbound_handler( return Ok(()); } + Err(sse::Error::InvalidStatusCode(code, res)) => { + let body = res + .text() + .await + .unwrap_or_else(|_| "".to_string()); + bail!( + "invalid status code ({code}):\n{}", + util::safe_slice(&body, 0, 512) + ); + } Err(err) => return Err(err.into()), } } diff --git a/engine/packages/pegboard/src/workflows/actor/actor_keys.rs b/engine/packages/pegboard/src/workflows/actor/keys.rs similarity index 97% rename from engine/packages/pegboard/src/workflows/actor/actor_keys.rs rename to engine/packages/pegboard/src/workflows/actor/keys.rs index ec2746e1df..f63c5d95af 100644 --- a/engine/packages/pegboard/src/workflows/actor/actor_keys.rs +++ b/engine/packages/pegboard/src/workflows/actor/keys.rs @@ -241,21 +241,20 @@ pub async fn reserve_actor_key( input.name.clone(), input.key.clone(), )); - let (start, end) = actor_key_subspace.range(); let mut stream = tx.get_ranges_keyvalues( universaldb::RangeOption { mode: StreamingMode::Iterator, - ..(start, end).into() + ..(&actor_key_subspace).into() }, Serializable, ); while let Some(entry) = stream.try_next().await? { - let (_idx_key, data) = tx.read_entry::(&entry)?; + let (idx_key, data) = tx.read_entry::(&entry)?; if !data.is_destroyed { return Ok(ReserveActorKeyOutput::ExistingActor { - existing_actor_id: _idx_key.actor_id, + existing_actor_id: idx_key.actor_id, }); } } diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index e657fad85f..8659a6a523 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -5,8 +5,8 @@ use rivet_types::actors::CrashPolicy; use crate::{errors, workflows::runner::AllocatePendingActorsInput}; -mod actor_keys; mod destroy; +mod keys; mod runtime; mod setup; @@ -147,7 +147,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> .await?; if let Some(key) = &input.key { - match actor_keys::reserve_key( + match keys::reserve_key( ctx, input.namespace_id, input.name.clone(), @@ -156,8 +156,8 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> ) .await? { - actor_keys::ReserveKeyOutput::Success => {} - actor_keys::ReserveKeyOutput::ForwardToDatacenter { dc_label } => { + keys::ReserveKeyOutput::Success => {} + keys::ReserveKeyOutput::ForwardToDatacenter { dc_label } => { ctx.msg(Failed { error: errors::Actor::KeyReservedInDifferentDatacenter { datacenter_label: dc_label, @@ -181,7 +181,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> return Ok(()); } - actor_keys::ReserveKeyOutput::KeyExists { existing_actor_id } => { + keys::ReserveKeyOutput::KeyExists { existing_actor_id } => { ctx.msg(Failed { error: errors::Actor::DuplicateKey { key: key.clone(), @@ -696,6 +696,7 @@ pub struct Lost { /// Immediately reschedules the actor regardless of its crash policy. pub force_reschedule: bool, /// Resets the rescheduling retry count to 0. + #[serde(default)] pub reset_rescheduling: bool, } diff --git a/engine/packages/pegboard/src/workflows/runner.rs b/engine/packages/pegboard/src/workflows/runner.rs index 14cd0d42a5..33c4840d0c 100644 --- a/engine/packages/pegboard/src/workflows/runner.rs +++ b/engine/packages/pegboard/src/workflows/runner.rs @@ -1024,8 +1024,6 @@ pub(crate) async fn allocate_pending_actors( let mut stream = tx.get_ranges_keyvalues( universaldb::RangeOption { mode: StreamingMode::Iterator, - // Containers bin pack so we reverse the order - reverse: true, ..(&runner_alloc_subspace).into() }, // NOTE: This is not Serializable because we don't want to conflict with all of the