Skip to content

Commit 9cd808a

Browse files
committed
fix: add metrics to ops and guard (#2429)
<!-- Please make sure there is an issue that this PR is correlated to. --> ## Changes <!-- If there are frontend changes, please include screenshots. -->
1 parent ad8ea6b commit 9cd808a

File tree

20 files changed

+829
-647
lines changed

20 files changed

+829
-647
lines changed

packages/common/chirp-workflow/core/src/ctx/common.rs

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::time::Duration;
1+
use std::time::{Duration, Instant};
22

33
use futures_util::StreamExt;
44
use global_error::{GlobalError, GlobalResult};
@@ -81,6 +81,16 @@ where
8181
{
8282
tracing::debug!(?input, "operation call");
8383

84+
// Record metrics
85+
crate::metrics::OPERATION_PENDING
86+
.with_label_values(&[I::Operation::NAME])
87+
.inc();
88+
crate::metrics::OPERATION_TOTAL
89+
.with_label_values(&[I::Operation::NAME])
90+
.inc();
91+
92+
let start_instant = Instant::now();
93+
8494
let ctx = OperationCtx::new(
8595
db.clone(),
8696
config,
@@ -95,9 +105,45 @@ where
95105

96106
let res = tokio::time::timeout(I::Operation::TIMEOUT, I::Operation::run(&ctx, &input))
97107
.await
98-
.map_err(|_| WorkflowError::OperationTimeout(0))?
99-
.map_err(WorkflowError::OperationFailure)
100-
.map_err(GlobalError::raw);
108+
.map_err(|_| WorkflowError::OperationTimeout(0))
109+
.map(|res| {
110+
res.map_err(WorkflowError::OperationFailure)
111+
.map_err(GlobalError::raw)
112+
});
113+
114+
// Record metrics
115+
{
116+
let error_code_str = match &res {
117+
Ok(Err(GlobalError::Internal { ty, .. })) => {
118+
let err_code_str = "__UNKNOWN__".to_string();
119+
crate::metrics::OPERATION_ERRORS
120+
.with_label_values(&[I::Operation::NAME, &err_code_str, ty])
121+
.inc();
122+
123+
err_code_str
124+
}
125+
Ok(Err(GlobalError::BadRequest { code, .. })) => {
126+
crate::metrics::OPERATION_ERRORS
127+
.with_label_values(&[I::Operation::NAME, code, "bad_request"])
128+
.inc();
129+
130+
code.clone()
131+
}
132+
Ok(_) => String::new(),
133+
Err(_) => "timeout".to_string(),
134+
};
135+
136+
// Other request metrics
137+
let dt = start_instant.elapsed().as_secs_f64();
138+
crate::metrics::OPERATION_PENDING
139+
.with_label_values(&[I::Operation::NAME])
140+
.dec();
141+
crate::metrics::OPERATION_DURATION
142+
.with_label_values(&[I::Operation::NAME, error_code_str.as_str()])
143+
.observe(dt);
144+
}
145+
146+
let res = res?;
101147

102148
tracing::debug!(?res, "operation response");
103149

packages/common/chirp-workflow/core/src/ctx/listen.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,16 @@ impl<'a> ListenCtx<'a> {
8383
.with_label_values(&[self.ctx.name(), &signal.signal_name])
8484
.observe(recv_lag);
8585

86+
if recv_lag > 15.0 {
87+
// We print an error here so the trace of this workflow does not get dropped
88+
tracing::error!(
89+
?recv_lag,
90+
signal_id=%signal.signal_id,
91+
signal_name=%signal.signal_name,
92+
"long signal recv time",
93+
);
94+
}
95+
8696
tracing::debug!(
8797
signal_id=%signal.signal_id,
8898
signal_name=%signal.signal_name,

packages/common/chirp-workflow/core/src/db/fdb_sqlite_nats/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ impl Database for DatabaseFdbSqliteNats {
364364
}
365365

366366
#[tracing::instrument(skip_all)]
367-
async fn publish_metrics(&self, worker_instance_id: Uuid) -> WorkflowResult<()> {
367+
async fn publish_metrics(&self, _worker_instance_id: Uuid) -> WorkflowResult<()> {
368368
// Attempt to be the only worker publishing metrics by writing to the lock key
369369
let acquired_lock = self
370370
.pools

packages/common/chirp-workflow/core/src/metrics.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,4 +184,31 @@ lazy_static::lazy_static! {
184184
BUCKETS.to_vec(),
185185
*REGISTRY,
186186
).unwrap();
187+
188+
// MARK: Ops
189+
pub static ref OPERATION_PENDING: IntGaugeVec = register_int_gauge_vec_with_registry!(
190+
"chirp_workflow_operation_pending",
191+
"Total number of operation calls in progress.",
192+
&["operation_name"],
193+
*REGISTRY,
194+
).unwrap();
195+
pub static ref OPERATION_TOTAL: IntCounterVec = register_int_counter_vec_with_registry!(
196+
"chirp_workflow_operation_total",
197+
"Total number of operation calls.",
198+
&["operation_name"],
199+
*REGISTRY,
200+
).unwrap();
201+
pub static ref OPERATION_DURATION: HistogramVec = register_histogram_vec_with_registry!(
202+
"chirp_workflow_operation_duration",
203+
"Total duration of an op call.",
204+
&["operation_name", "error_code"],
205+
BUCKETS.to_vec(),
206+
*REGISTRY,
207+
).unwrap();
208+
pub static ref OPERATION_ERRORS: IntCounterVec = register_int_counter_vec_with_registry!(
209+
"chirp_workflow_operation_errors",
210+
"All errors made by this operation.",
211+
&["operation_name", "error_code", "error_type"],
212+
*REGISTRY,
213+
).unwrap();
187214
}

packages/core/api/status/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ game-resolve-name-id.workspace = true
4141
game-namespace-resolve-name-id.workspace = true
4242

4343
token-create.workspace = true
44-
trust-dns-resolver = "0.23.2"
44+
hickory-resolver = "0.25.1"
4545
rivet-config.workspace = true
4646
rivet-env.workspace = true
4747

packages/core/api/status/src/route/actor.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ pub async fn status(
240240
Some(&system_test_env),
241241
None,
242242
)
243+
.instrument(tracing::info_span!("actor destroy request"))
243244
.await?;
244245

245246
// Unwrap res
@@ -259,6 +260,7 @@ pub async fn status(
259260
Ok(serde_json::json!({}))
260261
}
261262

263+
#[tracing::instrument]
262264
async fn test_actor_connection(hostname: &str, port: u16, actor_origin: &str) -> GlobalResult<()> {
263265
// Look up IP for the actor's host
264266
let gg_ips = lookup_dns(hostname).await?;
@@ -312,8 +314,9 @@ async fn test_actor_connection(hostname: &str, port: u16, actor_origin: &str) ->
312314
}
313315

314316
/// Returns the IP addresses for a given hostname.
317+
#[tracing::instrument]
315318
async fn lookup_dns(hostname: &str) -> GlobalResult<Vec<IpAddr>> {
316-
let resolver = trust_dns_resolver::TokioAsyncResolver::tokio_from_system_conf()?;
319+
let resolver = hickory_resolver::Resolver::builder_tokio()?.build();
317320
let addrs = resolver
318321
.lookup_ip(hostname)
319322
.await?
@@ -326,6 +329,7 @@ async fn lookup_dns(hostname: &str) -> GlobalResult<Vec<IpAddr>> {
326329
/// Tests HTTP connectivity to a hostname for a given address.
327330
///
328331
/// This lets us isolate of a specific GG IP address is not behaving correctly.
332+
#[tracing::instrument]
329333
async fn test_http(
330334
actor_origin: String,
331335
hostname: String,
@@ -340,13 +344,15 @@ async fn test_http(
340344
client
341345
.get(format!("{actor_origin}/health"))
342346
.send()
347+
.instrument(tracing::info_span!("health request"))
343348
.await?
344349
.error_for_status()?;
345350

346351
Ok(())
347352
}
348353

349354
/// Tests WebSocket connectivity to a hostname.
355+
#[tracing::instrument]
350356
async fn test_ws(actor_origin: &str) -> GlobalResult<()> {
351357
let actor_origin = actor_origin
352358
.replace("http://", "ws://")

packages/core/api/status/src/route/matchmaker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ async fn test_lobby_connection(
246246

247247
/// Returns the IP addresses for a given hostname.
248248
async fn lookup_dns(hostname: &str) -> GlobalResult<Vec<IpAddr>> {
249-
let resolver = trust_dns_resolver::TokioAsyncResolver::tokio_from_system_conf()?;
249+
let resolver = hickory_resolver::Resolver::builder_tokio()?.build();
250250
let addrs = resolver
251251
.lookup_ip(hostname)
252252
.await?

packages/core/services/cluster/src/workflows/server/install/install_scripts/files/otel_collector.sh

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,20 @@ processors:
3939
status_code:
4040
status_codes:
4141
- ERROR
42-
42+
# Guard-only policy
43+
- name: policy-2
44+
type: and
45+
and:
46+
and_sub_policy:
47+
- name: latency-policy-1
48+
type: latency
49+
latency:
50+
threshold_ms: 15000
51+
- name: span-name-policy-1
52+
type: ottl_condition
53+
ottl_condition:
54+
span:
55+
- 'name == "routing_fn"'
4356
exporters:
4457
otlp:
4558
endpoint: 127.0.0.1:__TUNNEL_OTEL_PORT__

packages/core/services/cluster/src/workflows/server/install/install_scripts/files/rivet_guard_configure.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ ConditionPathExists=/etc/rivet-server/
1717
[Service]
1818
Environment="RIVET_OTEL_ENABLED=1"
1919
Environment="RIVET_OTEL_SAMPLER_RATIO=1"
20-
Environment="RIVET_SERVICE_NAME=rivet-guard"
20+
Environment="RIVET_SERVICE_NAME=guard"
2121
Environment="RIVET_CLUSTER_ID=___CLUSTER_ID___"
2222
Environment="RIVET_DATACENTER_ID=___DATACENTER_ID___"
2323
Environment="RIVET_SERVER_ID=___SERVER_ID___"

packages/core/services/cluster/src/workflows/server/install/install_scripts/files/rivet_worker_configure.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ ConditionPathExists=/etc/rivet-server/
1818
# OTeL env vars
1919
Environment="RIVET_OTEL_ENABLED=1"
2020
Environment="RIVET_OTEL_SAMPLER_RATIO=1"
21-
Environment="RIVET_SERVICE_NAME=rivet-edge"
21+
Environment="RIVET_SERVICE_NAME=edge"
2222
Environment="RIVET_CLUSTER_ID=___CLUSTER_ID___"
2323
Environment="RIVET_DATACENTER_ID=___DATACENTER_ID___"
2424
Environment="RIVET_SERVER_ID=___SERVER_ID___"

0 commit comments

Comments
 (0)