diff --git a/.pipelines/Modelkit E2E Test.yml b/.pipelines/Modelkit E2E Test.yml index ea7ba9512..a42a84e0f 100644 --- a/.pipelines/Modelkit E2E Test.yml +++ b/.pipelines/Modelkit E2E Test.yml @@ -1,5 +1,15 @@ trigger: none +parameters: + - name: evalDate + displayName: 'Eval date (leave empty for today, e.g. 2026-04-01)' + type: string + default: '' + - name: continueRun + displayName: 'Skip already-evaluated models (--continue)' + type: boolean + default: true + variables: evalOutputBase: 'c:/eval_results' @@ -43,17 +53,24 @@ jobs: displayName: 'Install dependencies' - powershell: | - $evalDate = Get-Date -Format 'yyyy-MM-dd' + $evalDate = '${{ parameters.evalDate }}' + if (-not $evalDate) { $evalDate = Get-Date -Format 'yyyy-MM-dd' } $dir = "$(evalOutputBase)/$evalDate" Write-Host "##vso[task.setvariable variable=EVAL_DIR;isOutput=true]$dir" Write-Host "Eval output directory: $dir" name: set_output_dir displayName: 'Set eval output directory' - - script: > - uv run python scripts/e2e_eval/run_eval.py - --list-json temp/model_list.json - --device npu + - powershell: | + $args = @( + "run", "python", "scripts/e2e_eval/run_eval.py", + "--list-json", "temp/model_list.json", + "--device", "npu" + ) + if ('${{ parameters.continueRun }}' -eq 'True') { + $args += @("--continue", "--output-dir", "$(set_output_dir.EVAL_DIR)") + } + & uv @args workingDirectory: $(Build.SourcesDirectory) displayName: 'Generate model list' @@ -61,13 +78,22 @@ jobs: $models = Get-Content "$(Build.SourcesDirectory)/temp/model_list.json" | ConvertFrom-Json $total = $models.Count if ($total -eq 0) { - throw "No models found in temp/model_list.json" + Write-Host "All models already evaluated — nothing to run" + Write-Host "##vso[task.setvariable variable=modelMatrix;isOutput=true]{}" + Write-Host "##vso[task.setvariable variable=skipEval;isOutput=true]true" + return } $matrix = @{} for ($i = 0; $i -lt $total; $i++) { $m = $models[$i] - $key = ($m.hf_id -replace '[^A-Za-z0-9]', '_') + $slug = (($m.hf_id + '_' + $m.task) -replace '[^A-Za-z0-9]', '_') + $key = $slug + $suffix = 2 + while ($matrix.ContainsKey($key)) { + $key = "${slug}_${suffix}" + $suffix++ + } $matrix[$key] = @{ hf_id = [string]$m.hf_id hf_task = [string]$m.task @@ -84,9 +110,11 @@ jobs: displayName: 'Create matrix variables' - job: EvalModel - displayName: 'Run Model Eval' + displayName: 'Eval' dependsOn: Prepare - timeoutInMinutes: 2400 + condition: and(succeeded(), ne(dependencies.Prepare.outputs['set_matrix.skipEval'], 'true')) + timeoutInMinutes: 90 + cancelTimeoutInMinutes: 2 pool: name: modelkit-selfhost-pool demands: @@ -121,16 +149,18 @@ jobs: "--verbose", "--timeout", "1800", "--no-report", - "--clean-hf-cache" + "--clean-cache" ) if ("$(hf_task)") { $uvArgs += @("--task", "$(hf_task)") } & uv @uvArgs - if ($LASTEXITCODE -ne 0) { - throw "Model evaluation failed for $(hf_id) / $(hf_task)" + $evalExit = $LASTEXITCODE + if ($evalExit -ne 0) { + Write-Warning "Model eval exited with code $evalExit for $(hf_id) / $(hf_task) (model failure — non-blocking)" } + exit 0 workingDirectory: $(Build.SourcesDirectory) displayName: 'Run eval for current model' @@ -162,3 +192,9 @@ jobs: --input-dir $(EVAL_DIR) workingDirectory: $(Build.SourcesDirectory) displayName: 'Generate evaluation report' + + - task: PublishPipelineArtifact@1 + inputs: + targetPath: $(EVAL_DIR) + artifactName: EvalReport + displayName: 'Publish eval results as artifact' diff --git a/scripts/e2e_eval/run_eval.py b/scripts/e2e_eval/run_eval.py index 4c859bec1..0b5ab54d4 100644 --- a/scripts/e2e_eval/run_eval.py +++ b/scripts/e2e_eval/run_eval.py @@ -36,6 +36,7 @@ import shutil import subprocess import sys +import tempfile import threading import time from datetime import date, datetime, timezone @@ -116,6 +117,8 @@ def _get_timeout_skip_reason(hf_id: str, task: str) -> str: _HF_CACHE = Path.home() / ".cache" / "huggingface" _WML_CACHE = Path.home() / ".cache" / "winml" +_TEMP_DIR = Path(os.environ.get("TEMP", os.environ.get("TMP", tempfile.gettempdir()))) +_TEMP_PREFIXES = ("wmk_", "modelkit_compat_") def _is_no_space_error(proc: dict) -> bool: @@ -125,17 +128,43 @@ def _is_no_space_error(proc: dict) -> bool: def _clear_disk_caches() -> None: - """Delete HuggingFace and WML cache directories to free disk space.""" + """Delete HuggingFace, WML cache directories and leaked temp files.""" for cache_dir in (_HF_CACHE, _WML_CACHE): if cache_dir.exists(): - safe_print(f" [disk-full] Removing cache: {cache_dir}") + safe_print(f" [cleanup] Removing cache: {cache_dir}") try: shutil.rmtree(cache_dir) - safe_print(f" [disk-full] Removed: {cache_dir}") + safe_print(f" [cleanup] Removed: {cache_dir}") except OSError as exc: - safe_print(f" [disk-full] Warning: could not remove {cache_dir}: {exc}") - else: - safe_print(f" [disk-full] Cache not found (skipping): {cache_dir}") + safe_print(f" [cleanup] Warning: could not remove {cache_dir}: {exc}") + + # Clean leaked temp directories/files (wmk_*, modelkit_compat_*, tmp*.onnx*) + if _TEMP_DIR.is_dir(): + cleaned = 0 + for entry in _TEMP_DIR.iterdir(): + name = entry.name + should_clean = False + if any(name.startswith(p) for p in _TEMP_PREFIXES): + should_clean = ( + entry.is_dir() + or entry.suffix in (".onnx", ".out", ".err") + or name.endswith(".onnx.data") + ) + elif name.startswith("tmp") and name.endswith((".onnx", ".onnx.data")): + # Python tempfile creates tmp* prefixed files; only clean ONNX artifacts + should_clean = True + if should_clean: + safe_print(f" [cleanup] Leaked temp: {entry}") + try: + if entry.is_dir(): + shutil.rmtree(entry) + else: + entry.unlink() + cleaned += 1 + except OSError: + pass # Best-effort cleanup; ignore if file is locked or already removed + if cleaned: + safe_print(f" [cleanup] Removed {cleaned} leaked temp entries from {_TEMP_DIR}") def safe_print(text: str) -> None: @@ -822,15 +851,30 @@ def _get_disk_free_gb() -> float: return shutil.disk_usage(anchor).free / (1024**3) -def _clean_model_hf_cache(hf_id: str) -> None: - """Delete cached HuggingFace files for a specific model.""" - slug = f"models--{hf_id.replace('/', '--')}" - cache_path = HF_CACHE_DIR / slug - if not cache_path.exists(): - return - size_mb = sum(f.stat().st_size for f in cache_path.rglob("*") if f.is_file()) / (1024 * 1024) - shutil.rmtree(cache_path, ignore_errors=True) - safe_print(f" [cache] Cleaned {slug} ({size_mb:.0f} MB freed)") +def _should_skip_existing(existing: dict, retry_types: set[str] | None, eval_type: str) -> bool: + """Return True if an existing eval_result should be skipped (not re-run). + + Used by both --list-json and the main eval loop to share continue/retry logic. + """ + if retry_types is None: + return True # --continue without --retry-failed: skip all existing + + perf = existing.get("perf") or {} + acc = existing.get("accuracy") + + # Check perf failure (only when perf ran) + if eval_type != "accuracy" and not perf.get("passed"): + cls = classify_result(existing) or "UNKNOWN" + if not retry_types or cls in retry_types: + return False # Should retry + + # Check accuracy verdict + if acc is not None and not acc.get("skipped"): + verdict = derive_verdict(acc).value + if not retry_types or verdict in retry_types: + return False # Should retry + + return True # No retry criteria matched — skip def model_result_dir(output_dir: Path, hf_id: str, task: str = "") -> Path: @@ -876,9 +920,10 @@ def parse_args() -> argparse.Namespace: "--timeout", type=int, default=600, help="Per-subprocess timeout in seconds (default: 600)" ) parser.add_argument( - "--clean-hf-cache", + "--clean-cache", + dest="clean_cache", action="store_true", - help="Delete HuggingFace hub cache for each model after evaluation (saves disk space)", + help="Delete caches and leaked temp files after each model evaluation (saves disk space)", ) parser.add_argument("--list", action="store_true", help="List filtered models and exit") parser.add_argument( @@ -984,6 +1029,34 @@ def main() -> None: # --list-json mode: write machine-readable JSON and exit if args.list_json: + # --continue / --retry-failed: filter out already-evaluated models + if args.continue_run or args.retry_failed is not None: + output_dir = args.output_dir or Path(f"eval_results/{date.today().isoformat()}") + retry_types: set[str] | None = None + if args.retry_failed is not None: + args.continue_run = True + retry_types = {t.upper() for t in args.retry_failed} if args.retry_failed else set() + + filtered: list[ModelEntry] = [] + skipped_count = 0 + for e in entries: + result_path = model_result_dir(output_dir, e.hf_id, e.task) / "eval_result.json" + if args.continue_run and result_path.exists(): + try: + existing = load_result_json(result_path) + if _should_skip_existing(existing, retry_types, args.eval_type): + skipped_count += 1 + continue + except Exception: + pass # Corrupt result file — include model for re-evaluation + filtered.append(e) + if skipped_count: + safe_print( + f"--continue: skipped {skipped_count} already-evaluated models " + f"(output_dir: {output_dir})" + ) + entries = filtered + model_list = [ { "hf_id": e.hf_id, @@ -1025,8 +1098,8 @@ def main() -> None: safe_print(f"E2E Evaluation: {len(entries)} models -> {output_dir}") safe_print(f"Device: {args.device} | Timeout: {args.timeout}s | Eval: {args.eval_type}") safe_print(f"Disk free: {_get_disk_free_gb():.1f} GB") - if args.clean_hf_cache: - safe_print("Cache cleanup: ON (HF cache cleaned after each model)") + if args.clean_cache: + safe_print("Cache cleanup: ON (caches + temp files cleaned after each model)") if retry_types is not None: if retry_types: safe_print(f"Retry mode: {', '.join(sorted(retry_types))}") @@ -1077,29 +1150,12 @@ def main() -> None: if args.continue_run and result_path.exists(): try: existing = load_result_json(result_path) - skip = True - - perf = existing.get("perf") or {} - acc = existing.get("accuracy") - - # Derive current classification / verdict to check against retry types - if retry_types is not None: - should_retry = False - # Check perf failure (only when perf ran) - if args.eval_type != "accuracy" and not perf.get("passed"): - cls = classify_result(existing) or "UNKNOWN" - if not retry_types or cls in retry_types: - should_retry = True - # Check accuracy verdict - if not should_retry and acc is not None and not acc.get("skipped"): - verdict = derive_verdict(acc).value - if not retry_types or verdict in retry_types: - should_retry = True - skip = not should_retry - - if skip: + + if _should_skip_existing(existing, retry_types, args.eval_type): results.append(existing) skipped += 1 + perf = existing.get("perf") or {} + acc = existing.get("accuracy") perf_cls = classify_result(existing) or "UNKNOWN" perf_tag = "PASS" if perf.get("passed") else f"FAIL/{perf_cls}" acc_tag = "" @@ -1224,8 +1280,8 @@ def main() -> None: else: safe_print(f" [acc only]{acc_tag}") - if args.clean_hf_cache: - _clean_model_hf_cache(entry.hf_id) + if args.clean_cache: + _clear_disk_caches() run_duration = time.perf_counter() - run_start