Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/observability/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ The Dynamo HTTP Frontend (`python -m dynamo.frontend`) exposes `dynamo_frontend_
- `dynamo_frontend_queued_requests`: Number of requests in HTTP processing queue (gauge)
- `dynamo_frontend_disconnected_clients`: Number of disconnected clients (gauge)
- `dynamo_frontend_input_sequence_tokens`: Input sequence length (histogram)
- `dynamo_frontend_cached_sequence_length`: Number of cached tokens (prefix cache hits) per request (histogram)
- `dynamo_frontend_inter_token_latency_seconds`: Inter-token latency (histogram)
- `dynamo_frontend_output_sequence_tokens`: Output sequence length (histogram)
- `dynamo_frontend_output_tokens_total`: Total number of output tokens generated (counter)
Expand Down
6 changes: 6 additions & 0 deletions lib/bindings/python/src/dynamo/prometheus_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class frontend_service:
INPUT_SEQUENCE_TOKENS = "input_sequence_tokens"
# Output sequence length in tokens
OUTPUT_SEQUENCE_TOKENS = "output_sequence_tokens"
# Number of cached tokens (prefix cache hits) per request
CACHED_SEQUENCE_LENGTH = "cached_sequence_length"
# Total number of output tokens generated (counter that updates in real-time)
OUTPUT_TOKENS_TOTAL = "output_tokens_total"
# Time to first token in seconds
Expand Down Expand Up @@ -93,6 +95,10 @@ class kvbm:
ONBOARD_BLOCKS_D2D = "onboard_blocks_d2d"
# The number of matched tokens
MATCHED_TOKENS = "matched_tokens"
# Host cache hit rate (0.0-1.0) from the sliding window
HOST_CACHE_HIT_RATE = "host_cache_hit_rate"
# Disk cache hit rate (0.0-1.0) from the sliding window
DISK_CACHE_HIT_RATE = "disk_cache_hit_rate"


class kvrouter:
Expand Down
210 changes: 205 additions & 5 deletions lib/llm/src/http/service/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ pub struct Metrics {
request_duration: HistogramVec,
input_sequence_length: HistogramVec,
output_sequence_length: HistogramVec,
cached_sequence_length: HistogramVec,
output_tokens_counter: IntCounterVec,
time_to_first_token: HistogramVec,
inter_token_latency: HistogramVec,
Expand Down Expand Up @@ -252,6 +253,8 @@ pub struct ResponseMetricCollector {
// be computed.
last_response_time: Option<Duration>,
osl: usize,
// we track if cached_tokens has been observed to ensure we only increment once per request
cached_tokens_observed: bool,
}

impl Default for Metrics {
Expand Down Expand Up @@ -378,7 +381,7 @@ impl Metrics {
frontend_metric_name(frontend_service::INPUT_SEQUENCE_TOKENS),
"Input sequence length in tokens",
)
.buckets(input_sequence_buckets),
.buckets(input_sequence_buckets.clone()),
&["model"],
)
.unwrap();
Expand Down Expand Up @@ -436,6 +439,16 @@ impl Metrics {
)
.unwrap();

let cached_sequence_length = HistogramVec::new(
HistogramOpts::new(
frontend_metric_name(frontend_service::CACHED_SEQUENCE_LENGTH),
"Number of cached tokens (prefix cache hits) per request",
)
.buckets(input_sequence_buckets.clone()),
&["model"],
)
.unwrap();

// Runtime configuration metrics
// Note: Some of these metrics represent counter-like values from source systems,
// but are implemented as gauges because they are copied/synchronized from upstream
Expand Down Expand Up @@ -502,6 +515,7 @@ impl Metrics {
request_duration,
input_sequence_length,
output_sequence_length,
cached_sequence_length,
output_tokens_counter,
time_to_first_token,
inter_token_latency,
Expand Down Expand Up @@ -597,6 +611,7 @@ impl Metrics {
registry.register(Box::new(self.request_duration.clone()))?;
registry.register(Box::new(self.input_sequence_length.clone()))?;
registry.register(Box::new(self.output_sequence_length.clone()))?;
registry.register(Box::new(self.cached_sequence_length.clone()))?;
registry.register(Box::new(self.output_tokens_counter.clone()))?;
registry.register(Box::new(self.time_to_first_token.clone()))?;
registry.register(Box::new(self.inter_token_latency.clone()))?;
Expand Down Expand Up @@ -830,6 +845,7 @@ impl ResponseMetricCollector {
last_response_time: None,
start_time: Instant::now(),
osl: 0,
cached_tokens_observed: false,
}
}

Expand All @@ -843,6 +859,19 @@ impl ResponseMetricCollector {
self.is_first_token
}

/// Observe cached tokens (prefix cache hits), observing only once per request when value is available
pub fn observe_cached_tokens(&mut self, cached_tokens: Option<usize>) {
if let Some(tokens) = cached_tokens
&& !self.cached_tokens_observed
{
self.cached_tokens_observed = true;
self.metrics
.cached_sequence_length
.with_label_values(&[&self.model])
.observe(tokens as f64);
}
}

/// Observe a response with input sequence length and number of new tokens
pub fn observe_response(&mut self, isl: usize, num_tokens: usize) {
if num_tokens == 0 {
Expand Down Expand Up @@ -943,18 +972,21 @@ impl<T> From<crate::types::Annotated<T>> for EventConverter<T> {
///
/// This function handles metrics collection, http_queue_guard management, and converts
/// annotated responses to SSE events for streaming responses.
///
/// Returns None for service events (events with no data and no event type) to filter them out.
pub fn process_response_using_event_converter_and_observe_metrics<T: Serialize>(
annotated: EventConverter<T>,
response_collector: &mut ResponseMetricCollector,
http_queue_guard: &mut Option<HttpQueueGuard>,
) -> Result<Event, axum::Error> {
) -> Result<Option<Event>, axum::Error> {
use crate::preprocessor::LLMMetricAnnotation;

let mut annotated = annotated.0;

// update metrics
if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(&annotated) {
response_collector.observe_current_osl(metrics.output_tokens);
response_collector.observe_cached_tokens(metrics.cached_tokens);

// Drop http_queue_guard on first token for streaming
if response_collector.is_first_token()
Expand All @@ -976,11 +1008,11 @@ pub fn process_response_using_event_converter_and_observe_metrics<T: Serialize>(

let mut event = Event::default();

if let Some(data) = annotated.data {
if let Some(ref data) = annotated.data {
event = event.json_data(data)?;
}

if let Some(msg) = annotated.event {
if let Some(ref msg) = annotated.event {
if msg == "error" {
let msgs = annotated
.comment
Expand All @@ -996,7 +1028,12 @@ pub fn process_response_using_event_converter_and_observe_metrics<T: Serialize>(
}
}

Ok(event)
// Filter out service events (events with no data and no event type)
if annotated.data.is_none() && annotated.event.is_none() {
Ok(None)
} else {
Ok(Some(event))
}
}

/// Create a new router with optional custom backend metrics support
Expand Down Expand Up @@ -1357,4 +1394,167 @@ mod tests {
20
);
}

#[test]
fn test_cached_tokens_counter_increments() {
    let metrics = Arc::new(Metrics::new());
    let registry = prometheus::Registry::new();
    metrics.register(&registry).unwrap();

    let model = "test-model";
    let expected_metric_name = "dynamo_frontend_cached_sequence_length";
    let mut collector = metrics.clone().create_response_collector(model);

    // Materialize the labeled histogram so it appears in registry.gather().
    let _histogram = metrics.cached_sequence_length.with_label_values(&[model]);

    // Helper: fetch the cached-sequence-length metric family from the registry.
    let gather_family = || {
        registry
            .gather()
            .into_iter()
            .find(|mf| mf.name() == expected_metric_name)
            .expect("histogram should be registered")
    };

    // First observation must land in the histogram.
    collector.observe_cached_tokens(Some(100));
    let family = gather_family();
    assert_eq!(family.get_metric().len(), 1);
    assert_eq!(family.get_metric()[0].get_histogram().get_sample_count(), 1);

    // A second observation on the same collector is a no-op (once per request).
    collector.observe_cached_tokens(Some(50));
    let family = gather_family();
    assert_eq!(family.get_metric()[0].get_histogram().get_sample_count(), 1);
}

#[test]
fn test_cached_tokens_once_per_request() {
    let metrics = Arc::new(Metrics::new());
    let registry = prometheus::Registry::new();
    metrics.register(&registry).unwrap();

    let model = "test-model";
    let expected_metric_name = "dynamo_frontend_cached_sequence_length";
    let mut collector = metrics.clone().create_response_collector(model);

    // Materialize the labeled histogram so it appears in registry.gather().
    let _histogram = metrics.cached_sequence_length.with_label_values(&[model]);

    // Helper: current sample count of the cached-sequence-length histogram.
    let sample_count = || {
        registry
            .gather()
            .into_iter()
            .find(|mf| mf.name() == expected_metric_name)
            .expect("histogram should be registered")
            .get_metric()[0]
            .get_histogram()
            .get_sample_count()
    };

    // Only the very first call records a sample; repeated calls with any
    // value (same or different) must leave the count at exactly 1.
    for (call, value) in [100usize, 50, 75].into_iter().enumerate() {
        collector.observe_cached_tokens(Some(value));
        assert_eq!(
            sample_count(),
            1,
            "sample count changed after call #{}",
            call + 1
        );
    }
}

#[test]
fn test_ghost_event_handling() {
    use crate::preprocessor::LLMMetricAnnotation;
    use crate::types::Annotated;

    let metrics = Arc::new(Metrics::new());
    let registry = prometheus::Registry::new();
    metrics.register(&registry).unwrap();

    let model = "test-model";
    let expected_metric_name = "dynamo_frontend_cached_sequence_length";
    let mut collector = metrics.clone().create_response_collector(model);

    // Metrics annotation carrying cached_tokens for the ghost event.
    let llm_metrics = LLMMetricAnnotation {
        input_tokens: 10,
        output_tokens: 20,
        chunk_tokens: 5,
        cached_tokens: Some(15),
    };
    let annotation = llm_metrics.to_annotation::<()>().unwrap();

    // Build the service event (ghost event) directly from the annotation.
    // (Previously `event` was initialized to ANNOTATION_LLM_METRICS and then
    // immediately overwritten by `annotation.event` — a dead store.)
    let annotated = Annotated::<
        crate::protocols::openai::chat_completions::NvCreateChatCompletionStreamResponse,
    > {
        id: None,
        data: None,
        event: annotation.event,
        comment: annotation.comment,
    };

    // Process the event; no data is streamed so no queue guard is involved.
    let mut http_queue_guard = None;
    let result = process_response_using_event_converter_and_observe_metrics(
        EventConverter::from(annotated),
        &mut collector,
        &mut http_queue_guard,
    );

    // Service events are filtered out of the SSE stream.
    assert!(matches!(result, Ok(None)));

    // The cached-tokens metric from the ghost event must still be observed.
    let metric_families = registry.gather();
    let histogram_family = metric_families
        .iter()
        .find(|mf| mf.name() == expected_metric_name)
        .expect("histogram should be registered");
    assert_eq!(
        histogram_family.get_metric()[0]
            .get_histogram()
            .get_sample_count(),
        1
    );
}
}
Loading
Loading