Skip to content

Commit eef5ea6

Browse files
authored
fix: fix epoxy replica ids (#3223)
1 parent 2b53da9 commit eef5ea6

File tree

7 files changed

+28
-13
lines changed

7 files changed

+28
-13
lines changed

out/errors/guard.service_unavailable.json

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/core/api-peer/src/internal.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,9 +97,13 @@ pub async fn epoxy_replica_reconfigure(
9797
_query: (),
9898
_body: ReplicaReconfigureRequest,
9999
) -> Result<ReplicaReconfigureResponse> {
100-
ctx.signal(epoxy::workflows::coordinator::ReplicaReconfigure {})
101-
.send()
102-
.await?;
100+
if ctx.config().is_leader() {
101+
ctx.signal(epoxy::workflows::coordinator::ReplicaReconfigure {})
102+
.to_workflow::<epoxy::workflows::coordinator::Workflow>()
103+
.tag("replica", ctx.config().epoxy_replica_id())
104+
.send()
105+
.await?;
106+
}
103107

104108
Ok(ReplicaReconfigureResponse {})
105109
}

packages/core/guard/core/src/errors.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ pub struct ConnectionError {
6969
pub remote_addr: String,
7070
}
7171

72+
#[derive(RivetError, Serialize, Deserialize)]
73+
#[error("guard", "service_unavailable", "Service unavailable.")]
74+
pub struct ServiceUnavailable;
75+
7276
#[derive(RivetError, Serialize, Deserialize)]
7377
#[error(
7478
"guard",

packages/core/pegboard-gateway/src/lib.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@ use gas::prelude::*;
66
use http_body_util::{BodyExt, Full};
77
use hyper::{Request, Response, StatusCode, header::HeaderName};
88
use rivet_guard_core::{
9-
WebSocketHandle, custom_serve::CustomServeTrait, errors::WebSocketServiceUnavailable,
10-
proxy_service::ResponseBody, request_context::RequestContext,
9+
WebSocketHandle,
10+
custom_serve::CustomServeTrait,
11+
errors::{ServiceUnavailable, WebSocketServiceUnavailable},
12+
proxy_service::ResponseBody,
13+
request_context::RequestContext,
1114
};
1215
use rivet_runner_protocol as protocol;
1316
use rivet_util::serde::HashableMap;
@@ -122,20 +125,20 @@ impl CustomServeTrait for PegboardGateway {
122125
},
123126
TunnelMessageData::Timeout => {
124127
tracing::warn!("tunnel message timeout");
125-
return Err(WebSocketServiceUnavailable.build());
128+
return Err(ServiceUnavailable.build());
126129
}
127130
}
128131
}
129132

130133
tracing::warn!("received no message response");
131-
Err(WebSocketServiceUnavailable.build())
134+
Err(ServiceUnavailable.build())
132135
};
133136
let response_start = tokio::time::timeout(TUNNEL_ACK_TIMEOUT, fut)
134137
.await
135138
.map_err(|_| {
136139
tracing::warn!("timed out waiting for tunnel ack");
137140

138-
WebSocketServiceUnavailable.build()
141+
ServiceUnavailable.build()
139142
})??;
140143
tracing::debug!("response handler task ended");
141144

packages/services/epoxy/src/workflows/replica/setup.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::types;
1616

1717
#[tracing::instrument(skip_all)]
1818
pub async fn setup_replica(ctx: &mut WorkflowCtx, _input: &super::Input) -> Result<()> {
19-
// Wait for cooridinator to send begin learning signal
19+
// Wait for coordinator to send begin learning signal
2020
let begin_learning = ctx.listen::<super::BeginLearning>().await?;
2121

2222
// TODO: Paralellize replicas

packages/services/pegboard/src/ops/runner/find_dc_with_runner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use futures_util::{FutureExt, StreamExt, TryFutureExt, stream::FuturesUnordered}
55
use gas::prelude::*;
66
use rivet_api_types::{runner_configs::list as runner_configs_list, runners::list as runners_list};
77
use rivet_api_util::{Method, request_remote_datacenter};
8-
use rivet_types::runner_configs::{RunnerConfig, RunnerConfigKind};
8+
use rivet_types::runner_configs::RunnerConfigKind;
99
use serde::de::DeserializeOwned;
1010

1111
#[derive(Debug, Clone, Serialize, Deserialize)]

packages/services/pegboard/src/workflows/actor/runtime.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,8 @@ use gas::prelude::*;
66
use rivet_metrics::KeyValue;
77
use rivet_runner_protocol as protocol;
88
use rivet_types::{
9-
actors::CrashPolicy,
10-
keys::namespace::runner_config::RunnerConfigVariant,
11-
runner_configs::{RunnerConfig, RunnerConfigKind},
9+
actors::CrashPolicy, keys::namespace::runner_config::RunnerConfigVariant,
10+
runner_configs::RunnerConfigKind,
1211
};
1312
use std::time::Instant;
1413
use universaldb::options::{ConflictRangeType, MutationType, StreamingMode};

0 commit comments

Comments
 (0)