diff --git a/docs/observability/metrics.md b/docs/observability/metrics.md index 4b4781f761..6eb410d706 100644 --- a/docs/observability/metrics.md +++ b/docs/observability/metrics.md @@ -152,6 +152,7 @@ The Dynamo HTTP Frontend (`python -m dynamo.frontend`) exposes `dynamo_frontend_ - `dynamo_frontend_queued_requests`: Number of requests in HTTP processing queue (gauge) - `dynamo_frontend_disconnected_clients`: Number of disconnected clients (gauge) - `dynamo_frontend_input_sequence_tokens`: Input sequence length (histogram) +- `dynamo_frontend_cached_sequence_length`: Number of cached tokens (prefix cache hits) per request (histogram) - `dynamo_frontend_inter_token_latency_seconds`: Inter-token latency (histogram) - `dynamo_frontend_output_sequence_tokens`: Output sequence length (histogram) - `dynamo_frontend_output_tokens_total`: Total number of output tokens generated (counter) diff --git a/lib/bindings/python/src/dynamo/prometheus_names.py b/lib/bindings/python/src/dynamo/prometheus_names.py index 615edad127..88cfe156de 100644 --- a/lib/bindings/python/src/dynamo/prometheus_names.py +++ b/lib/bindings/python/src/dynamo/prometheus_names.py @@ -55,6 +55,8 @@ class frontend_service: INPUT_SEQUENCE_TOKENS = "input_sequence_tokens" # Output sequence length in tokens OUTPUT_SEQUENCE_TOKENS = "output_sequence_tokens" + # Number of cached tokens (prefix cache hits) per request + CACHED_SEQUENCE_LENGTH = "cached_sequence_length" # Total number of output tokens generated (counter that updates in real-time) OUTPUT_TOKENS_TOTAL = "output_tokens_total" # Time to first token in seconds @@ -93,6 +95,10 @@ class kvbm: ONBOARD_BLOCKS_D2D = "onboard_blocks_d2d" # The number of matched tokens MATCHED_TOKENS = "matched_tokens" + # Host cache hit rate (0.0-1.0) from the sliding window + HOST_CACHE_HIT_RATE = "host_cache_hit_rate" + # Disk cache hit rate (0.0-1.0) from the sliding window + DISK_CACHE_HIT_RATE = "disk_cache_hit_rate" class kvrouter: diff --git 
a/lib/llm/src/http/service/metrics.rs b/lib/llm/src/http/service/metrics.rs index 65f3867f39..35a344322c 100644 --- a/lib/llm/src/http/service/metrics.rs +++ b/lib/llm/src/http/service/metrics.rs @@ -165,6 +165,7 @@ pub struct Metrics { request_duration: HistogramVec, input_sequence_length: HistogramVec, output_sequence_length: HistogramVec, + cached_sequence_length: HistogramVec, output_tokens_counter: IntCounterVec, time_to_first_token: HistogramVec, inter_token_latency: HistogramVec, @@ -252,6 +253,8 @@ pub struct ResponseMetricCollector { // be computed. last_response_time: Option, osl: usize, + // we track if cached_tokens has been observed to ensure we only increment once per request + cached_tokens_observed: bool, } impl Default for Metrics { @@ -378,7 +381,7 @@ impl Metrics { frontend_metric_name(frontend_service::INPUT_SEQUENCE_TOKENS), "Input sequence length in tokens", ) - .buckets(input_sequence_buckets), + .buckets(input_sequence_buckets.clone()), &["model"], ) .unwrap(); @@ -436,6 +439,16 @@ impl Metrics { ) .unwrap(); + let cached_sequence_length = HistogramVec::new( + HistogramOpts::new( + frontend_metric_name(frontend_service::CACHED_SEQUENCE_LENGTH), + "Number of cached tokens (prefix cache hits) per request", + ) + .buckets(input_sequence_buckets.clone()), + &["model"], + ) + .unwrap(); + // Runtime configuration metrics // Note: Some of these metrics represent counter-like values from source systems, // but are implemented as gauges because they are copied/synchronized from upstream @@ -502,6 +515,7 @@ impl Metrics { request_duration, input_sequence_length, output_sequence_length, + cached_sequence_length, output_tokens_counter, time_to_first_token, inter_token_latency, @@ -597,6 +611,7 @@ impl Metrics { registry.register(Box::new(self.request_duration.clone()))?; registry.register(Box::new(self.input_sequence_length.clone()))?; registry.register(Box::new(self.output_sequence_length.clone()))?; + 
registry.register(Box::new(self.cached_sequence_length.clone()))?;
         registry.register(Box::new(self.output_tokens_counter.clone()))?;
         registry.register(Box::new(self.time_to_first_token.clone()))?;
         registry.register(Box::new(self.inter_token_latency.clone()))?;
@@ -830,6 +845,7 @@ impl ResponseMetricCollector {
             last_response_time: None,
             start_time: Instant::now(),
             osl: 0,
+            cached_tokens_observed: false,
         }
     }
 
@@ -843,6 +859,19 @@ impl ResponseMetricCollector {
         self.is_first_token
     }
 
+    /// Records cached tokens (prefix cache hits) once per collector: the first `Some` value is observed; `None` and later values are ignored.
+    pub fn observe_cached_tokens(&mut self, cached_tokens: Option<usize>) {
+        if let Some(tokens) = cached_tokens
+            && !self.cached_tokens_observed
+        {
+            self.cached_tokens_observed = true;
+            self.metrics
+                .cached_sequence_length
+                .with_label_values(&[&self.model])
+                .observe(tokens as f64);
+        }
+    }
+
     /// Observe a response with input sequence length and number of new tokens
     pub fn observe_response(&mut self, isl: usize, num_tokens: usize) {
         if num_tokens == 0 {
@@ -943,11 +972,13 @@ impl From> for EventConverter {
 ///
 /// This function handles metrics collection, http_queue_guard management, and converts
 /// annotated responses to SSE events for streaming responses.
+///
+/// Returns None for service events (events with no data and no event type) to filter them out.
pub fn process_response_using_event_converter_and_observe_metrics(
     annotated: EventConverter,
     response_collector: &mut ResponseMetricCollector,
     http_queue_guard: &mut Option,
-) -> Result<Event, axum::Error> {
+) -> Result<Option<Event>, axum::Error> {
     use crate::preprocessor::LLMMetricAnnotation;
 
     let mut annotated = annotated.0;
@@ -955,6 +986,7 @@ pub fn process_response_using_event_converter_and_observe_metrics(
     // update metrics
     if let Ok(Some(metrics)) = LLMMetricAnnotation::from_annotation(&annotated) {
         response_collector.observe_current_osl(metrics.output_tokens);
+        response_collector.observe_cached_tokens(metrics.cached_tokens);
 
         // Drop http_queue_guard on first token for streaming
         if response_collector.is_first_token()
@@ -976,11 +1008,11 @@ pub fn process_response_using_event_converter_and_observe_metrics(
 
     let mut event = Event::default();
 
-    if let Some(data) = annotated.data {
+    if let Some(ref data) = annotated.data {
         event = event.json_data(data)?;
     }
 
-    if let Some(msg) = annotated.event {
+    if let Some(ref msg) = annotated.event {
         if msg == "error" {
             let msgs = annotated
                 .comment
@@ -996,7 +1028,12 @@ pub fn process_response_using_event_converter_and_observe_metrics(
         }
     }
 
-    Ok(event)
+    // Filter out service events (events with no data and no event type)
+    if annotated.data.is_none() && annotated.event.is_none() {
+        Ok(None)
+    } else {
+        Ok(Some(event))
+    }
 }
 
 /// Create a new router with optional custom backend metrics support
@@ -1357,4 +1394,167 @@ mod tests {
             20
         );
     }
+
+    #[test]
+    fn test_cached_tokens_counter_increments() {
+        let metrics = Arc::new(Metrics::new());
+        let registry = prometheus::Registry::new();
+        metrics.register(&registry).unwrap();
+
+        let model = "test-model";
+        let expected_metric_name = "dynamo_frontend_cached_sequence_length";
+        let mut collector = metrics.clone().create_response_collector(model);
+
+        // Create histogram handle first (this registers it with the registry)
+        let _histogram = metrics.cached_sequence_length.with_label_values(&[model]);
+
+        // Observe cached tokens
+        collector.observe_cached_tokens(Some(100));
+
+        // Verify histogram recorded the observation
+        let metric_families = registry.gather();
+        let histogram_family = metric_families
+            .iter()
+            .find(|mf| mf.name() == expected_metric_name)
+            .expect("histogram should be registered");
+        assert_eq!(histogram_family.get_metric().len(), 1);
+        assert_eq!(
+            histogram_family.get_metric()[0]
+                .get_histogram()
+                .get_sample_count(),
+            1
+        );
+
+        // Observe more cached tokens with the same collector (should be idempotent)
+        collector.observe_cached_tokens(Some(50));
+
+        // Sample count should remain 1 (idempotent)
+        let metric_families = registry.gather();
+        let histogram_family = metric_families
+            .iter()
+            .find(|mf| mf.name() == expected_metric_name)
+            .expect("histogram should be registered");
+        assert_eq!(
+            histogram_family.get_metric()[0]
+                .get_histogram()
+                .get_sample_count(),
+            1
+        );
+    }
+
+    #[test]
+    fn test_cached_tokens_once_per_request() {
+        let metrics = Arc::new(Metrics::new());
+        let registry = prometheus::Registry::new();
+        metrics.register(&registry).unwrap();
+
+        let model = "test-model";
+        let expected_metric_name = "dynamo_frontend_cached_sequence_length";
+        let mut collector = metrics.clone().create_response_collector(model);
+
+        // Create histogram handle first
+        let _histogram = metrics.cached_sequence_length.with_label_values(&[model]);
+
+        // First call should observe and record 1 sample
+        collector.observe_cached_tokens(Some(100));
+        let metric_families = registry.gather();
+        let histogram_family = metric_families
+            .iter()
+            .find(|mf| mf.name() == expected_metric_name)
+            .expect("histogram should be registered");
+        assert_eq!(
+            histogram_family.get_metric()[0]
+                .get_histogram()
+                .get_sample_count(),
+            1
+        );
+
+        // Second call with same collector should not observe again (idempotent)
+        collector.observe_cached_tokens(Some(50));
+        let metric_families = registry.gather();
+        let histogram_family = metric_families
+            .iter()
+            .find(|mf| mf.name() == expected_metric_name)
+            .expect("histogram should be registered");
+        assert_eq!(
+            histogram_family.get_metric()[0]
+                .get_histogram()
+                .get_sample_count(),
+            1
+        );
+
+        // Third call with different value should still be idempotent
+        collector.observe_cached_tokens(Some(75));
+        let metric_families = registry.gather();
+        let histogram_family = metric_families
+            .iter()
+            .find(|mf| mf.name() == expected_metric_name)
+            .expect("histogram should be registered");
+        assert_eq!(
+            histogram_family.get_metric()[0]
+                .get_histogram()
+                .get_sample_count(),
+            1
+        );
+    }
+
+    #[test]
+    fn test_ghost_event_handling() {
+        use crate::preprocessor::LLMMetricAnnotation;
+        use crate::types::Annotated;
+
+        let metrics = Arc::new(Metrics::new());
+        let registry = prometheus::Registry::new();
+        metrics.register(&registry).unwrap();
+
+        let model = "test-model";
+        let expected_metric_name = "dynamo_frontend_cached_sequence_length";
+        let mut collector = metrics.clone().create_response_collector(model);
+
+        // Create a service event (ghost event) with metrics annotation but no data payload
+        let mut annotated = Annotated::<
+            crate::protocols::openai::chat_completions::NvCreateChatCompletionStreamResponse,
+        > {
+            id: None,
+            data: None,
+            event: Some(crate::preprocessor::ANNOTATION_LLM_METRICS.to_string()),
+            comment: None,
+        };
+
+        // Add metrics annotation with cached_tokens
+        let llm_metrics = LLMMetricAnnotation {
+            input_tokens: 10,
+            output_tokens: 20,
+            chunk_tokens: 5,
+            cached_tokens: Some(15),
+        };
+
+        let annotation = llm_metrics.to_annotation::<()>().unwrap();
+        annotated.event = annotation.event;
+        annotated.comment = annotation.comment;
+
+        // Process the event
+        let mut http_queue_guard = None;
+        let result = process_response_using_event_converter_and_observe_metrics(
+            EventConverter::from(annotated),
+            &mut collector,
+            &mut http_queue_guard,
+        );
+
+        // Should return Ok(None) for service events
+        assert!(matches!(result, Ok(None)));
+
+        // Should have observed the cached tokens from the ghost event
+        let metric_families = registry.gather();
+        let histogram_family = metric_families
+            .iter()
+            .find(|mf| mf.name() == expected_metric_name)
+            .expect("histogram should be registered");
+        assert_eq!(
+            histogram_family.get_metric()[0]
+                .get_histogram()
+                .get_sample_count(),
+            1
+        );
+    }
 }
diff --git a/lib/llm/src/http/service/openai.rs b/lib/llm/src/http/service/openai.rs
index 4f65f16c10..d1baac0abd 100644
--- a/lib/llm/src/http/service/openai.rs
+++ b/lib/llm/src/http/service/openai.rs
@@ -408,14 +408,20 @@ async fn completions_single(
     if streaming {
         // For streaming, we'll drop the http_queue_guard on the first token
         let mut http_queue_guard = Some(http_queue_guard);
-        let stream = stream.map(move |response| {
-            // Calls observe_response() on each token
-            process_response_using_event_converter_and_observe_metrics(
-                EventConverter::from(response),
-                &mut response_collector,
-                &mut http_queue_guard,
-            )
-        });
+        let stream = stream
+            .map(move |response| {
+                // Calls observe_response() on each token
+                process_response_using_event_converter_and_observe_metrics(
+                    EventConverter::from(response),
+                    &mut response_collector,
+                    &mut http_queue_guard,
+                )
+            })
+            .filter_map(|result| {
+                use futures::future;
+                // Transpose Result<Option<T>, E> -> Option<Result<T, E>>
+                future::ready(result.transpose())
+            });
         let stream = monitor_for_disconnects(stream, ctx, inflight_guard, stream_handle);
 
         let mut sse_stream = Sse::new(stream);
@@ -564,14 +570,20 @@ async fn completions_batch(
     if streaming {
         // For streaming, we'll drop the http_queue_guard on the first token
         let mut http_queue_guard = Some(http_queue_guard);
-        let stream = merged_stream.map(move |response| {
-            // Calls observe_response() on each token
-            process_response_using_event_converter_and_observe_metrics(
-                EventConverter::from(response),
-                &mut response_collector,
-                &mut http_queue_guard,
-            )
-        });
+        let stream = merged_stream
+            .map(move |response| {
+                // Calls observe_response() on each token
+                process_response_using_event_converter_and_observe_metrics(
+                    EventConverter::from(response),
+                    &mut response_collector,
+                    &mut http_queue_guard,
+                )
+            })
+            .filter_map(|result| {
+                use futures::future;
+                // Transpose Result<Option<T>, E> -> Option<Result<T, E>>
+                future::ready(result.transpose())
+            });
         let stream = monitor_for_disconnects(stream, ctx, inflight_guard, stream_handle);
 
         let mut sse_stream = Sse::new(stream);
@@ -825,14 +837,20 @@ async fn chat_completions(
         stream_handle.arm(); // allows the system to detect client disconnects and cancel the LLM generation
 
         let mut http_queue_guard = Some(http_queue_guard);
-        let stream = stream.map(move |response| {
-            // Calls observe_response() on each token
-            process_response_using_event_converter_and_observe_metrics(
-                EventConverter::from(response),
-                &mut response_collector,
-                &mut http_queue_guard,
-            )
-        });
+        let stream = stream
+            .map(move |response| {
+                // Calls observe_response() on each token
+                process_response_using_event_converter_and_observe_metrics(
+                    EventConverter::from(response),
+                    &mut response_collector,
+                    &mut http_queue_guard,
+                )
+            })
+            .filter_map(|result| {
+                use futures::future;
+                // Transpose Result<Option<T>, E> -> Option<Result<T, E>>
+                future::ready(result.transpose())
+            });
         let stream = monitor_for_disconnects(stream, ctx, inflight_guard, stream_handle);
 
         let mut sse_stream = Sse::new(stream);
diff --git a/lib/llm/src/preprocessor.rs b/lib/llm/src/preprocessor.rs
index ac9244a58b..492c143c95 100644
--- a/lib/llm/src/preprocessor.rs
+++ b/lib/llm/src/preprocessor.rs
@@ -70,6 +70,7 @@ pub struct LLMMetricAnnotation {
     pub input_tokens: usize,
     pub output_tokens: usize,
     pub chunk_tokens: usize,
+    pub cached_tokens: Option<usize>,
 }
 
 impl LLMMetricAnnotation {
@@ -625,6 +626,7 @@ impl OpenAIPreprocessor {
                             input_tokens: isl,
                             output_tokens: current_osl,
                             chunk_tokens,
+                            cached_tokens: None,
                         };
 
                         if let Ok(metrics_annotated) = llm_metrics.to_annotation::<()>() {
@@ -652,20 +654,39 @@ impl OpenAIPreprocessor {
                     // again. The stream is exhausted and will panic if polled after None.
                     inner.finished = true;
 
-                    // Check if we need to send a usage chunk
-                    if inner.response_generator.is_usage_enabled()
-                        && inner.finish_reason_sent
-                        && !inner.usage_chunk_sent
-                    {
+                    if inner.finish_reason_sent && !inner.usage_chunk_sent {
                         inner.usage_chunk_sent = true;
 
-                        // Create the final usage chunk
                         let usage_chunk = inner.response_generator.create_usage_chunk();
+                        let usage = inner.response_generator.get_usage();
+                        let llm_metrics = LLMMetricAnnotation {
+                            input_tokens: usage.prompt_tokens as usize,
+                            output_tokens: usage.completion_tokens as usize,
+                            chunk_tokens: 0,
+                            cached_tokens: usage
+                                .prompt_tokens_details
+                                .as_ref()
+                                .and_then(|d| d.cached_tokens.map(|c| c as usize)),
+                        };
+
+                        // Create annotation string; log a serialization failure instead of silently dropping it
+                        let annotation = llm_metrics.to_annotation::<()>().unwrap_or_else(|e| {
+                            tracing::warn!("Failed to serialize metrics: {}", e);
+                            Annotated::<()>::from_data(())
+                        });
+
+                        // Send the usage chunk if needed
+                        let data = if inner.response_generator.is_usage_enabled() {
+                            Some(usage_chunk)
+                        } else {
+                            None
+                        };
+
                         let annotated_usage = Annotated:: {
                             id: None,
-                            data: Some(usage_chunk),
-                            event: None,
-                            comment: None,
+                            data,
+                            event: Some(ANNOTATION_LLM_METRICS.to_string()),
+                            comment: annotation.comment,
                         };
 
                         tracing::trace!(
diff --git a/lib/llm/src/protocols/openai.rs b/lib/llm/src/protocols/openai.rs
index 574d7db31c..50f964c497 100644
--- a/lib/llm/src/protocols/openai.rs
+++ b/lib/llm/src/protocols/openai.rs
@@ -224,6 +224,9 @@ pub trait DeltaGeneratorExt:
 
     /// Check if usage tracking is enabled.
     fn is_usage_enabled(&self) -> bool;
+
+    /// Get the current usage statistics with properly calculated total_tokens.
+ fn get_usage(&self) -> dynamo_async_openai::types::CompletionUsage; } #[derive(Clone, Debug, Serialize, Deserialize, Default)] diff --git a/lib/llm/src/protocols/openai/chat_completions/delta.rs b/lib/llm/src/protocols/openai/chat_completions/delta.rs index 186bb7f095..6292342922 100644 --- a/lib/llm/src/protocols/openai/chat_completions/delta.rs +++ b/lib/llm/src/protocols/openai/chat_completions/delta.rs @@ -268,8 +268,7 @@ impl DeltaGenerator { /// # Returns /// * A [`CreateChatCompletionStreamResponse`] with empty choices and usage stats. pub fn create_usage_chunk(&self) -> NvCreateChatCompletionStreamResponse { - let mut usage = self.usage.clone(); - usage.total_tokens = usage.prompt_tokens.saturating_add(usage.completion_tokens); + let usage = self.get_usage(); dynamo_async_openai::types::CreateChatCompletionStreamResponse { id: self.id.clone(), @@ -288,6 +287,12 @@ impl DeltaGenerator { pub fn is_usage_enabled(&self) -> bool { self.options.enable_usage } + + pub fn get_usage(&self) -> dynamo_async_openai::types::CompletionUsage { + let mut usage = self.usage.clone(); + usage.total_tokens = usage.prompt_tokens.saturating_add(usage.completion_tokens); + usage + } } /// Implements the [`crate::protocols::openai::DeltaGeneratorExt`] trait for [`DeltaGenerator`], allowing @@ -307,27 +312,25 @@ impl crate::protocols::openai::DeltaGeneratorExt anyhow::Result { - // Aggregate token usage if enabled. - if self.options.enable_usage { - // SAFETY: Casting from `usize` to `u32` could lead to precision loss after `u32::MAX`, - // but this will not be an issue until context lengths exceed 4_294_967_295. 
- let token_length: u32 = delta - .token_ids - .len() - .try_into() - .expect("token_ids length exceeds u32::MAX"); - - self.usage.completion_tokens += token_length; - - // If backend provides completion_usage with prompt token details, - // propagate the entire details struct to usage tracking - if let Some(prompt_details) = delta - .completion_usage - .as_ref() - .and_then(|usage| usage.prompt_tokens_details.as_ref()) - { - self.usage.prompt_tokens_details = Some(prompt_details.clone()); - } + // Aggregate token usage even if usage tracking is disabled for metrics tracking + // SAFETY: Casting from `usize` to `u32` could lead to precision loss after `u32::MAX`, + // but this will not be an issue until context lengths exceed 4_294_967_295. + let token_length: u32 = delta + .token_ids + .len() + .try_into() + .expect("token_ids length exceeds u32::MAX"); + + self.usage.completion_tokens += token_length; + + // If backend provides completion_usage with prompt token details, + // propagate the entire details struct to usage tracking + if let Some(prompt_details) = delta + .completion_usage + .as_ref() + .and_then(|usage| usage.prompt_tokens_details.as_ref()) + { + self.usage.prompt_tokens_details = Some(prompt_details.clone()); } let logprobs = self.create_logprobs( @@ -410,6 +413,10 @@ impl crate::protocols::openai::DeltaGeneratorExt bool { DeltaGenerator::is_usage_enabled(self) } + + fn get_usage(&self) -> dynamo_async_openai::types::CompletionUsage { + DeltaGenerator::get_usage(self) + } } #[cfg(test)] diff --git a/lib/llm/src/protocols/openai/completions/delta.rs b/lib/llm/src/protocols/openai/completions/delta.rs index 3b27ffebdb..0a4bc28b09 100644 --- a/lib/llm/src/protocols/openai/completions/delta.rs +++ b/lib/llm/src/protocols/openai/completions/delta.rs @@ -201,8 +201,7 @@ impl DeltaGenerator { /// # Returns /// * A [`NvCreateCompletionResponse`] with empty choices and usage stats. 
pub fn create_usage_chunk(&self) -> NvCreateCompletionResponse { - let mut usage = self.usage.clone(); - usage.total_tokens = usage.prompt_tokens.saturating_add(usage.completion_tokens); + let usage = self.get_usage(); let inner = dynamo_async_openai::types::CreateCompletionResponse { id: self.id.clone(), @@ -222,6 +221,12 @@ impl DeltaGenerator { pub fn is_usage_enabled(&self) -> bool { self.options.enable_usage } + + pub fn get_usage(&self) -> dynamo_async_openai::types::CompletionUsage { + let mut usage = self.usage.clone(); + usage.total_tokens = usage.prompt_tokens.saturating_add(usage.completion_tokens); + usage + } } impl crate::protocols::openai::DeltaGeneratorExt for DeltaGenerator { @@ -229,27 +234,25 @@ impl crate::protocols::openai::DeltaGeneratorExt for &mut self, delta: common::llm_backend::BackendOutput, ) -> anyhow::Result { - // aggregate usage - if self.options.enable_usage { - // SAFETY: Casting from `usize` to `u32` could lead to precision loss after `u32::MAX`, - // but this will not be an issue until context lengths exceed 4_294_967_295. - let token_length: u32 = delta - .token_ids - .len() - .try_into() - .expect("token_ids length exceeds u32::MAX"); - - self.usage.completion_tokens += token_length; - - // If backend provides completion_usage with prompt token details, - // propagate the entire details struct to usage tracking - if let Some(prompt_details) = delta - .completion_usage - .as_ref() - .and_then(|usage| usage.prompt_tokens_details.as_ref()) - { - self.usage.prompt_tokens_details = Some(prompt_details.clone()); - } + // Aggregate token usage even if usage tracking is disabled for metrics tracking + // SAFETY: Casting from `usize` to `u32` could lead to precision loss after `u32::MAX`, + // but this will not be an issue until context lengths exceed 4_294_967_295. 
+ let token_length: u32 = delta + .token_ids + .len() + .try_into() + .expect("token_ids length exceeds u32::MAX"); + + self.usage.completion_tokens += token_length; + + // If backend provides completion_usage with prompt token details, + // propagate the entire details struct to usage tracking + if let Some(prompt_details) = delta + .completion_usage + .as_ref() + .and_then(|usage| usage.prompt_tokens_details.as_ref()) + { + self.usage.prompt_tokens_details = Some(prompt_details.clone()); } let logprobs = self.create_logprobs( @@ -313,4 +316,8 @@ impl crate::protocols::openai::DeltaGeneratorExt for fn is_usage_enabled(&self) -> bool { DeltaGenerator::is_usage_enabled(self) } + + fn get_usage(&self) -> dynamo_async_openai::types::CompletionUsage { + DeltaGenerator::get_usage(self) + } } diff --git a/lib/llm/tests/test_streaming_usage.rs b/lib/llm/tests/test_streaming_usage.rs index 068b85bab7..b50a152263 100644 --- a/lib/llm/tests/test_streaming_usage.rs +++ b/lib/llm/tests/test_streaming_usage.rs @@ -208,11 +208,29 @@ async fn test_streaming_without_usage() { // Collect all chunks let chunks: Vec<_> = transformed_stream.collect().await; - // Verify we got exactly 3 chunks (no extra usage chunk) - assert_eq!(chunks.len(), 3, "Should have exactly 3 content chunks"); + // Filter out ghost events (service events with metrics that get filtered in HTTP layer) + let content_chunks: Vec<_> = chunks + .into_iter() + .filter(|chunk| { + // Ghost events have event=Some(ANNOTATION_LLM_METRICS) and data=None + !(chunk + .event + .as_ref() + .map(|e| e == "llm_metrics") + .unwrap_or(false) + && chunk.data.is_none()) + }) + .collect(); + + // Verify we got exactly 3 content chunks (no extra usage chunk) + assert_eq!( + content_chunks.len(), + 3, + "Should have exactly 3 content chunks" + ); // Verify all chunks have usage: None - for (i, chunk) in chunks.iter().enumerate() { + for (i, chunk) in content_chunks.iter().enumerate() { if let Some(response) = &chunk.data { assert!( 
response.usage.is_none(), @@ -322,15 +340,29 @@ async fn test_streaming_with_usage_false() { // Collect all chunks let chunks: Vec<_> = transformed_stream.collect().await; + // Filter out ghost events (service events with metrics that get filtered in HTTP layer) + let content_chunks: Vec<_> = chunks + .into_iter() + .filter(|chunk| { + // Ghost events have event=Some(ANNOTATION_LLM_METRICS) and data=None + !(chunk + .event + .as_ref() + .map(|e| e == "llm_metrics") + .unwrap_or(false) + && chunk.data.is_none()) + }) + .collect(); + // Verify we got exactly 3 chunks (no extra usage chunk when explicitly false) assert_eq!( - chunks.len(), + content_chunks.len(), 3, "Should have exactly 3 content chunks when include_usage is false" ); // Verify all chunks have usage: None - for (i, chunk) in chunks.iter().enumerate() { + for (i, chunk) in content_chunks.iter().enumerate() { if let Some(response) = &chunk.data { assert!( response.usage.is_none(), diff --git a/lib/runtime/src/metrics/prometheus_names.rs b/lib/runtime/src/metrics/prometheus_names.rs index 91153a134d..7e4c622357 100644 --- a/lib/runtime/src/metrics/prometheus_names.rs +++ b/lib/runtime/src/metrics/prometheus_names.rs @@ -113,6 +113,9 @@ pub mod frontend_service { /// Output sequence length in tokens pub const OUTPUT_SEQUENCE_TOKENS: &str = "output_sequence_tokens"; + /// Number of cached tokens (prefix cache hits) per request + pub const CACHED_SEQUENCE_LENGTH: &str = "cached_sequence_length"; + /// Total number of output tokens generated (counter that updates in real-time) pub const OUTPUT_TOKENS_TOTAL: &str = "output_tokens_total";