Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/packages/gasoline/src/ctx/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct OperationCtx {
pools: rivet_pools::Pools,
cache: rivet_cache::Cache,
msg_ctx: MessageCtx,
from_workflow: bool,
pub(crate) from_workflow: bool,
}

impl OperationCtx {
Expand Down
1 change: 0 additions & 1 deletion engine/packages/pegboard-gateway/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,6 @@ impl CustomServeTrait for PegboardGateway {
) => {
tracing::debug!(?close, "server closed websocket");


if open_msg.can_hibernate && close.retry {
// Successful closure
return Err(WebSocketServiceRetry.build());
Expand Down
10 changes: 10 additions & 0 deletions engine/packages/pegboard-serverless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,16 @@ async fn outbound_handler(

return Ok(());
}
Err(sse::Error::InvalidStatusCode(code, res)) => {
let body = res
.text()
.await
.unwrap_or_else(|_| "<could not read body>".to_string());
bail!(
"invalid status code ({code}):\n{}",
util::safe_slice(&body, 0, 512)
);
}
Err(err) => return Err(err.into()),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,21 +241,20 @@ pub async fn reserve_actor_key(
input.name.clone(),
input.key.clone(),
));
let (start, end) = actor_key_subspace.range();

let mut stream = tx.get_ranges_keyvalues(
universaldb::RangeOption {
mode: StreamingMode::Iterator,
..(start, end).into()
..(&actor_key_subspace).into()
},
Serializable,
);

while let Some(entry) = stream.try_next().await? {
let (_idx_key, data) = tx.read_entry::<keys::ns::ActorByKeyKey>(&entry)?;
let (idx_key, data) = tx.read_entry::<keys::ns::ActorByKeyKey>(&entry)?;
if !data.is_destroyed {
return Ok(ReserveActorKeyOutput::ExistingActor {
existing_actor_id: _idx_key.actor_id,
existing_actor_id: idx_key.actor_id,
});
}
}
Expand Down
11 changes: 6 additions & 5 deletions engine/packages/pegboard/src/workflows/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use rivet_types::actors::CrashPolicy;

use crate::{errors, workflows::runner::AllocatePendingActorsInput};

mod actor_keys;
mod destroy;
mod keys;
mod runtime;
mod setup;

Expand Down Expand Up @@ -147,7 +147,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
.await?;

if let Some(key) = &input.key {
match actor_keys::reserve_key(
match keys::reserve_key(
ctx,
input.namespace_id,
input.name.clone(),
Expand All @@ -156,8 +156,8 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
)
.await?
{
actor_keys::ReserveKeyOutput::Success => {}
actor_keys::ReserveKeyOutput::ForwardToDatacenter { dc_label } => {
keys::ReserveKeyOutput::Success => {}
keys::ReserveKeyOutput::ForwardToDatacenter { dc_label } => {
ctx.msg(Failed {
error: errors::Actor::KeyReservedInDifferentDatacenter {
datacenter_label: dc_label,
Expand All @@ -181,7 +181,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>

return Ok(());
}
actor_keys::ReserveKeyOutput::KeyExists { existing_actor_id } => {
keys::ReserveKeyOutput::KeyExists { existing_actor_id } => {
ctx.msg(Failed {
error: errors::Actor::DuplicateKey {
key: key.clone(),
Expand Down Expand Up @@ -696,6 +696,7 @@ pub struct Lost {
/// Immediately reschedules the actor regardless of its crash policy.
pub force_reschedule: bool,
/// Resets the rescheduling retry count to 0.
#[serde(default)]
pub reset_rescheduling: bool,
}

Expand Down
2 changes: 0 additions & 2 deletions engine/packages/pegboard/src/workflows/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1024,8 +1024,6 @@ pub(crate) async fn allocate_pending_actors(
let mut stream = tx.get_ranges_keyvalues(
universaldb::RangeOption {
mode: StreamingMode::Iterator,
// Containers bin pack so we reverse the order
reverse: true,
..(&runner_alloc_subspace).into()
},
// NOTE: This is not Serializable because we don't want to conflict with all of the
Expand Down
Loading