From 66c34bd60a42fcdd95d0551260a46256786e569b Mon Sep 17 00:00:00 2001 From: Grzegorz Chlebus Date: Fri, 27 Feb 2026 16:05:21 +0100 Subject: [PATCH 1/2] fix: eliminate floating-point error in response stats averages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace running average computation (which applied round(..., 2) at each step, compounding error over thousands of requests) with exact sum-based tracking. Averages are now computed as sum/count at save time. Changes: - Track exact sums (sum_prompt_tokens, sum_completion_tokens, etc.) internally alongside running averages - Compute avg_* from sum/count instead of incremental running average - Round avg_* to 2dp only at file-save time (not during accumulation) - Add total_completion_tokens, total_prompt_tokens, total_total_tokens to eval_factory_metrics.json output for exact totals - Backward compatible: loads old cached stats by back-computing sums from avg * successful_count Impact: For large evaluations (>10K requests), the old running average could drift by 60+ tokens/request from the true mean. For example, on a 12K-request MMLU-Pro evaluation, avg_completion_tokens was 7776.98 vs the exact value of 7711.3 — a 0.85% error that compounds with request count. The new total_* fields enable consumers to compute exact averages without relying on the rounded avg_* values. Signed-off-by: Grzegorz Chlebus --- .../response_stats_interceptor.py | 69 +++++++++-- .../test_response_stats_interceptor.py | 110 ++++++++++++++++++ 2 files changed, 167 insertions(+), 12 deletions(-) diff --git a/packages/nemo-evaluator/src/nemo_evaluator/adapters/interceptors/response_stats_interceptor.py b/packages/nemo-evaluator/src/nemo_evaluator/adapters/interceptors/response_stats_interceptor.py index a1ca2c0c6..aa75a9046 100644 --- a/packages/nemo-evaluator/src/nemo_evaluator/adapters/interceptors/response_stats_interceptor.py +++ b/packages/nemo-evaluator/src/nemo_evaluator/adapters/interceptors/response_stats_interceptor.py @@ -105,11 +105,16 @@ def __init__(self, params: Params): self._lock = threading.Lock() self._adapter_start_time = time.time() # Record adapter initialization time self._stats = { - # Average statistics + # Average statistics (computed from sums at save time) "avg_prompt_tokens": None, "avg_total_tokens": None, "avg_completion_tokens": None, "avg_latency_ms": None, + # Exact sum statistics (used to compute precise averages) + "sum_prompt_tokens": 0, + "sum_total_tokens": 0, + "sum_completion_tokens": 0, + "sum_latency_ms": 0.0, # Maximum statistics "max_prompt_tokens": None, "max_total_tokens": None, @@ -209,6 +214,30 @@ def _load_aggregated_cached_stats(self) -> None: status_codes[key] = value aggregated_stats["status_codes"] = status_codes + # Backward compatibility: if cached stats lack sum_* fields (from + # older versions that used running averages), back-compute sums + # from avg * successful_count. + successful_count = aggregated_stats.get("successful_count", 0) + for token_type in ["prompt_tokens", "total_tokens", "completion_tokens"]: + sum_key = f"sum_{token_type}" + avg_key = f"avg_{token_type}" + total_key = f"total_{token_type}" + if sum_key not in aggregated_stats: + # Try total_* first (from new save format), then back-compute from avg + if total_key in aggregated_stats: + aggregated_stats[sum_key] = aggregated_stats.pop(total_key) + elif aggregated_stats.get(avg_key) is not None and successful_count > 0: + aggregated_stats[sum_key] = aggregated_stats[avg_key] * successful_count + else: + aggregated_stats[sum_key] = 0 + + if "sum_latency_ms" not in aggregated_stats: + avg_latency = aggregated_stats.get("avg_latency_ms") + if avg_latency is not None and successful_count > 0: + aggregated_stats["sum_latency_ms"] = avg_latency * successful_count + else: + aggregated_stats["sum_latency_ms"] = 0.0 + # Set current stats to cached data (cached stats already contain accumulated data) self._stats = aggregated_stats # Note: run_id increment is handled in _save_run_ids_info() @@ -256,7 +285,13 @@ def _update_basic_stats(self, resp: AdapterResponse, current_time: float) -> Non self._stats["inference_time"] += delta def _update_running_stats(self, stat_name: str, value: float) -> None: - """Update running average and max for a given statistic.""" + """Update sum, average, and max for a given statistic. + + Tracks exact sums to avoid floating-point error accumulation from + running averages. The average is recomputed from sum / count at each + step for live monitoring, and rounded to 2 decimal places only at + file-save time. + """ # Skip if value is not a valid number if not isinstance(value, (int, float)): self.logger.warning( @@ -264,18 +299,17 @@ def _update_running_stats(self, stat_name: str, value: float) -> None: ) return - # Calculate running average using current successful count + sum_key = f"sum_{stat_name}" avg_key = f"avg_{stat_name}" - if self._stats[avg_key] is None: - self._stats[avg_key] = value - else: - self._stats[avg_key] = round( - (self._stats[avg_key] * self._stats["successful_count"] + value) - / (self._stats["successful_count"] + 1), - 2, - ) - # Update max valuename + # Accumulate exact sum + self._stats[sum_key] += value + + # Compute average from exact sum (successful_count not yet incremented) + new_count = self._stats["successful_count"] + 1 + self._stats[avg_key] = self._stats[sum_key] / new_count + + # Update max value max_key = f"max_{stat_name}" if self._stats[max_key] is None or value > self._stats[max_key]: self._stats[max_key] = value @@ -478,6 +512,17 @@ def _save_stats_to_file(self, context: AdapterGlobalContext) -> None: self.logger.debug("No response statistics collected, skipping file write") return + # Round averages to 2 decimal places for display + for avg_key in ["avg_prompt_tokens", "avg_total_tokens", "avg_completion_tokens", "avg_latency_ms"]: + if stats[avg_key] is not None: + stats[avg_key] = round(stats[avg_key], 2) + + # Expose exact totals as total_* fields and remove internal sum_* fields + for token_type in ["prompt_tokens", "total_tokens", "completion_tokens"]: + stats[f"total_{token_type}"] = stats.pop(f"sum_{token_type}", 0) + # Remove internal sum_latency_ms (not useful as an output field) + stats.pop("sum_latency_ms", None) + # Convert timestamps to readable dates in inference_run_times and add time_to_first_request if "inference_run_times" in stats: for run_id, run_data in stats["inference_run_times"].items(): diff --git a/packages/nemo-evaluator/tests/unit_tests/adapters/interceptors/test_response_stats_interceptor.py b/packages/nemo-evaluator/tests/unit_tests/adapters/interceptors/test_response_stats_interceptor.py index e3350ea10..c7701cde3 100644 --- a/packages/nemo-evaluator/tests/unit_tests/adapters/interceptors/test_response_stats_interceptor.py +++ b/packages/nemo-evaluator/tests/unit_tests/adapters/interceptors/test_response_stats_interceptor.py @@ -102,6 +102,7 @@ def test_initialization(self, params, expected): # Check stats structure assert "count" in interceptor._stats assert "avg_prompt_tokens" in interceptor._stats + assert "sum_prompt_tokens" in interceptor._stats assert "max_prompt_tokens" in interceptor._stats def test_add_basic_response_stats(self, interceptor, context): @@ -502,6 +503,115 @@ def test_cached_response_stats_behavior( assert interceptor._stats["successful_count"] == expected_successful_responses + def test_average_precision_with_many_requests(self, tmp_path): + """Test that averages remain precise over many requests. + + Previously, running averages with round(..., 2) at each step accumulated + floating-point error. With sum-based computation, the average should be + exact (within floating-point precision of a single division). + """ + cache_dir = tmp_path / "test_cache_precision" + interceptor = ResponseStatsInterceptor( + ResponseStatsInterceptor.Params( + save_individuals=False, cache_dir=str(cache_dir) + ) + ) + context = AdapterGlobalContext( + output_dir=str(tmp_path), url="http://test.api.com/v1/chat/completions" + ) + + # Simulate 10,000 requests with varying token counts + total_completion = 0 + num_requests = 10000 + for i in range(num_requests): + completion_tokens = 100 + (i % 997) # Varying values + total_completion += completion_tokens + + mock_resp = Mock(spec=requests.Response) + mock_resp.status_code = 200 + mock_resp.headers = {} + mock_resp.json.return_value = { + "usage": { + "prompt_tokens": 50, + "total_tokens": 50 + completion_tokens, + "completion_tokens": completion_tokens, + }, + "choices": [{"finish_reason": "stop", "message": {}}], + } + adapter_response = AdapterResponse( + r=mock_resp, + rctx=AdapterRequestContext(request_id=f"req_{i}"), + latency_ms=10.0, + ) + interceptor.intercept_response(adapter_response, context) + + # Verify exact sum + assert interceptor._stats["sum_completion_tokens"] == total_completion + + # Verify average is precise (should match exact computation) + expected_avg = total_completion / num_requests + assert abs(interceptor._stats["avg_completion_tokens"] - expected_avg) < 1e-10 + + # Verify count + assert interceptor._stats["successful_count"] == num_requests + + def test_total_fields_in_saved_file(self, tmp_path): + """Test that saved metrics include exact total_* fields.""" + cache_dir = tmp_path / "test_cache_totals" + interceptor = ResponseStatsInterceptor( + ResponseStatsInterceptor.Params( + save_individuals=False, cache_dir=str(cache_dir) + ) + ) + context = AdapterGlobalContext( + output_dir=str(tmp_path), url="http://test.api.com/v1/chat/completions" + ) + + # Process a few requests + token_values = [100, 200, 333] + for i, ct in enumerate(token_values): + mock_resp = Mock(spec=requests.Response) + mock_resp.status_code = 200 + mock_resp.headers = {} + mock_resp.json.return_value = { + "usage": { + "prompt_tokens": 50, + "total_tokens": 50 + ct, + "completion_tokens": ct, + }, + "choices": [{"finish_reason": "stop", "message": {}}], + } + adapter_response = AdapterResponse( + r=mock_resp, + rctx=AdapterRequestContext(request_id=f"req_{i}"), + latency_ms=10.0, + ) + interceptor.intercept_response(adapter_response, context) + + # Save to file + interceptor.post_eval_hook(context) + + # Read and verify + metrics_path = Path(tmp_path) / "eval_factory_metrics.json" + with open(metrics_path) as f: + metrics = json.load(f) + + rs = metrics["response_stats"] + + # Exact totals should be present + assert rs["total_completion_tokens"] == sum(token_values) + assert rs["total_prompt_tokens"] == 50 * len(token_values) + + # Averages should be rounded to 2dp + expected_avg_ct = round(sum(token_values) / len(token_values), 2) + assert rs["avg_completion_tokens"] == expected_avg_ct + + # Internal sum_* fields should NOT be in the output + assert "sum_completion_tokens" not in rs + assert "sum_prompt_tokens" not in rs + assert "sum_latency_ms" not in rs + + class TestResponseStatsInterceptorCache: """Test ResponseStatsInterceptor caching and aggregation functionality.""" From 425aa1e71d24ad8d5621244ee11e229f87e62f1a Mon Sep 17 00:00:00 2001 From: Grzegorz Chlebus Date: Fri, 27 Feb 2026 16:28:36 +0100 Subject: [PATCH 2/2] fix: deduplicate retried requests in response stats When chain jobs requeue (e.g., due to Slurm TIMEOUT), previously in-flight requests are re-sent. The response_stats interceptor now detects these retries via cache_key (SHA256 of request content) and excludes them from token stats to avoid inflating averages. Observed impact: NeMoTron MMLU-Pro had count=12639 vs expected 12032 (607 retries), inflating avg_completion_tokens from 7711.3 to 7776.98. Changes: - Track seen cache_keys in memory and persist across allocations - Retry requests still increment count (total API calls) but NOT successful_count or token sums - Add retry_count field to eval_factory_metrics.json output - Backward compatible: old cached stats without seen_cache_keys or retry_count are handled gracefully Tests: - test_retry_deduplication: verifies retries are detected and excluded - test_retry_deduplication_persists_across_reloads: verifies the seen_cache_keys set survives cache save/load (chain job scenario) Signed-off-by: Grzegorz Chlebus --- .../response_stats_interceptor.py | 94 +++++++++--- .../test_response_stats_interceptor.py | 135 +++++++++++++++++- 2 files changed, 208 insertions(+), 21 deletions(-) diff --git a/packages/nemo-evaluator/src/nemo_evaluator/adapters/interceptors/response_stats_interceptor.py b/packages/nemo-evaluator/src/nemo_evaluator/adapters/interceptors/response_stats_interceptor.py index aa75a9046..bf4edb784 100644 --- a/packages/nemo-evaluator/src/nemo_evaluator/adapters/interceptors/response_stats_interceptor.py +++ b/packages/nemo-evaluator/src/nemo_evaluator/adapters/interceptors/response_stats_interceptor.py @@ -128,6 +128,8 @@ def __init__(self, params: Params): "finish_reason": {}, "stop_reason": {}, "status_codes": {}, + # Retry deduplication + "retry_count": 0, # Time tracking "inference_time": 0.0, "run_id": 0, @@ -135,6 +137,12 @@ def __init__(self, params: Params): "inference_run_times": {}, # {run_id: {"start": time, "end": time, "inference_time": time}} } + # Set of cache_keys (request content hashes) already counted in stats. + # Used to detect retries across chain-job allocations: when a job + # requeues, previously in-flight requests are re-sent but should not + # be double-counted in aggregated statistics. + self._seen_cache_keys: set[str] = set() + # Always initialize cache database cache_path = Path(self.cache_dir) cache_path.mkdir(parents=True, exist_ok=True) @@ -226,8 +234,13 @@ def _load_aggregated_cached_stats(self) -> None: # Try total_* first (from new save format), then back-compute from avg if total_key in aggregated_stats: aggregated_stats[sum_key] = aggregated_stats.pop(total_key) - elif aggregated_stats.get(avg_key) is not None and successful_count > 0: - aggregated_stats[sum_key] = aggregated_stats[avg_key] * successful_count + elif ( + aggregated_stats.get(avg_key) is not None + and successful_count > 0 + ): + aggregated_stats[sum_key] = ( + aggregated_stats[avg_key] * successful_count + ) else: aggregated_stats[sum_key] = 0 @@ -238,10 +251,18 @@ def _load_aggregated_cached_stats(self) -> None: else: aggregated_stats["sum_latency_ms"] = 0.0 + # Backward compatibility: ensure retry_count exists + if "retry_count" not in aggregated_stats: + aggregated_stats["retry_count"] = 0 + # Set current stats to cached data (cached stats already contain accumulated data) self._stats = aggregated_stats # Note: run_id increment is handled in _save_run_ids_info() + # Restore seen cache keys for retry deduplication + if "seen_cache_keys" in interceptor_state: + self._seen_cache_keys = set(interceptor_state["seen_cache_keys"]) + self.logger.info( f"Loaded interceptor state with run_id {aggregated_stats.get('run_id', 0)}, count={aggregated_stats.get('count', 0)}" ) @@ -438,6 +459,14 @@ def intercept_response( return resp status_code = resp.r.status_code + # Detect retries: if we've already counted a successful response for + # the same request content (identified by cache_key), this is a retry + # from a chain-job requeue. We still track it in count/status_codes + # (total API calls) but skip updating token stats and successful_count + # to avoid inflating averages. + cache_key = getattr(resp.rctx, "cache_key", None) + is_retry = cache_key is not None and cache_key in self._seen_cache_keys + # Update time tracking with current timestamp current_time = time.time() self._update_time_tracking(current_time) @@ -448,29 +477,46 @@ def intercept_response( # Always add basic response stats (count, status_code) self._add_basic_response_stats(resp, context) + if is_retry: + with self._lock: + self._stats["retry_count"] += 1 + self.logger.debug( + "Detected retry request, skipping token stats update", + request_id=resp.rctx.request_id, + cache_key=cache_key[:8] + "..." if cache_key else None, + ) # Extract detailed stats once and reuse them detailed_stats = None - try: - # Try to parse response as JSON - response_data = resp.r.json() + if not is_retry: + try: + # Try to parse response as JSON + response_data = resp.r.json() + + if status_code == 200: + detailed_stats = self._extract_detailed_response_stats( + response_data + ) - if status_code == 200: - detailed_stats = self._extract_detailed_response_stats(response_data) + # Add detailed stats for aggregation + self._update_response_stats(detailed_stats) - # Add detailed stats for aggregation - self._update_response_stats(detailed_stats) + # Mark this cache_key as seen + if cache_key is not None: + self._seen_cache_keys.add(cache_key) - self.logger.debug( - "Collected detailed response stats", - request_id=resp.rctx.request_id, - response_count=self._stats["count"], - status_code=status_code, - ) + self.logger.debug( + "Collected detailed response stats", + request_id=resp.rctx.request_id, + response_count=self._stats["count"], + status_code=status_code, + ) - except (json.JSONDecodeError, Exception) as e: - # Handle both JSON parsing errors and other exceptions - # In case of any error, only basic stats are collected - self.logger.warning(f"Error parsing response body for token counting: {e}") + except (json.JSONDecodeError, Exception) as e: + # Handle both JSON parsing errors and other exceptions + # In case of any error, only basic stats are collected + self.logger.warning( + f"Error parsing response body for token counting: {e}" + ) # Save stats to file if interval reached if ( @@ -513,7 +559,12 @@ def _save_stats_to_file(self, context: AdapterGlobalContext) -> None: return # Round averages to 2 decimal places for display - for avg_key in ["avg_prompt_tokens", "avg_total_tokens", "avg_completion_tokens", "avg_latency_ms"]: + for avg_key in [ + "avg_prompt_tokens", + "avg_total_tokens", + "avg_completion_tokens", + "avg_latency_ms", + ]: if stats[avg_key] is not None: stats[avg_key] = round(stats[avg_key], 2) @@ -636,6 +687,9 @@ def _save_aggregated_stats_to_cache(self) -> None: # Update aggregated stats in interceptor state interceptor_state["aggregated_stats"] = stats_to_cache + # Persist seen cache keys for retry deduplication across allocations + interceptor_state["seen_cache_keys"] = list(self._seen_cache_keys) + # Save updated interceptor state self._save_interceptor_state(interceptor_state) diff --git a/packages/nemo-evaluator/tests/unit_tests/adapters/interceptors/test_response_stats_interceptor.py b/packages/nemo-evaluator/tests/unit_tests/adapters/interceptors/test_response_stats_interceptor.py index c7701cde3..a09e76d21 100644 --- a/packages/nemo-evaluator/tests/unit_tests/adapters/interceptors/test_response_stats_interceptor.py +++ b/packages/nemo-evaluator/tests/unit_tests/adapters/interceptors/test_response_stats_interceptor.py @@ -502,7 +502,6 @@ def test_cached_response_stats_behavior( assert interceptor._stats["count"] == expected_total_responses assert interceptor._stats["successful_count"] == expected_successful_responses - def test_average_precision_with_many_requests(self, tmp_path): """Test that averages remain precise over many requests. @@ -555,6 +554,140 @@ def test_average_precision_with_many_requests(self, tmp_path): # Verify count assert interceptor._stats["successful_count"] == num_requests + def test_retry_deduplication(self, tmp_path): + """Test that retried requests (same cache_key) are not double-counted. + + When chain jobs requeue, previously in-flight requests are re-sent. + The response_stats interceptor should detect these retries via cache_key + and exclude them from token stats to avoid inflating averages. + """ + cache_dir = tmp_path / "test_cache_retry" + interceptor = ResponseStatsInterceptor( + ResponseStatsInterceptor.Params( + save_individuals=False, cache_dir=str(cache_dir) + ) + ) + context = AdapterGlobalContext( + output_dir=str(tmp_path), url="http://test.api.com/v1/chat/completions" + ) + + def make_response(req_id, cache_key, completion_tokens): + mock_resp = Mock(spec=requests.Response) + mock_resp.status_code = 200 + mock_resp.headers = {} + mock_resp.json.return_value = { + "usage": { + "prompt_tokens": 50, + "total_tokens": 50 + completion_tokens, + "completion_tokens": completion_tokens, + }, + "choices": [{"finish_reason": "stop", "message": {}}], + } + rctx = AdapterRequestContext(request_id=req_id) + rctx.cache_key = cache_key + return AdapterResponse(r=mock_resp, rctx=rctx, latency_ms=10.0) + + # Simulate allocation 1: process 3 unique requests + interceptor.intercept_response(make_response("req_1", "hash_aaa", 100), context) + interceptor.intercept_response(make_response("req_2", "hash_bbb", 200), context) + interceptor.intercept_response(make_response("req_3", "hash_ccc", 300), context) + + assert interceptor._stats["successful_count"] == 3 + assert interceptor._stats["sum_completion_tokens"] == 600 + assert interceptor._stats["retry_count"] == 0 + assert interceptor._stats["count"] == 3 + + # Simulate chain job requeue: re-send req_2 and req_3 (same cache_keys) + interceptor.intercept_response( + make_response("req_2_retry", "hash_bbb", 210), context + ) + interceptor.intercept_response( + make_response("req_3_retry", "hash_ccc", 310), context + ) + + # Retries should NOT affect token stats + assert interceptor._stats["successful_count"] == 3 # unchanged + assert interceptor._stats["sum_completion_tokens"] == 600 # unchanged + assert interceptor._stats["retry_count"] == 2 # 2 retries detected + assert interceptor._stats["count"] == 5 # total API calls still tracked + + # New unique request should still be counted + interceptor.intercept_response(make_response("req_4", "hash_ddd", 400), context) + assert interceptor._stats["successful_count"] == 4 + assert interceptor._stats["sum_completion_tokens"] == 1000 + assert interceptor._stats["retry_count"] == 2 + assert interceptor._stats["count"] == 6 + + def test_retry_deduplication_persists_across_reloads(self, tmp_path): + """Test that seen_cache_keys survive cache save/load cycle. + + This simulates the real chain-job scenario: allocation 1 processes + requests and saves state, allocation 2 loads state and should still + detect retries of requests from allocation 1. + """ + cache_dir = tmp_path / "test_cache_retry_persist" + context = AdapterGlobalContext( + output_dir=str(tmp_path), url="http://test.api.com/v1/chat/completions" + ) + + def make_response(req_id, cache_key, completion_tokens): + mock_resp = Mock(spec=requests.Response) + mock_resp.status_code = 200 + mock_resp.headers = {} + mock_resp.json.return_value = { + "usage": { + "prompt_tokens": 50, + "total_tokens": 50 + completion_tokens, + "completion_tokens": completion_tokens, + }, + "choices": [{"finish_reason": "stop", "message": {}}], + } + rctx = AdapterRequestContext(request_id=req_id) + rctx.cache_key = cache_key + return AdapterResponse(r=mock_resp, rctx=rctx, latency_ms=10.0) + + # Allocation 1 + interceptor1 = ResponseStatsInterceptor( + ResponseStatsInterceptor.Params( + save_individuals=False, cache_dir=str(cache_dir) + ) + ) + interceptor1.intercept_response( + make_response("req_1", "hash_aaa", 100), context + ) + interceptor1.intercept_response( + make_response("req_2", "hash_bbb", 200), context + ) + # Force save to cache + interceptor1._save_aggregated_stats_to_cache() + + # Allocation 2 — new interceptor instance, loads from cache + interceptor2 = ResponseStatsInterceptor( + ResponseStatsInterceptor.Params( + save_individuals=False, cache_dir=str(cache_dir) + ) + ) + + # Verify state was restored + assert interceptor2._stats["successful_count"] == 2 + assert interceptor2._stats["sum_completion_tokens"] == 300 + assert "hash_aaa" in interceptor2._seen_cache_keys + assert "hash_bbb" in interceptor2._seen_cache_keys + + # Retry of req_2 from allocation 1 — should be detected + interceptor2.intercept_response( + make_response("req_2_retry", "hash_bbb", 200), context + ) + assert interceptor2._stats["successful_count"] == 2 # unchanged + assert interceptor2._stats["retry_count"] == 1 + + # New unique request — should be counted + interceptor2.intercept_response( + make_response("req_3", "hash_ccc", 300), context + ) + assert interceptor2._stats["successful_count"] == 3 + assert interceptor2._stats["sum_completion_tokens"] == 600 + def test_total_fields_in_saved_file(self, tmp_path): """Test that saved metrics include exact total_* fields.""" cache_dir = tmp_path / "test_cache_totals"