Skip to content

Commit b04b0a9

Browse files
keivenchangziqifan617
authored andcommitted
fix: callback registration, fix metric name access, ensure ordered vec, etc... (#3541)
Signed-off-by: Keiven Chang <[email protected]> Co-authored-by: Keiven Chang <[email protected]>
1 parent 4b2e6a6 commit b04b0a9

File tree

9 files changed

+168
-110
lines changed

9 files changed

+168
-110
lines changed

lib/bindings/python/examples/metrics/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ Method 2 supports all standard Prometheus metric types:
216216
- **CounterVec**: `CounterVec` (float with labels), `IntCounterVec` (integer with labels)
217217
- **Histograms**: `Histogram`
218218

219-
All metrics are imported from `dynamo._prometheus_metrics`.
219+
All metrics are imported from `dynamo.prometheus_metrics`.
220220

221221
#### Adding/Changing Metrics in Method 2
222222

lib/bindings/python/examples/metrics/server_with_callback.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@
1010
2. Register them with an endpoint
1111
3. Update their values using type-safe methods (set for gauges, inc for counters)
1212
4. The metrics are automatically served via the /metrics endpoint
13+
14+
Usage:
15+
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 ./server_with_callback.py
16+
17+
# In another terminal, query the metrics:
18+
curl http://localhost:8081/metrics
1319
"""
1420

1521
import asyncio
@@ -18,8 +24,8 @@
1824

1925
# Note that these imports are for type hints only. They cannot be instantiated directly.
2026
# You can instantiate them using the endpoint.metrics.create_*() methods.
21-
from dynamo._prometheus_metrics import Gauge, IntCounter, IntGauge, IntGaugeVec
22-
from dynamo.runtime import DistributedRuntime, dynamo_worker
27+
from dynamo.prometheus_metrics import Gauge, IntCounter, IntGauge, IntGaugeVec
28+
from dynamo.runtime import Component, DistributedRuntime, Endpoint, dynamo_worker
2329

2430

2531
@dynamo_worker()
@@ -29,10 +35,10 @@ async def worker(runtime: DistributedRuntime) -> None:
2935

3036
async def init(runtime: DistributedRuntime):
3137
# Create component and endpoint
32-
component = runtime.namespace("ns556").component("cp556")
38+
component: Component = runtime.namespace("ns556").component("cp556")
3339
await component.create_service()
3440

35-
endpoint = component.endpoint("ep556")
41+
endpoint: Endpoint = component.endpoint("ep556")
3642

3743
# Step 1: Create metrics using the endpoint's metrics property
3844
print("[python] Creating metrics...")
@@ -59,11 +65,10 @@ async def init(runtime: DistributedRuntime):
5965
[("update_method", "callback")],
6066
)
6167

62-
print(f"[python] Created IntGauge: {request_total_slots.name}")
63-
print(f"[python] Created Gauge: {gpu_cache_usage_perc.name}")
64-
print(f"[python] Created IntGaugeVec: {worker_active_requests.name}")
65-
print(f"[python] Created IntCounter with constant labels: {update_count.name}")
66-
print(f"[python] Const labels: {update_count.const_labels}")
68+
print(f"[python] Created IntGauge: {request_total_slots.name()}")
69+
print(f"[python] Created Gauge: {gpu_cache_usage_perc.name()}")
70+
print(f"[python] Created IntGaugeVec: {worker_active_requests.name()}")
71+
print(f"[python] Created IntCounter: {update_count.name()}")
6772
print("[python] Metrics automatically registered with endpoint!")
6873

6974
# Step 2: Register a callback to update metrics on-demand

lib/bindings/python/examples/metrics/server_with_loop.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@
1010
2. A background thread continuously updates metrics in a loop
1111
3. No callback is used - metrics are updated directly by the thread
1212
4. The metrics are automatically served via the /metrics endpoint
13+
14+
Usage:
15+
DYN_SYSTEM_ENABLED=true DYN_SYSTEM_PORT=8081 ./server_with_loop.py
16+
17+
# In another terminal, query the metrics:
18+
curl http://localhost:8081/metrics
1319
"""
1420

1521
import asyncio
@@ -18,8 +24,8 @@
1824

1925
import uvloop
2026

21-
from dynamo._prometheus_metrics import Gauge, IntCounter, IntGauge, IntGaugeVec
22-
from dynamo.runtime import DistributedRuntime, dynamo_worker
27+
from dynamo.prometheus_metrics import Gauge, IntCounter, IntGauge, IntGaugeVec
28+
from dynamo.runtime import Component, DistributedRuntime, Endpoint, dynamo_worker
2329

2430

2531
def metrics_updater_thread(
@@ -60,10 +66,10 @@ async def worker(runtime: DistributedRuntime) -> None:
6066

6167
async def init(runtime: DistributedRuntime):
6268
# Create component and endpoint
63-
component = runtime.namespace("ns557").component("cp557")
69+
component: Component = runtime.namespace("ns557").component("cp557")
6470
await component.create_service()
6571

66-
endpoint = component.endpoint("ep557")
72+
endpoint: Endpoint = component.endpoint("ep557")
6773

6874
# Create metrics using the endpoint's metrics property
6975
print("[python] Creating metrics...")
@@ -87,10 +93,10 @@ async def init(runtime: DistributedRuntime):
8793
[("update_method", "background_thread")],
8894
)
8995

90-
print(f"[python] Created IntGauge: {request_total_slots.name}")
91-
print(f"[python] Created Gauge: {gpu_cache_usage_perc.name}")
92-
print(f"[python] Created IntGaugeVec: {worker_active_requests.name}")
93-
print(f"[python] Created IntCounter: {update_count.name}")
96+
print(f"[python] Created IntGauge: {request_total_slots.name()}")
97+
print(f"[python] Created Gauge: {gpu_cache_usage_perc.name()}")
98+
print(f"[python] Created IntGaugeVec: {worker_active_requests.name()}")
99+
print(f"[python] Created IntCounter: {update_count.name()}")
94100
print("[python] Metrics automatically registered with endpoint!")
95101

96102
# Set initial values

lib/bindings/python/rust/lib.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
185185
engine::add_to_module(m)?;
186186
parsers::add_to_module(m)?;
187187

188-
m.add_class::<prometheus_metrics::PyRuntimeMetrics>()?;
188+
m.add_class::<prometheus_metrics::RuntimeMetrics>()?;
189189
let prometheus_metrics = PyModule::new(m.py(), "prometheus_metrics")?;
190190
prometheus_metrics::add_to_module(&prometheus_metrics)?;
191191
m.add_submodule(&prometheus_metrics)?;
@@ -637,8 +637,8 @@ impl Component {
637637

638638
/// Get a RuntimeMetrics helper for creating Prometheus metrics
639639
#[getter]
640-
fn metrics(&self) -> prometheus_metrics::PyRuntimeMetrics {
641-
prometheus_metrics::PyRuntimeMetrics::from_component(self.inner.clone())
640+
fn metrics(&self) -> prometheus_metrics::RuntimeMetrics {
641+
prometheus_metrics::RuntimeMetrics::from_component(self.inner.clone())
642642
}
643643
}
644644

@@ -726,8 +726,8 @@ impl Endpoint {
726726

727727
/// Get a RuntimeMetrics helper for creating Prometheus metrics
728728
#[getter]
729-
fn metrics(&self) -> prometheus_metrics::PyRuntimeMetrics {
730-
prometheus_metrics::PyRuntimeMetrics::from_endpoint(self.inner.clone())
729+
fn metrics(&self) -> prometheus_metrics::RuntimeMetrics {
730+
prometheus_metrics::RuntimeMetrics::from_endpoint(self.inner.clone())
731731
}
732732
}
733733

@@ -743,8 +743,8 @@ impl Namespace {
743743

744744
/// Get a RuntimeMetrics helper for creating Prometheus metrics
745745
#[getter]
746-
fn metrics(&self) -> prometheus_metrics::PyRuntimeMetrics {
747-
prometheus_metrics::PyRuntimeMetrics::from_namespace(self.inner.clone())
746+
fn metrics(&self) -> prometheus_metrics::RuntimeMetrics {
747+
prometheus_metrics::RuntimeMetrics::from_namespace(self.inner.clone())
748748
}
749749
}
750750

lib/bindings/python/rust/prometheus_metrics.rs

Lines changed: 71 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,37 @@ use std::sync::Arc;
1818

1919
use crate::rs;
2020

21+
/// Helper function to order label values according to variable_labels declaration.
22+
/// This ensures labels are passed to with_label_values() in the correct order.
23+
///
24+
/// # Arguments
25+
/// * `variable_labels` - The ordered list of label names as declared in the metric
26+
/// * `labels` - The HashMap of label name-value pairs from Python
27+
///
28+
/// # Returns
29+
/// * `Ok(Vec<&str>)` - Ordered vector of label values matching variable_labels order
30+
/// * `Err(PyErr)` - If a required label is missing
31+
fn collect_ordered_label_values<'a>(
32+
variable_labels: &[String],
33+
labels: &'a HashMap<String, String>,
34+
) -> PyResult<Vec<&'a str>> {
35+
let mut ordered_values = Vec::with_capacity(variable_labels.len());
36+
for label_name in variable_labels {
37+
match labels.get(label_name) {
38+
Some(value) => ordered_values.push(value.as_str()),
39+
None => {
40+
return Err(pyo3::exceptions::PyValueError::new_err(format!(
41+
"Missing required label '{}'. Expected labels: {:?}, Got: {:?}",
42+
label_name,
43+
variable_labels,
44+
labels.keys().collect::<Vec<_>>()
45+
)));
46+
}
47+
}
48+
}
49+
Ok(ordered_values)
50+
}
51+
2152
// Python wrappers for Prometheus metric types.
2253
//
2354
// These wrapper structs are necessary because Prometheus types from the external `prometheus` crate
@@ -205,21 +236,24 @@ impl CounterVec {
205236

206237
/// Increment counter by 1 with labels
207238
fn inc(&self, labels: HashMap<String, String>) -> PyResult<()> {
208-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
239+
let desc = self.counter.desc();
240+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
209241
self.counter.with_label_values(&label_values).inc();
210242
Ok(())
211243
}
212244

213245
/// Increment counter by value with labels
214246
fn inc_by(&self, labels: HashMap<String, String>, value: f64) -> PyResult<()> {
215-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
247+
let desc = self.counter.desc();
248+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
216249
self.counter.with_label_values(&label_values).inc_by(value);
217250
Ok(())
218251
}
219252

220253
/// Get counter value with labels
221254
fn get(&self, labels: HashMap<String, String>) -> PyResult<f64> {
222-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
255+
let desc = self.counter.desc();
256+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
223257
Ok(self.counter.with_label_values(&label_values).get())
224258
}
225259
}
@@ -257,21 +291,24 @@ impl IntCounterVec {
257291

258292
/// Increment counter by 1 with labels
259293
fn inc(&self, labels: HashMap<String, String>) -> PyResult<()> {
260-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
294+
let desc = self.counter.desc();
295+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
261296
self.counter.with_label_values(&label_values).inc();
262297
Ok(())
263298
}
264299

265300
/// Increment counter by value with labels
266301
fn inc_by(&self, labels: HashMap<String, String>, value: u64) -> PyResult<()> {
267-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
302+
let desc = self.counter.desc();
303+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
268304
self.counter.with_label_values(&label_values).inc_by(value);
269305
Ok(())
270306
}
271307

272308
/// Get counter value with labels
273309
fn get(&self, labels: HashMap<String, String>) -> PyResult<u64> {
274-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
310+
let desc = self.counter.desc();
311+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
275312
Ok(self.counter.with_label_values(&label_values).get())
276313
}
277314
}
@@ -443,41 +480,47 @@ impl GaugeVec {
443480

444481
/// Set gauge value with labels
445482
fn set(&self, value: f64, labels: HashMap<String, String>) -> PyResult<()> {
446-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
483+
let desc = self.gauge.desc();
484+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
447485
self.gauge.with_label_values(&label_values).set(value);
448486
Ok(())
449487
}
450488

451489
/// Get gauge value with labels
452490
fn get(&self, labels: HashMap<String, String>) -> PyResult<f64> {
453-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
491+
let desc = self.gauge.desc();
492+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
454493
Ok(self.gauge.with_label_values(&label_values).get())
455494
}
456495

457496
/// Increment gauge by 1 with labels
458497
fn inc(&self, labels: HashMap<String, String>) -> PyResult<()> {
459-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
498+
let desc = self.gauge.desc();
499+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
460500
self.gauge.with_label_values(&label_values).inc();
461501
Ok(())
462502
}
463503

464504
/// Decrement gauge by 1 with labels
465505
fn dec(&self, labels: HashMap<String, String>) -> PyResult<()> {
466-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
506+
let desc = self.gauge.desc();
507+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
467508
self.gauge.with_label_values(&label_values).dec();
468509
Ok(())
469510
}
470511

471512
/// Add value to gauge with labels
472513
fn add(&self, labels: HashMap<String, String>, value: f64) -> PyResult<()> {
473-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
514+
let desc = self.gauge.desc();
515+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
474516
self.gauge.with_label_values(&label_values).add(value);
475517
Ok(())
476518
}
477519

478520
/// Subtract value from gauge with labels
479521
fn sub(&self, labels: HashMap<String, String>, value: f64) -> PyResult<()> {
480-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
522+
let desc = self.gauge.desc();
523+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
481524
self.gauge.with_label_values(&label_values).sub(value);
482525
Ok(())
483526
}
@@ -516,41 +559,47 @@ impl IntGaugeVec {
516559

517560
/// Set gauge value with labels
518561
fn set(&self, value: i64, labels: HashMap<String, String>) -> PyResult<()> {
519-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
562+
let desc = self.gauge.desc();
563+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
520564
self.gauge.with_label_values(&label_values).set(value);
521565
Ok(())
522566
}
523567

524568
/// Get gauge value with labels
525569
fn get(&self, labels: HashMap<String, String>) -> PyResult<i64> {
526-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
570+
let desc = self.gauge.desc();
571+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
527572
Ok(self.gauge.with_label_values(&label_values).get())
528573
}
529574

530575
/// Increment gauge by 1 with labels
531576
fn inc(&self, labels: HashMap<String, String>) -> PyResult<()> {
532-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
577+
let desc = self.gauge.desc();
578+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
533579
self.gauge.with_label_values(&label_values).inc();
534580
Ok(())
535581
}
536582

537583
/// Decrement gauge by 1 with labels
538584
fn dec(&self, labels: HashMap<String, String>) -> PyResult<()> {
539-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
585+
let desc = self.gauge.desc();
586+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
540587
self.gauge.with_label_values(&label_values).dec();
541588
Ok(())
542589
}
543590

544591
/// Add value to gauge with labels
545592
fn add(&self, labels: HashMap<String, String>, value: i64) -> PyResult<()> {
546-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
593+
let desc = self.gauge.desc();
594+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
547595
self.gauge.with_label_values(&label_values).add(value);
548596
Ok(())
549597
}
550598

551599
/// Subtract value from gauge with labels
552600
fn sub(&self, labels: HashMap<String, String>, value: i64) -> PyResult<()> {
553-
let label_values: Vec<&str> = labels.values().map(|s| s.as_str()).collect();
601+
let desc = self.gauge.desc();
602+
let label_values = collect_ordered_label_values(&desc[0].variable_labels, &labels)?;
554603
self.gauge.with_label_values(&label_values).sub(value);
555604
Ok(())
556605
}
@@ -598,15 +647,15 @@ impl Histogram {
598647
/// and utilities for registering metrics callbacks.
599648
/// Exposed as endpoint.metrics, component.metrics, and namespace.metrics in Python.
600649
///
601-
/// NOTE: The create_* methods in PyRuntimeMetrics must stay in sync with the MetricsRegistry trait
650+
/// NOTE: The create_* methods in RuntimeMetrics must stay in sync with the MetricsRegistry trait
602651
/// in lib/runtime/src/metrics.rs. When adding new metric types, update both locations.
603652
#[pyclass]
604653
#[derive(Clone)]
605-
pub struct PyRuntimeMetrics {
654+
pub struct RuntimeMetrics {
606655
metricsregistry: Arc<dyn rs::metrics::MetricsRegistry>,
607656
}
608657

609-
impl PyRuntimeMetrics {
658+
impl RuntimeMetrics {
610659
/// Create from Endpoint
611660
pub fn from_endpoint(endpoint: dynamo_runtime::component::Endpoint) -> Self {
612661
Self {
@@ -668,7 +717,7 @@ impl PyRuntimeMetrics {
668717
}
669718

670719
#[pymethods]
671-
impl PyRuntimeMetrics {
720+
impl RuntimeMetrics {
672721
/// Register a Python callback to be invoked before metrics are scraped
673722
/// This callback will be called for this endpoint's metrics hierarchy
674723
fn register_update_callback(&self, callback: PyObject, _py: Python) -> PyResult<()> {

0 commit comments

Comments
 (0)