Skip to content

Commit 1633914

Browse files
committed
chore: misc bug fixes, add logs for outbound req (#3332)
1 parent 8e33967 commit 1633914

File tree

6 files changed

+20
-13
lines changed

6 files changed

+20
-13
lines changed

engine/packages/gasoline/src/ctx/operation.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ pub struct OperationCtx {
2929
pools: rivet_pools::Pools,
3030
cache: rivet_cache::Cache,
3131
msg_ctx: MessageCtx,
32-
from_workflow: bool,
32+
pub(crate) from_workflow: bool,
3333
}
3434

3535
impl OperationCtx {

engine/packages/pegboard-gateway/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,6 @@ impl CustomServeTrait for PegboardGateway {
364364
) => {
365365
tracing::debug!(?close, "server closed websocket");
366366

367-
368367
if open_msg.can_hibernate && close.retry {
369368
// Successful closure
370369
return Err(WebSocketServiceRetry.build());

engine/packages/pegboard-serverless/src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,16 @@ async fn outbound_handler(
387387

388388
return Ok(());
389389
}
390+
Err(sse::Error::InvalidStatusCode(code, res)) => {
391+
let body = res
392+
.text()
393+
.await
394+
.unwrap_or_else(|_| "<could not read body>".to_string());
395+
bail!(
396+
"invalid status code ({code}):\n{}",
397+
util::safe_slice(&body, 0, 512)
398+
);
399+
}
390400
Err(err) => return Err(err.into()),
391401
}
392402
}

engine/packages/pegboard/src/workflows/actor/actor_keys.rs renamed to engine/packages/pegboard/src/workflows/actor/keys.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,21 +241,20 @@ pub async fn reserve_actor_key(
241241
input.name.clone(),
242242
input.key.clone(),
243243
));
244-
let (start, end) = actor_key_subspace.range();
245244

246245
let mut stream = tx.get_ranges_keyvalues(
247246
universaldb::RangeOption {
248247
mode: StreamingMode::Iterator,
249-
..(start, end).into()
248+
..(&actor_key_subspace).into()
250249
},
251250
Serializable,
252251
);
253252

254253
while let Some(entry) = stream.try_next().await? {
255-
let (_idx_key, data) = tx.read_entry::<keys::ns::ActorByKeyKey>(&entry)?;
254+
let (idx_key, data) = tx.read_entry::<keys::ns::ActorByKeyKey>(&entry)?;
256255
if !data.is_destroyed {
257256
return Ok(ReserveActorKeyOutput::ExistingActor {
258-
existing_actor_id: _idx_key.actor_id,
257+
existing_actor_id: idx_key.actor_id,
259258
});
260259
}
261260
}

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ use rivet_types::actors::CrashPolicy;
55

66
use crate::{errors, workflows::runner::AllocatePendingActorsInput};
77

8-
mod actor_keys;
98
mod destroy;
9+
mod keys;
1010
mod runtime;
1111
mod setup;
1212

@@ -147,7 +147,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
147147
.await?;
148148

149149
if let Some(key) = &input.key {
150-
match actor_keys::reserve_key(
150+
match keys::reserve_key(
151151
ctx,
152152
input.namespace_id,
153153
input.name.clone(),
@@ -156,8 +156,8 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
156156
)
157157
.await?
158158
{
159-
actor_keys::ReserveKeyOutput::Success => {}
160-
actor_keys::ReserveKeyOutput::ForwardToDatacenter { dc_label } => {
159+
keys::ReserveKeyOutput::Success => {}
160+
keys::ReserveKeyOutput::ForwardToDatacenter { dc_label } => {
161161
ctx.msg(Failed {
162162
error: errors::Actor::KeyReservedInDifferentDatacenter {
163163
datacenter_label: dc_label,
@@ -181,7 +181,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
181181

182182
return Ok(());
183183
}
184-
actor_keys::ReserveKeyOutput::KeyExists { existing_actor_id } => {
184+
keys::ReserveKeyOutput::KeyExists { existing_actor_id } => {
185185
ctx.msg(Failed {
186186
error: errors::Actor::DuplicateKey {
187187
key: key.clone(),
@@ -696,6 +696,7 @@ pub struct Lost {
696696
/// Immediately reschedules the actor regardless of its crash policy.
697697
pub force_reschedule: bool,
698698
/// Resets the rescheduling retry count to 0.
699+
#[serde(default)]
699700
pub reset_rescheduling: bool,
700701
}
701702

engine/packages/pegboard/src/workflows/runner.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,8 +1024,6 @@ pub(crate) async fn allocate_pending_actors(
10241024
let mut stream = tx.get_ranges_keyvalues(
10251025
universaldb::RangeOption {
10261026
mode: StreamingMode::Iterator,
1027-
// Containers bin pack so we reverse the order
1028-
reverse: true,
10291027
..(&runner_alloc_subspace).into()
10301028
},
10311029
// NOTE: This is not Serializable because we don't want to conflict with all of the

0 commit comments

Comments
 (0)