diff --git a/components/backends/sglang/prometheus.md b/components/backends/sglang/prometheus.md new file mode 100644 index 0000000000..30a1c38ba8 --- /dev/null +++ b/components/backends/sglang/prometheus.md @@ -0,0 +1,99 @@ +# SGLang Prometheus Metrics + +**📚 Official Documentation**: [SGLang Production Metrics](https://docs.sglang.ai/references/production_metrics.html) + +This document describes how SGLang Prometheus metrics are exposed in Dynamo. + +## Overview + +When running SGLang through Dynamo, SGLang engine metrics are automatically passed through and exposed on Dynamo's `/metrics` endpoint (default port 8081). This allows you to access both SGLang engine metrics (prefixed with `sglang:`) and Dynamo runtime metrics (prefixed with `dynamo_*`) from a single worker backend endpoint. + +For the complete and authoritative list of all SGLang metrics, always refer to the official documentation linked above. + +Dynamo runtime metrics are documented in [docs/guides/metrics.md](../../../docs/guides/metrics.md). + +## Metric Reference + +The official documentation includes: +- Complete metric definitions with HELP and TYPE descriptions +- Example metric output in Prometheus exposition format +- Counter, Gauge, and Histogram metrics +- Metric labels (e.g., `model_name`, `engine_type`, `tp_rank`, `pp_rank`) +- Setup guide for Prometheus + Grafana monitoring +- Troubleshooting tips and configuration examples + +## Metric Categories + +SGLang provides metrics in the following categories (all prefixed with `sglang:`): +- Throughput metrics +- Resource usage +- Latency metrics +- Disaggregation metrics (when enabled) + +**Note:** Specific metrics are subject to change between SGLang versions. Always refer to the [official documentation](https://docs.sglang.ai/references/production_metrics.html) or inspect the `/metrics` endpoint for your SGLang version. + +## Enabling Metrics in Dynamo + +SGLang metrics are automatically exposed when running SGLang through Dynamo with metrics enabled. + +## Inspecting Metrics + +To see the actual metrics available in your SGLang version: + +### 1. Launch SGLang with Metrics Enabled + +```bash +# Set environment variables +export DYN_SYSTEM_ENABLED=true +export DYN_SYSTEM_PORT=8081 + +# Start SGLang worker with metrics enabled +python -m dynamo.sglang --model --enable-metrics + +# Wait for engine to initialize +``` + +Metrics will be available at: `http://localhost:8081/metrics` + +### 2. Fetch Metrics via curl + +```bash +curl http://localhost:8081/metrics | grep "^sglang:" +``` + +### 3. Example Output + +**Note:** The specific metrics shown below are examples and may vary depending on your SGLang version. Always inspect your actual `/metrics` endpoint for the current list. + +``` +# HELP sglang:prompt_tokens_total Number of prefill tokens processed. +# TYPE sglang:prompt_tokens_total counter +sglang:prompt_tokens_total{model_name="meta-llama/Llama-3.1-8B-Instruct"} 8128902.0 +# HELP sglang:generation_tokens_total Number of generation tokens processed. 
+# TYPE sglang:generation_tokens_total counter +sglang:generation_tokens_total{model_name="meta-llama/Llama-3.1-8B-Instruct"} 7557572.0 +# HELP sglang:cache_hit_rate The cache hit rate +# TYPE sglang:cache_hit_rate gauge +sglang:cache_hit_rate{model_name="meta-llama/Llama-3.1-8B-Instruct"} 0.0075 +``` + +## Implementation Details + +- SGLang uses multiprocess metrics collection via `prometheus_client.multiprocess.MultiProcessCollector` +- Metrics are filtered by the `sglang:` prefix before being exposed +- The integration uses Dynamo's `register_engine_metrics_callback()` function +- Metrics appear after SGLang engine initialization completes + +## See Also + +### SGLang Metrics +- [Official SGLang Production Metrics](https://docs.sglang.ai/references/production_metrics.html) +- [SGLang GitHub - Metrics Collector](https://github.com/sgl-project/sglang/blob/main/python/sglang/srt/metrics/collector.py) + +### Dynamo Metrics +- **Dynamo Metrics Guide**: See `docs/guides/metrics.md` for complete documentation on Dynamo runtime metrics +- **Dynamo Runtime Metrics**: Metrics prefixed with `dynamo_*` for runtime, components, endpoints, and namespaces + - Implementation: `lib/runtime/src/metrics.rs` (Rust runtime metrics) + - Metric names: `lib/runtime/src/metrics/prometheus_names.rs` (metric name constants) + - Available at the same `/metrics` endpoint alongside SGLang metrics +- **Integration Code**: `components/src/dynamo/common/utils/prometheus.py` - Prometheus utilities and callback registration diff --git a/components/backends/vllm/prometheus.md b/components/backends/vllm/prometheus.md new file mode 100644 index 0000000000..fce3f5eb6d --- /dev/null +++ b/components/backends/vllm/prometheus.md @@ -0,0 +1,104 @@ +# vLLM Prometheus Metrics + +**📚 Official Documentation**: [vLLM Metrics Design](https://docs.vllm.ai/en/latest/design/metrics.html) + +This document describes how vLLM Prometheus metrics are exposed in Dynamo. + +## Overview + +When running vLLM through Dynamo, vLLM engine metrics are automatically passed through and exposed on Dynamo's `/metrics` endpoint (default port 8081). This allows you to access both vLLM engine metrics (prefixed with `vllm:`) and Dynamo runtime metrics (prefixed with `dynamo_*`) from a single worker backend endpoint. + +For the complete and authoritative list of all vLLM metrics, always refer to the official documentation linked above. + +Dynamo runtime metrics are documented in [docs/guides/metrics.md](../../../docs/guides/metrics.md). + +## Metric Reference + +The official documentation includes: +- Complete metric definitions with detailed explanations +- Counter, Gauge, and Histogram metrics +- Metric labels (e.g., `model_name`, `finished_reason`, `scheduling_event`) +- Design rationale and implementation details +- Information about v1 metrics migration +- Future work and deprecated metrics + +## Metric Categories + +vLLM provides metrics in the following categories (all prefixed with `vllm:`): +- Request metrics +- Performance metrics +- Resource usage +- Scheduler metrics +- Disaggregation metrics (when enabled) + +**Note:** Specific metrics are subject to change between vLLM versions. Always refer to the [official documentation](https://docs.vllm.ai/en/latest/design/metrics.html) or inspect the `/metrics` endpoint for your vLLM version. + +## Enabling Metrics in Dynamo + +vLLM metrics are automatically exposed when running vLLM through Dynamo with metrics enabled. 
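+
+As a quick sanity check that both metric families are served from the same endpoint, the sketch below fetches `/metrics` and counts `vllm:` versus `dynamo_*` samples. It is a minimal illustration assuming the default local setup used in this guide (worker running on `DYN_SYSTEM_PORT=8081`); adjust the URL for your deployment.
+
+```python
+import urllib.request
+
+# Fetch the combined Prometheus exposition from the Dynamo worker (default port 8081).
+with urllib.request.urlopen("http://localhost:8081/metrics") as resp:
+    text = resp.read().decode("utf-8")
+
+# Drop comment lines (# HELP / # TYPE) and group the remaining samples by prefix.
+samples = [line for line in text.splitlines() if line and not line.startswith("#")]
+vllm_samples = [line for line in samples if line.startswith("vllm:")]
+dynamo_samples = [line for line in samples if line.startswith("dynamo_")]
+
+print(f"vLLM samples:   {len(vllm_samples)}")
+print(f"Dynamo samples: {len(dynamo_samples)}")
+```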
+ +## Inspecting Metrics + +To see the actual metrics available in your vLLM version: + +### 1. Launch vLLM with Metrics Enabled + +```bash +# Set environment variables +export DYN_SYSTEM_ENABLED=true +export DYN_SYSTEM_PORT=8081 + +# Start vLLM worker (metrics enabled by default via --disable-log-stats=false) +python -m dynamo.vllm --model + +# Wait for engine to initialize +``` + +Metrics will be available at: `http://localhost:8081/metrics` + +### 2. Fetch Metrics via curl + +```bash +curl http://localhost:8081/metrics | grep "^vllm:" +``` + +### 3. Example Output + +**Note:** The specific metrics shown below are examples and may vary depending on your vLLM version. Always inspect your actual `/metrics` endpoint for the current list. + +``` +# HELP vllm:request_success_total Number of successfully finished requests. +# TYPE vllm:request_success_total counter +vllm:request_success_total{finished_reason="length",model_name="meta-llama/Llama-3.1-8B"} 15.0 +vllm:request_success_total{finished_reason="stop",model_name="meta-llama/Llama-3.1-8B"} 150.0 +# HELP vllm:time_to_first_token_seconds Histogram of time to first token in seconds. +# TYPE vllm:time_to_first_token_seconds histogram +vllm:time_to_first_token_seconds_bucket{le="0.001",model_name="meta-llama/Llama-3.1-8B"} 0.0 +vllm:time_to_first_token_seconds_bucket{le="0.005",model_name="meta-llama/Llama-3.1-8B"} 5.0 +vllm:time_to_first_token_seconds_count{model_name="meta-llama/Llama-3.1-8B"} 165.0 +vllm:time_to_first_token_seconds_sum{model_name="meta-llama/Llama-3.1-8B"} 89.38 +``` + +## Implementation Details + +- vLLM v1 uses multiprocess metrics collection via `prometheus_client.multiprocess` +- `PROMETHEUS_MULTIPROC_DIR`: vLLM sets this environment variable to a temporary directory where multiprocess metrics are stored as memory-mapped files. Each worker process writes its metrics to separate files in this directory, which are aggregated when `/metrics` is scraped. 
+- Metrics are filtered by the `vllm:` prefix before being exposed +- The integration uses Dynamo's `register_engine_metrics_callback()` function +- Metrics appear after vLLM engine initialization completes +- vLLM v1 metrics are different from v0 - see the [official documentation](https://docs.vllm.ai/en/latest/design/metrics.html) for migration details + +## See Also + +### vLLM Metrics +- [Official vLLM Metrics Design Documentation](https://docs.vllm.ai/en/latest/design/metrics.html) +- [vLLM Production Metrics User Guide](https://docs.vllm.ai/en/latest/user/production_metrics.html) +- [vLLM GitHub - Metrics Implementation](https://github.com/vllm-project/vllm/tree/main/vllm/engine/metrics) + +### Dynamo Metrics +- **Dynamo Metrics Guide**: See `docs/guides/metrics.md` for complete documentation on Dynamo runtime metrics +- **Dynamo Runtime Metrics**: Metrics prefixed with `dynamo_*` for runtime, components, endpoints, and namespaces + - Implementation: `lib/runtime/src/metrics.rs` (Rust runtime metrics) + - Metric names: `lib/runtime/src/metrics/prometheus_names.rs` (metric name constants) + - Available at the same `/metrics` endpoint alongside vLLM metrics +- **Integration Code**: `components/src/dynamo/common/utils/prometheus.py` - Prometheus utilities and callback registration diff --git a/components/src/dynamo/common/__init__.py b/components/src/dynamo/common/__init__.py index 2424349413..4da60c20d6 100644 --- a/components/src/dynamo/common/__init__.py +++ b/components/src/dynamo/common/__init__.py @@ -9,9 +9,10 @@ Main submodules: - config_dump: Configuration dumping and system diagnostics utilities + - utils: Common utilities including environment and prometheus helpers """ -from dynamo.common import config_dump +from dynamo.common import config_dump, utils try: from ._version import __version__ @@ -23,4 +24,4 @@ except Exception: __version__ = "0.0.0+unknown" -__all__ = ["__version__", "config_dump"] +__all__ = ["__version__", "config_dump", "utils"] diff --git a/components/src/dynamo/common/utils/__init__.py b/components/src/dynamo/common/utils/__init__.py new file mode 100644 index 0000000000..e518d4678b --- /dev/null +++ b/components/src/dynamo/common/utils/__init__.py @@ -0,0 +1,16 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Dynamo Common Utils Module + +This module contains shared utility functions used across multiple +Dynamo backends and components. + +Submodules: + - prometheus: Prometheus metrics collection and logging utilities +""" + +from dynamo.common.utils import prometheus + +__all__ = ["prometheus"] diff --git a/components/src/dynamo/common/utils/prometheus.py b/components/src/dynamo/common/utils/prometheus.py new file mode 100644 index 0000000000..e6bcda0631 --- /dev/null +++ b/components/src/dynamo/common/utils/prometheus.py @@ -0,0 +1,129 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +""" +Prometheus metrics utilities for Dynamo components. + +This module provides shared functionality for collecting and exposing Prometheus metrics +from backend engines (SGLang, vLLM, etc.) via Dynamo's metrics endpoint. + +Note: Engine metrics take time to appear after engine initialization, +while Dynamo runtime metrics are available immediately after component creation. 
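+
+Typical usage (illustrative sketch mirroring the SGLang integration; here
+`generate_endpoint` is a Dynamo Endpoint and the engine is assumed to have already
+set PROMETHEUS_MULTIPROC_DIR before prometheus_client was imported):
+
+    from prometheus_client import CollectorRegistry, multiprocess
+
+    registry = CollectorRegistry()
+    multiprocess.MultiProcessCollector(registry)
+    register_engine_metrics_callback(generate_endpoint, registry, "sglang:", "SGLang")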
+""" + +import logging +import re +from typing import TYPE_CHECKING, Optional + +from prometheus_client import generate_latest + +from dynamo._core import Endpoint + +# Import CollectorRegistry only for type hints to avoid importing prometheus_client at module load time. +# prometheus_client must be imported AFTER set_prometheus_multiproc_dir() is called. +# See main.py worker() function for detailed explanation. +if TYPE_CHECKING: + from prometheus_client import CollectorRegistry + + +def register_engine_metrics_callback( + endpoint: Endpoint, + registry: "CollectorRegistry", + metric_prefix: str, + engine_name: str, +) -> None: + """ + Register a callback to expose engine Prometheus metrics via Dynamo's metrics endpoint. + + This registers a callback that is invoked when /metrics is scraped, passing through + engine-specific metrics alongside Dynamo runtime metrics. + + Args: + endpoint: Dynamo endpoint object with metrics.register_prometheus_expfmt_callback() + registry: Prometheus registry to collect from (e.g., REGISTRY or CollectorRegistry) + metric_prefix: Prefix to filter metrics (e.g., "vllm:" or "sglang:") + engine_name: Name of the engine for logging (e.g., "vLLM" or "SGLang") + + Example: + from prometheus_client import REGISTRY + register_engine_metrics_callback( + generate_endpoint, REGISTRY, "vllm:", "vLLM" + ) + """ + + def get_expfmt() -> str: + """Callback to return engine Prometheus metrics in exposition format""" + return get_prometheus_expfmt(registry, metric_prefix_filter=metric_prefix) + + endpoint.metrics.register_prometheus_expfmt_callback(get_expfmt) + + +def get_prometheus_expfmt( + registry, + metric_prefix_filter: Optional[str] = None, +) -> str: + """ + Get Prometheus metrics from a registry formatted as text using the standard text encoder. + + Collects all metrics from the registry and returns them in Prometheus text exposition format. + Optionally filters metrics by prefix. + + Prometheus exposition format consists of: + - Comment lines starting with # (HELP and TYPE declarations) + - Metric lines with format: metric_name{label="value"} metric_value timestamp + + Example output format: + # HELP vllm:request_success_total Number of successful requests + # TYPE vllm:request_success_total counter + vllm:request_success_total{model="llama2",endpoint="generate"} 150.0 + # HELP vllm:time_to_first_token_seconds Time to first token + # TYPE vllm:time_to_first_token_seconds histogram + vllm:time_to_first_token_seconds_bucket{model="llama2",le="0.01"} 10.0 + vllm:time_to_first_token_seconds_bucket{model="llama2",le="0.1"} 45.0 + vllm:time_to_first_token_seconds_count{model="llama2"} 50.0 + vllm:time_to_first_token_seconds_sum{model="llama2"} 2.5 + + Args: + registry: Prometheus registry to collect from. + Pass CollectorRegistry with MultiProcessCollector for SGLang. + Pass REGISTRY for vLLM single-process mode. + metric_prefix_filter: Optional prefix to filter displayed metrics (e.g., "vllm:"). + If None, returns all metrics. (default: None) + + Returns: + Formatted metrics text in Prometheus exposition format. Returns empty string on error. 
+ + Example: + from prometheus_client import REGISTRY + metrics_text = get_prometheus_expfmt(REGISTRY) + print(metrics_text) + + # With filter + vllm_metrics = get_prometheus_expfmt(REGISTRY, metric_prefix_filter="vllm:") + """ + try: + # Generate metrics in Prometheus text format + metrics_text = generate_latest(registry).decode("utf-8") + + if metric_prefix_filter: + # Filter lines: keep metric lines starting with prefix and their HELP/TYPE comments + escaped_prefix = re.escape(metric_prefix_filter) + pattern = rf"^(?:{escaped_prefix}|# (?:HELP|TYPE) {escaped_prefix})" + filtered_lines = [ + line for line in metrics_text.split("\n") if re.match(pattern, line) + ] + result = "\n".join(filtered_lines) + if result: + # Ensure result ends with newline + if result and not result.endswith("\n"): + result += "\n" + return result + else: + # Ensure metrics_text ends with newline + if metrics_text and not metrics_text.endswith("\n"): + metrics_text += "\n" + return metrics_text + + except Exception as e: + logging.error(f"Error getting metrics: {e}") + return "" diff --git a/components/src/dynamo/sglang/publisher.py b/components/src/dynamo/sglang/publisher.py index 36985c90bc..fa07cfddca 100644 --- a/components/src/dynamo/sglang/publisher.py +++ b/components/src/dynamo/sglang/publisher.py @@ -9,8 +9,10 @@ import sglang as sgl import zmq import zmq.asyncio +from prometheus_client import CollectorRegistry, multiprocess from sglang.srt.utils import get_local_ip_auto, get_zmq_socket +from dynamo.common.utils.prometheus import register_engine_metrics_callback from dynamo.llm import ( ForwardPassMetrics, KvStats, @@ -217,6 +219,16 @@ async def setup_sgl_metrics( publisher.init_engine_metrics_publish() publisher.init_kv_event_publish() + # Register Prometheus metrics callback if enabled + if engine.server_args.enable_metrics: + # SGLang uses multiprocess architecture where metrics are stored in shared memory. + # MultiProcessCollector aggregates metrics from all worker processes. 
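+        # prometheus_client's multiprocess mode locates those per-process metric files via
+        # the PROMETHEUS_MULTIPROC_DIR environment variable (see set_prometheus_multiproc_dir()),
+        # which must be set before prometheus_client is imported.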
+ registry = CollectorRegistry() + multiprocess.MultiProcessCollector(registry) + register_engine_metrics_callback( + generate_endpoint, registry, "sglang:", "SGLang" + ) + task = asyncio.create_task(publisher.run()) logging.info("SGLang metrics loop started") return publisher, task, metrics_labels diff --git a/components/src/dynamo/vllm/main.py b/components/src/dynamo/vllm/main.py index f81f1a310f..2d5961a753 100644 --- a/components/src/dynamo/vllm/main.py +++ b/components/src/dynamo/vllm/main.py @@ -11,8 +11,10 @@ from vllm.distributed.kv_events import ZmqEventPublisher from vllm.usage.usage_lib import UsageContext from vllm.v1.engine.async_llm import AsyncLLM +from vllm.v1.metrics.prometheus import setup_multiprocess_prometheus from dynamo.common.config_dump import dump_config +from dynamo.common.utils.prometheus import register_engine_metrics_callback from dynamo.llm import ( ModelInput, ModelRuntimeConfig, @@ -125,6 +127,11 @@ def setup_kv_event_publisher( def setup_vllm_engine(config, stat_logger=None): + setup_multiprocess_prometheus() + logger.debug( + f"Prometheus multiproc dir set to: {os.environ.get('PROMETHEUS_MULTIPROC_DIR')}" + ) + os.environ["VLLM_NO_USAGE_STATS"] = "1" # Avoid internal HTTP requests os.environ["VLLM_WORKER_MULTIPROC_METHOD"] = "spawn" @@ -161,6 +168,7 @@ def setup_vllm_engine(config, stat_logger=None): logger.info(f"VllmWorker for {config.model} has been initialized with LMCache") else: logger.info(f"VllmWorker for {config.model} has been initialized") + return engine_client, vllm_config, default_sampling_params @@ -272,6 +280,11 @@ async def init(runtime: DistributedRuntime, config: Config): if kv_publisher: handler.kv_publisher = kv_publisher + if config.engine_args.disable_log_stats is False: + from prometheus_client import REGISTRY + + register_engine_metrics_callback(generate_endpoint, REGISTRY, "vllm:", "vLLM") + if not config.engine_args.data_parallel_rank: # if rank is 0 or None then register runtime_config = ModelRuntimeConfig() diff --git a/lib/bindings/python/examples/metrics/README.md b/lib/bindings/python/examples/metrics/README.md index ad0ff2b220..6855714fb3 100644 --- a/lib/bindings/python/examples/metrics/README.md +++ b/lib/bindings/python/examples/metrics/README.md @@ -188,7 +188,7 @@ def update_metrics(): request_slots.set(compute_current_slots()) gpu_usage.set(get_gpu_usage()) -endpoint.metrics.register_update_callback(update_metrics) +endpoint.metrics.register_callback(update_metrics) ``` Both examples support vector metrics with labels: @@ -355,7 +355,7 @@ graph TD MT -->|return to Python| PY PY -->|metric.set/get| MT MT -->|direct FFI call| PROM - PY -.->|endpoint.metrics.register_update_callback| PM + PY -.->|endpoint.metrics.register_callback| PM PM -.->|drt.register_metrics_callback| DRT SS ==>|execute_metrics_callbacks| DRT DRT -.->|invoke Python callback| PY diff --git a/lib/bindings/python/examples/metrics/server_with_callback.py b/lib/bindings/python/examples/metrics/server_with_callback.py index 315191cc0c..52899cdc7b 100755 --- a/lib/bindings/python/examples/metrics/server_with_callback.py +++ b/lib/bindings/python/examples/metrics/server_with_callback.py @@ -83,7 +83,7 @@ def update_metrics(): gpu_cache_usage_perc.set(0.01 + (count * 0.01)) print(f"[python] Updated metrics (call #{count})") - endpoint.metrics.register_update_callback(update_metrics) + endpoint.metrics.register_callback(update_metrics) print("[python] update (metrics) callback registered!") # Step 3: Set initial values and test vector metrics diff --git 
a/lib/bindings/python/rust/prometheus_metrics.rs b/lib/bindings/python/rust/prometheus_metrics.rs index 20943559f1..a4d151350e 100644 --- a/lib/bindings/python/rust/prometheus_metrics.rs +++ b/lib/bindings/python/rust/prometheus_metrics.rs @@ -699,7 +699,9 @@ impl RuntimeMetrics { let hierarchy = registry_item.hierarchy(); // Store the callback in the DRT's metrics callback registry using the registry_item's hierarchy - registry_item.drt().register_metrics_callback( + // TODO: rename this to register_callback, once we move the the MetricsRegistry trait + // out of the runtime, and make it into a composed module. + registry_item.drt().register_prometheus_update_callback( vec![hierarchy.clone()], Arc::new(move || { // Execute the Python callback in the Python event loop @@ -720,10 +722,45 @@ impl RuntimeMetrics { impl RuntimeMetrics { /// Register a Python callback to be invoked before metrics are scraped /// This callback will be called for this endpoint's metrics hierarchy - fn register_update_callback(&self, callback: PyObject, _py: Python) -> PyResult<()> { + fn register_callback(&self, callback: PyObject, _py: Python) -> PyResult<()> { Self::register_callback_for(self.metricsregistry.as_ref(), callback) } + /// Register a Python callback that returns Prometheus exposition text + /// The returned text will be appended to the /metrics endpoint output + /// The callback should return a string in Prometheus text exposition format + fn register_prometheus_expfmt_callback(&self, callback: PyObject, _py: Python) -> PyResult<()> { + let hierarchy = self.metricsregistry.hierarchy(); + + // Store the callback in the DRT's metrics exposition text callback registry + self.metricsregistry.drt().register_prometheus_expfmt_callback( + vec![hierarchy.clone()], + Arc::new(move || { + // Execute the Python callback in the Python event loop + Python::with_gil(|py| { + match callback.call0(py) { + Ok(result) => { + // Try to extract a string from the result + match result.extract::(py) { + Ok(text) => Ok(text), + Err(e) => { + tracing::error!("Metrics exposition text callback must return a string: {}", e); + Ok(String::new()) + } + } + } + Err(e) => { + tracing::error!("Metrics exposition text callback failed: {}", e); + Ok(String::new()) + } + } + }) + }), + ); + + Ok(()) + } + // NOTE: The order of create_* methods below matches lib/runtime/src/metrics.rs::MetricsRegistry trait // Keep them synchronized when adding new metric types diff --git a/lib/bindings/python/src/dynamo/prometheus_metrics.pyi b/lib/bindings/python/src/dynamo/prometheus_metrics.pyi index 9b7482dfd6..cc320938bc 100644 --- a/lib/bindings/python/src/dynamo/prometheus_metrics.pyi +++ b/lib/bindings/python/src/dynamo/prometheus_metrics.pyi @@ -231,7 +231,7 @@ class RuntimeMetrics: Also provides utilities for registering metrics callbacks. """ - def register_update_callback(self, callback: Callable[[], None]) -> None: + def register_callback(self, callback: Callable[[], None]) -> None: """ Register a Python callback to be invoked before metrics are scraped. @@ -250,7 +250,30 @@ class RuntimeMetrics: def update_metrics(): counter.inc() - metrics.register_update_callback(update_metrics) + metrics.register_callback(update_metrics) + ``` + """ + ... + + def register_prometheus_expfmt_callback(self, callback: Callable[[], str]) -> None: + """ + Register a Python callback that returns Prometheus exposition text. + The returned text will be appended to the /metrics endpoint output. 
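+        Multiple callbacks may be registered; their returned text is concatenated
+        when /metrics is scraped.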
+ + This allows you to integrate external Prometheus metrics (e.g. from vLLM) + directly into the endpoint's metrics output. + + Args: + callback: A callable that takes no arguments and returns a string + in Prometheus text exposition format + + Example: + ```python + def get_external_metrics(): + # Fetch metrics from external source + return "# HELP external_metric Some metric\\nexternal_metric 42.0\\n" + + metrics.register_prometheus_expfmt_callback(get_external_metrics) ``` """ ... diff --git a/lib/llm/src/kv_router/publisher.rs b/lib/llm/src/kv_router/publisher.rs index 98095392c3..1adca35f04 100644 --- a/lib/llm/src/kv_router/publisher.rs +++ b/lib/llm/src/kv_router/publisher.rs @@ -1438,8 +1438,8 @@ mod test_integration_publisher { assert_eq!(hit_rate_gauge.get(), 0.75); // Test 4: Verify metrics are properly registered in the component's registry - // Component implements MetricsRegistry trait which provides prometheus_metrics_fmt() - let prometheus_output = component.prometheus_metrics_fmt().unwrap(); + // Component implements MetricsRegistry trait which provides prometheus_expfmt() + let prometheus_output = component.prometheus_expfmt().unwrap(); // Verify metric names are present assert!(prometheus_output.contains(kvstats::ACTIVE_BLOCKS)); diff --git a/lib/runtime/src/distributed.rs b/lib/runtime/src/distributed.rs index ac114167a5..dabca1c8b6 100644 --- a/lib/runtime/src/distributed.rs +++ b/lib/runtime/src/distributed.rs @@ -5,7 +5,7 @@ pub use crate::component::Component; use crate::storage::key_value_store::{EtcdStore, KeyValueStore, MemoryStore}; use crate::transports::nats::DRTNatsClientPrometheusMetrics; use crate::{ - ErrorContext, RuntimeCallback, + ErrorContext, PrometheusUpdateCallback, component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace}, discovery::DiscoveryClient, metrics::MetricsRegistry, @@ -110,7 +110,8 @@ impl DistributedRuntime { Ok(()) } }); - distributed_runtime.register_metrics_callback(drt_hierarchies, nats_client_callback); + distributed_runtime + .register_prometheus_update_callback(drt_hierarchies, nats_client_callback); // Initialize the uptime gauge in SystemHealth distributed_runtime @@ -311,27 +312,31 @@ impl DistributedRuntime { .map_err(|e| e.into()) } - /// Add a callback function to metrics registries for the given hierarchies - // TODO: Rename to register_metrics_update_callback for consistency with Python API. - // Do this after we move the MetricsRegistry trait to composition pattern. - pub fn register_metrics_callback(&self, hierarchies: Vec, callback: RuntimeCallback) { + /// Add a Prometheus update callback to the given hierarchies + /// TODO: rename this to register_callback, once we move the the MetricsRegistry trait + /// out of the runtime, and make it into a composed module. 
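+    /// Registered callbacks are executed by `execute_prometheus_update_callbacks`
+    /// just before the hierarchy's Prometheus registry is gathered for the /metrics endpoint.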
+ pub fn register_prometheus_update_callback( + &self, + hierarchies: Vec, + callback: PrometheusUpdateCallback, + ) { let mut registries = self.hierarchy_to_metricsregistry.write().unwrap(); for hierarchy in &hierarchies { registries .entry(hierarchy.clone()) .or_default() - .add_callback(callback.clone()); + .add_prometheus_update_callback(callback.clone()); } } - /// Execute all callbacks for a given hierarchy key and return their results - pub fn execute_metrics_callbacks(&self, hierarchy: &str) -> Vec> { + /// Execute all Prometheus update callbacks for a given hierarchy and return their results + pub fn execute_prometheus_update_callbacks(&self, hierarchy: &str) -> Vec> { // Clone callbacks while holding read lock (fast operation) let callbacks = { let registries = self.hierarchy_to_metricsregistry.read().unwrap(); registries .get(hierarchy) - .map(|entry| entry.runtime_callbacks.clone()) + .map(|entry| entry.prometheus_update_callbacks.clone()) }; // Read lock released here // Execute callbacks without holding the lock @@ -341,6 +346,21 @@ impl DistributedRuntime { } } + /// Add a Prometheus exposition text callback that returns Prometheus text for the given hierarchies + pub fn register_prometheus_expfmt_callback( + &self, + hierarchies: Vec, + callback: crate::PrometheusExpositionFormatCallback, + ) { + let mut registries = self.hierarchy_to_metricsregistry.write().unwrap(); + for hierarchy in &hierarchies { + registries + .entry(hierarchy.clone()) + .or_default() + .add_prometheus_expfmt_callback(callback.clone()); + } + } + /// Get all registered hierarchy keys. Private because it is only used for testing. fn get_registered_hierarchies(&self) -> Vec { let registries = self.hierarchy_to_metricsregistry.read().unwrap(); diff --git a/lib/runtime/src/lib.rs b/lib/runtime/src/lib.rs index db954d6a3a..37649ac49d 100644 --- a/lib/runtime/src/lib.rs +++ b/lib/runtime/src/lib.rs @@ -88,14 +88,20 @@ pub struct Runtime { /// - Used in generic contexts requiring 'static lifetime /// /// The Arc wrapper is included in the type to make sharing explicit. 
-type RuntimeCallback = Arc anyhow::Result<()> + Send + Sync + 'static>; +type PrometheusUpdateCallback = Arc anyhow::Result<()> + Send + Sync + 'static>; + +/// Type alias for exposition text callback functions that return Prometheus text +type PrometheusExpositionFormatCallback = + Arc anyhow::Result + Send + Sync + 'static>; /// Structure to hold Prometheus registries and associated callbacks for a given hierarchy pub struct MetricsRegistryEntry { /// The Prometheus registry for this prefix pub prometheus_registry: prometheus::Registry, - /// List of function callbacks that receive a reference to any MetricsRegistry - pub runtime_callbacks: Vec, + /// List of update callbacks invoked before metrics are scraped + pub prometheus_update_callbacks: Vec, + /// List of callbacks that return Prometheus exposition text to be appended to metrics output + pub prometheus_expfmt_callbacks: Vec, } impl MetricsRegistryEntry { @@ -103,23 +109,50 @@ impl MetricsRegistryEntry { pub fn new() -> Self { Self { prometheus_registry: prometheus::Registry::new(), - runtime_callbacks: Vec::new(), + prometheus_update_callbacks: Vec::new(), + prometheus_expfmt_callbacks: Vec::new(), } } /// Add a callback function that receives a reference to any MetricsRegistry - pub fn add_callback(&mut self, callback: RuntimeCallback) { - self.runtime_callbacks.push(callback); + pub fn add_prometheus_update_callback(&mut self, callback: PrometheusUpdateCallback) { + self.prometheus_update_callbacks.push(callback); + } + + /// Add an exposition text callback that returns Prometheus text + pub fn add_prometheus_expfmt_callback(&mut self, callback: PrometheusExpositionFormatCallback) { + self.prometheus_expfmt_callbacks.push(callback); } - /// Execute all runtime callbacks and return their results - pub fn execute_callbacks(&self) -> Vec> { - self.runtime_callbacks + /// Execute all update callbacks and return their results + pub fn execute_prometheus_update_callbacks(&self) -> Vec> { + self.prometheus_update_callbacks .iter() .map(|callback| callback()) .collect() } + /// Execute all exposition text callbacks and return their concatenated text + pub fn execute_prometheus_expfmt_callbacks(&self) -> String { + let mut result = String::new(); + for callback in &self.prometheus_expfmt_callbacks { + match callback() { + Ok(text) => { + if !text.is_empty() { + if !result.is_empty() && !result.ends_with('\n') { + result.push('\n'); + } + result.push_str(&text); + } + } + Err(e) => { + tracing::error!("Error executing exposition text callback: {}", e); + } + } + } + result + } + /// Returns true if a metric with the given name already exists in the Prometheus registry pub fn has_metric_named(&self, metric_name: &str) -> bool { self.prometheus_registry @@ -139,7 +172,8 @@ impl Clone for MetricsRegistryEntry { fn clone(&self) -> Self { Self { prometheus_registry: self.prometheus_registry.clone(), - runtime_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list + prometheus_update_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list + prometheus_expfmt_callbacks: Vec::new(), // Callbacks cannot be cloned, so we start with an empty list } } } diff --git a/lib/runtime/src/metrics.rs b/lib/runtime/src/metrics.rs index d0b5df289c..fe99b616ab 100644 --- a/lib/runtime/src/metrics.rs +++ b/lib/runtime/src/metrics.rs @@ -554,9 +554,11 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider { } /// Get metrics in Prometheus text format - fn prometheus_metrics_fmt(&self) -> 
anyhow::Result { + fn prometheus_expfmt(&self) -> anyhow::Result { // Execute callbacks first to ensure any new metrics are added to the registry - let callback_results = self.drt().execute_metrics_callbacks(&self.hierarchy()); + let callback_results = self + .drt() + .execute_prometheus_update_callbacks(&self.hierarchy()); // Log any callback errors but continue for result in callback_results { @@ -565,20 +567,31 @@ pub trait MetricsRegistry: Send + Sync + DistributedRuntimeProvider { } } - // Get the Prometheus registry for this hierarchy - let prometheus_registry = { + // Get the Prometheus registry for this hierarchy and execute exposition text callbacks + let (prometheus_registry, expfmt) = { let mut registry_entry = self.drt().hierarchy_to_metricsregistry.write().unwrap(); - registry_entry - .entry(self.hierarchy()) - .or_default() - .prometheus_registry - .clone() + let entry = registry_entry.entry(self.hierarchy()).or_default(); + let registry = entry.prometheus_registry.clone(); + let text = entry.execute_prometheus_expfmt_callbacks(); + (registry, text) }; + + // Encode metrics from the registry let metric_families = prometheus_registry.gather(); let encoder = prometheus::TextEncoder::new(); let mut buffer = Vec::new(); encoder.encode(&metric_families, &mut buffer)?; - Ok(String::from_utf8(buffer)?) + let mut result = String::from_utf8(buffer)?; + + // Append exposition text callback results if any + if !expfmt.is_empty() { + if !result.ends_with('\n') { + result.push('\n'); + } + result.push_str(&expfmt); + } + + Ok(result) } } @@ -769,7 +782,7 @@ mod test_metricsregistry_units { // Add callbacks with different increment values for increment in [1, 10, 100] { let counter_clone = counter.clone(); - entry.add_callback(Arc::new(move || { + entry.add_prometheus_update_callback(Arc::new(move || { counter_clone.fetch_add(increment, Ordering::SeqCst); Ok(()) })); @@ -779,23 +792,23 @@ mod test_metricsregistry_units { assert_eq!(counter.load(Ordering::SeqCst), 0); // First execution - let results = entry.execute_callbacks(); + let results = entry.execute_prometheus_update_callbacks(); assert_eq!(results.len(), 3); assert!(results.iter().all(|r| r.is_ok())); assert_eq!(counter.load(Ordering::SeqCst), 111); // 1 + 10 + 100 // Second execution - callbacks should be reusable - let results = entry.execute_callbacks(); + let results = entry.execute_prometheus_update_callbacks(); assert_eq!(results.len(), 3); assert_eq!(counter.load(Ordering::SeqCst), 222); // 111 + 111 // Test cloning - cloned entry should have no callbacks let cloned = entry.clone(); - assert_eq!(cloned.execute_callbacks().len(), 0); + assert_eq!(cloned.execute_prometheus_update_callbacks().len(), 0); assert_eq!(counter.load(Ordering::SeqCst), 222); // No change // Original still has callbacks - entry.execute_callbacks(); + entry.execute_prometheus_update_callbacks(); assert_eq!(counter.load(Ordering::SeqCst), 333); // 222 + 111 } @@ -806,23 +819,25 @@ mod test_metricsregistry_units { // Successful callback let counter_clone = counter.clone(); - entry.add_callback(Arc::new(move || { + entry.add_prometheus_update_callback(Arc::new(move || { counter_clone.fetch_add(1, Ordering::SeqCst); Ok(()) })); // Error callback - entry.add_callback(Arc::new(|| Err(anyhow::anyhow!("Simulated error")))); + entry.add_prometheus_update_callback(Arc::new(|| { + Err(anyhow::anyhow!("Simulated error")) + })); // Another successful callback let counter_clone = counter.clone(); - entry.add_callback(Arc::new(move || { + 
entry.add_prometheus_update_callback(Arc::new(move || { counter_clone.fetch_add(10, Ordering::SeqCst); Ok(()) })); // Execute and verify mixed results - let results = entry.execute_callbacks(); + let results = entry.execute_prometheus_update_callbacks(); assert_eq!(results.len(), 3); assert!(results[0].is_ok()); assert!(results[1].is_err()); @@ -838,7 +853,7 @@ mod test_metricsregistry_units { assert_eq!(counter.load(Ordering::SeqCst), 11); // 1 + 10 // Execute again - errors should be consistent - let results = entry.execute_callbacks(); + let results = entry.execute_prometheus_update_callbacks(); assert!(results[1].is_err()); assert_eq!(counter.load(Ordering::SeqCst), 22); // 11 + 11 } @@ -846,7 +861,7 @@ mod test_metricsregistry_units { // Test 3: Empty registry { let entry = MetricsRegistryEntry::new(); - let results = entry.execute_callbacks(); + let results = entry.execute_prometheus_update_callbacks(); assert_eq!(results.len(), 0); } } @@ -1023,7 +1038,7 @@ mod test_metricsregistry_prometheus_fmt_outputs { let epsilon = 0.01; assert!((counter.get() - 123.456789).abs() < epsilon); - let endpoint_output_raw = endpoint.prometheus_metrics_fmt().unwrap(); + let endpoint_output_raw = endpoint.prometheus_expfmt().unwrap(); println!("Endpoint output:"); println!("{}", endpoint_output_raw); @@ -1052,7 +1067,7 @@ dynamo_component_testcounter{dynamo_component="comp345",dynamo_endpoint="ep345", assert_eq!(gauge.get(), 50000.0); // Test Prometheus format output for Component (gauge + histogram) - let component_output_raw = component.prometheus_metrics_fmt().unwrap(); + let component_output_raw = component.prometheus_expfmt().unwrap(); println!("Component output:"); println!("{}", component_output_raw); @@ -1083,7 +1098,7 @@ dynamo_component_testgauge{dynamo_component="comp345",dynamo_namespace="ns345"} assert_eq!(intcounter.get(), 12345); // Test Prometheus format output for Namespace (int_counter + gauge + histogram) - let namespace_output_raw = namespace.prometheus_metrics_fmt().unwrap(); + let namespace_output_raw = namespace.prometheus_expfmt().unwrap(); println!("Namespace output:"); println!("{}", namespace_output_raw); @@ -1154,7 +1169,7 @@ dynamo_component_testintcounter{dynamo_namespace="ns345"} 12345"#.to_string(); histogram.observe(4.0); // Test Prometheus format output for DRT (all metrics combined) - let drt_output_raw = drt.prometheus_metrics_fmt().unwrap(); + let drt_output_raw = drt.prometheus_expfmt().unwrap(); println!("DRT output:"); println!("{}", drt_output_raw); @@ -1270,7 +1285,7 @@ mod test_metricsregistry_nats { let drt = create_test_drt_async().await; // Get DRT output which should include NATS client metrics - let drt_output = drt.prometheus_metrics_fmt().unwrap(); + let drt_output = drt.prometheus_expfmt().unwrap(); println!("DRT output with NATS metrics:"); println!("{}", drt_output); @@ -1342,7 +1357,7 @@ mod test_metricsregistry_nats { // Get components output which should include NATS client metrics // Additional checks for NATS client metrics (without checking specific values) let component_nats_metrics = - super::test_helpers::extract_nats_lines(&components.prometheus_metrics_fmt().unwrap()); + super::test_helpers::extract_nats_lines(&components.prometheus_expfmt().unwrap()); println!( "Component NATS metrics count: {}", component_nats_metrics.len() @@ -1356,7 +1371,7 @@ mod test_metricsregistry_nats { // Check for specific NATS client metric names (without values) let component_metrics = - 
super::test_helpers::extract_metrics(&components.prometheus_metrics_fmt().unwrap()); + super::test_helpers::extract_metrics(&components.prometheus_expfmt().unwrap()); let actual_component_nats_metrics_sorted: Vec<&str> = component_metrics .iter() .map(|line| { @@ -1392,7 +1407,7 @@ mod test_metricsregistry_nats { ); // Get both DRT and component output and filter for NATS metrics only - let drt_output = drt.prometheus_metrics_fmt().unwrap(); + let drt_output = drt.prometheus_expfmt().unwrap(); let drt_nats_lines = super::test_helpers::extract_nats_lines(&drt_output); let drt_and_component_nats_metrics = super::test_helpers::extract_metrics(&drt_nats_lines.join("\n")); @@ -1454,7 +1469,7 @@ mod test_metricsregistry_nats { sleep(Duration::from_millis(500)).await; println!("✓ Launched endpoint service in background successfully"); - let drt_output = drt.prometheus_metrics_fmt().unwrap(); + let drt_output = drt.prometheus_expfmt().unwrap(); let parsed_metrics: Vec<_> = drt_output .lines() .filter_map(super::test_helpers::parse_prometheus_metric) @@ -1583,7 +1598,7 @@ mod test_metricsregistry_nats { sleep(Duration::from_millis(500)).await; println!("✓ Wait complete, getting final metrics..."); - let final_drt_output = drt.prometheus_metrics_fmt().unwrap(); + let final_drt_output = drt.prometheus_expfmt().unwrap(); println!("\n=== Final Prometheus DRT output ==="); println!("{}", final_drt_output); diff --git a/lib/runtime/src/service.rs b/lib/runtime/src/service.rs index 6c4fafb4e4..8fbc8c6f35 100644 --- a/lib/runtime/src/service.rs +++ b/lib/runtime/src/service.rs @@ -304,7 +304,7 @@ mod tests { /// 3. Prometheus scrapes these Gauge values (snapshots, not live data) /// /// Flow: NATS Service → NatsStatsMetrics (Counters) → Metrics Callback → Prometheus Gauge -/// Note: These are snapshots updated when execute_metrics_callbacks() is called. +/// Note: These are snapshots updated when execute_prometheus_update_callbacks() is called. #[derive(Debug, Clone)] /// Prometheus metrics for NATS server components. 
/// Note: Metrics with `_total` names use IntGauge because we copy counter values diff --git a/lib/runtime/src/system_status_server.rs b/lib/runtime/src/system_status_server.rs index 816ad540e3..1cc4fd6def 100644 --- a/lib/runtime/src/system_status_server.rs +++ b/lib/runtime/src/system_status_server.rs @@ -200,7 +200,7 @@ async fn metrics_handler(state: Arc) -> impl IntoResponse { }; for hierarchy in &all_hierarchies { - let callback_results = state.drt().execute_metrics_callbacks(hierarchy); + let callback_results = state.drt().execute_prometheus_update_callbacks(hierarchy); for result in callback_results { if let Err(e) = result { tracing::error!( @@ -213,16 +213,37 @@ async fn metrics_handler(state: Arc) -> impl IntoResponse { } // Get all metrics from DistributedRuntime (top-level) - match state.drt().prometheus_metrics_fmt() { - Ok(response) => (StatusCode::OK, response), + let mut response = match state.drt().prometheus_expfmt() { + Ok(r) => r, Err(e) => { tracing::error!("Failed to get metrics from registry: {}", e); - ( + return ( StatusCode::INTERNAL_SERVER_ERROR, "Failed to get metrics".to_string(), - ) + ); + } + }; + + // Collect and append Prometheus exposition text from all hierarchies + for hierarchy in &all_hierarchies { + let expfmt = { + let registries = state.drt().hierarchy_to_metricsregistry.read().unwrap(); + if let Some(entry) = registries.get(hierarchy) { + entry.execute_prometheus_expfmt_callbacks() + } else { + String::new() + } + }; + + if !expfmt.is_empty() { + if !response.ends_with('\n') { + response.push('\n'); + } + response.push_str(&expfmt); } } + + (StatusCode::OK, response) } // Regular tests: cargo test system_status_server --lib @@ -301,7 +322,7 @@ mod integration_tests { // so we don't need to create it again here // The uptime_seconds metric should already be registered and available - let response = drt.prometheus_metrics_fmt().unwrap(); + let response = drt.prometheus_expfmt().unwrap(); println!("Full metrics response:\n{}", response); // Filter out NATS client metrics for comparison