From eb700e1bcb5c870fafca6df9c5c6b1084bfb392c Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Mon, 8 Dec 2025 14:14:23 -0800 Subject: [PATCH] fix(engine): fix routing request to correct dc for get or create --- .../api-public/src/actors/get_or_create.rs | 95 +++++++++++-------- .../packages/pegboard/src/ops/actor/create.rs | 8 ++ engine/packages/pegboard/src/ops/actor/get.rs | 11 +++ .../pegboard/src/ops/actor/get_for_gateway.rs | 9 ++ .../pegboard/src/ops/actor/get_runner.rs | 11 +++ 5 files changed, 95 insertions(+), 39 deletions(-) diff --git a/engine/packages/api-public/src/actors/get_or_create.rs b/engine/packages/api-public/src/actors/get_or_create.rs index 1c560536a4..039260f9f7 100644 --- a/engine/packages/api-public/src/actors/get_or_create.rs +++ b/engine/packages/api-public/src/actors/get_or_create.rs @@ -12,14 +12,14 @@ use utoipa::{IntoParams, ToSchema}; use crate::actors::utils; use crate::ctx::ApiCtx; -#[derive(Debug, Deserialize, IntoParams)] +#[derive(Debug, Serialize, Deserialize, IntoParams)] #[serde(deny_unknown_fields)] #[into_params(parameter_in = Query)] pub struct GetOrCreateQuery { pub namespace: String, } -#[derive(Deserialize, ToSchema)] +#[derive(Serialize, Deserialize, ToSchema)] #[serde(deny_unknown_fields)] #[schema(as = ActorsGetOrCreateRequest)] pub struct GetOrCreateRequest { @@ -31,7 +31,7 @@ pub struct GetOrCreateRequest { pub crash_policy: CrashPolicy, } -#[derive(Serialize, ToSchema)] +#[derive(Serialize, Deserialize, ToSchema)] #[schema(as = ActorsGetOrCreateResponse)] pub struct GetOrCreateResponse { pub actor: rivet_types::actors::Actor, @@ -127,44 +127,61 @@ async fn get_or_create_inner( ) .await?; - let actor_id = Id::new_v1(target_dc_label); + // Forward to correct datacenter or create locally + if target_dc_label == ctx.config().dc_label() { + // Create in current datacenter + let actor_id = Id::new_v1(ctx.config().dc_label()); - match ctx - .op(pegboard::ops::actor::create::Input { - actor_id, - namespace_id: namespace.namespace_id, - name: body.name.clone(), - key: Some(body.key.clone()), - runner_name_selector: body.runner_name_selector, - input: body.input.clone(), - crash_policy: body.crash_policy, - forward_request: true, - datacenter_name: body.datacenter.clone(), - }) - .await - { - Ok(res) => Ok(GetOrCreateResponse { - actor: res.actor, - created: true, - }), - Err(err) => { - // Check if this is a DuplicateKey error and extract the existing actor ID - if let Some(existing_actor_id) = utils::extract_duplicate_key_error(&err) { - tracing::info!( - ?existing_actor_id, - "received duplicate key error, returning existing actor id" - ); - let actor = - utils::fetch_actor_by_id(&ctx, existing_actor_id, query.namespace.clone()) - .await?; - return Ok(GetOrCreateResponse { - actor, - created: false, - }); - } + match ctx + .op(pegboard::ops::actor::create::Input { + actor_id, + namespace_id: namespace.namespace_id, + name: body.name.clone(), + key: Some(body.key.clone()), + runner_name_selector: body.runner_name_selector, + input: body.input.clone(), + crash_policy: body.crash_policy, + forward_request: false, + datacenter_name: body.datacenter.clone(), + }) + .await + { + Ok(res) => Ok(GetOrCreateResponse { + actor: res.actor, + created: true, + }), + Err(err) => { + // Check if this is a DuplicateKey error and extract the existing actor ID + if let Some(existing_actor_id) = utils::extract_duplicate_key_error(&err) { + tracing::info!( + ?existing_actor_id, + "received duplicate key error, returning existing actor id" + ); + let actor = + utils::fetch_actor_by_id(&ctx, existing_actor_id, query.namespace.clone()) + .await?; + return Ok(GetOrCreateResponse { + actor, + created: false, + }); + } - // Re-throw the original error if it's not a DuplicateKey - Err(err) + // Re-throw the original error if it's not a DuplicateKey + Err(err) + } } + } else { + // Forward to remote datacenter + rivet_api_util::request_remote_datacenter::( + ctx.config(), + target_dc_label, + "/actors", + axum::http::Method::PUT, + Some(&GetOrCreateQuery { + namespace: query.namespace, + }), + Some(&body), + ) + .await } } diff --git a/engine/packages/pegboard/src/ops/actor/create.rs b/engine/packages/pegboard/src/ops/actor/create.rs index 1b6e5e02d0..c19e75ba4e 100644 --- a/engine/packages/pegboard/src/ops/actor/create.rs +++ b/engine/packages/pegboard/src/ops/actor/create.rs @@ -29,6 +29,14 @@ pub struct Output { #[operation] pub async fn pegboard_actor_create(ctx: &OperationCtx, input: &Input) -> Result { + // Ensure actor ID belongs to this datacenter + ensure!( + input.actor_id.label() == ctx.config().dc_label(), + "actor id belongs to datacenter {} but current datacenter is {}", + input.actor_id.label(), + ctx.config().dc_label() + ); + // Set up subscriptions before dispatching workflow let mut create_sub = ctx .subscribe::(("actor_id", input.actor_id)) diff --git a/engine/packages/pegboard/src/ops/actor/get.rs b/engine/packages/pegboard/src/ops/actor/get.rs index fbf0977d5d..d62ec36196 100644 --- a/engine/packages/pegboard/src/ops/actor/get.rs +++ b/engine/packages/pegboard/src/ops/actor/get.rs @@ -17,6 +17,17 @@ pub struct Output { #[operation] pub async fn pegboard_actor_get(ctx: &OperationCtx, input: &Input) -> Result { + // Ensure all actor IDs belong to this datacenter + for actor_id in &input.actor_ids { + ensure!( + actor_id.label() == ctx.config().dc_label(), + "actor id {} belongs to datacenter {} but current datacenter is {}", + actor_id, + actor_id.label(), + ctx.config().dc_label() + ); + } + let actors_with_wf_ids = ctx .udb()? .run(|tx| async move { diff --git a/engine/packages/pegboard/src/ops/actor/get_for_gateway.rs b/engine/packages/pegboard/src/ops/actor/get_for_gateway.rs index b63acbe817..296375df57 100644 --- a/engine/packages/pegboard/src/ops/actor/get_for_gateway.rs +++ b/engine/packages/pegboard/src/ops/actor/get_for_gateway.rs @@ -24,6 +24,15 @@ pub async fn pegboard_actor_get_for_gateway( ctx: &OperationCtx, input: &Input, ) -> Result> { + // Ensure actor ID belongs to this datacenter + ensure!( + input.actor_id.label() == ctx.config().dc_label(), + "actor id {} belongs to datacenter {} but current datacenter is {}", + input.actor_id, + input.actor_id.label(), + ctx.config().dc_label() + ); + ctx.udb()? .run(|tx| async move { let tx = tx.with_subspace(keys::subspace()); diff --git a/engine/packages/pegboard/src/ops/actor/get_runner.rs b/engine/packages/pegboard/src/ops/actor/get_runner.rs index 3503b88ec3..fdda024c62 100644 --- a/engine/packages/pegboard/src/ops/actor/get_runner.rs +++ b/engine/packages/pegboard/src/ops/actor/get_runner.rs @@ -24,6 +24,17 @@ pub struct Actor { // TODO: Add cache (remember to purge cache when runner changes) #[operation] pub async fn pegboard_actor_get_runner(ctx: &OperationCtx, input: &Input) -> Result { + // Ensure all actor IDs belong to this datacenter + for actor_id in &input.actor_ids { + ensure!( + actor_id.label() == ctx.config().dc_label(), + "actor id {} belongs to datacenter {} but current datacenter is {}", + actor_id, + actor_id.label(), + ctx.config().dc_label() + ); + } + let actors = ctx .udb()? .run(|tx| async move {