Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 48 additions & 12 deletions .pipelines/Modelkit E2E Test.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
trigger: none

parameters:
- name: evalDate
displayName: 'Eval date (leave empty for today, e.g. 2026-04-01)'
type: string
default: ''
- name: continueRun
displayName: 'Skip already-evaluated models (--continue)'
type: boolean
default: true

variables:
evalOutputBase: 'c:/eval_results'

Expand Down Expand Up @@ -43,31 +53,47 @@ jobs:
displayName: 'Install dependencies'

- powershell: |
$evalDate = Get-Date -Format 'yyyy-MM-dd'
$evalDate = '${{ parameters.evalDate }}'
if (-not $evalDate) { $evalDate = Get-Date -Format 'yyyy-MM-dd' }
$dir = "$(evalOutputBase)/$evalDate"
Write-Host "##vso[task.setvariable variable=EVAL_DIR;isOutput=true]$dir"
Write-Host "Eval output directory: $dir"
name: set_output_dir
displayName: 'Set eval output directory'

- script: >
uv run python scripts/e2e_eval/run_eval.py
--list-json temp/model_list.json
--device npu
- powershell: |
$args = @(
"run", "python", "scripts/e2e_eval/run_eval.py",
"--list-json", "temp/model_list.json",
"--device", "npu"
)
if ('${{ parameters.continueRun }}' -eq 'True') {
$args += @("--continue", "--output-dir", "$(set_output_dir.EVAL_DIR)")
}
& uv @args
workingDirectory: $(Build.SourcesDirectory)
displayName: 'Generate model list'

- powershell: |
$models = Get-Content "$(Build.SourcesDirectory)/temp/model_list.json" | ConvertFrom-Json
$total = $models.Count
if ($total -eq 0) {
throw "No models found in temp/model_list.json"
Write-Host "All models already evaluated — nothing to run"
Write-Host "##vso[task.setvariable variable=modelMatrix;isOutput=true]{}"
Write-Host "##vso[task.setvariable variable=skipEval;isOutput=true]true"
return
}

$matrix = @{}
for ($i = 0; $i -lt $total; $i++) {
$m = $models[$i]
$key = ($m.hf_id -replace '[^A-Za-z0-9]', '_')
$slug = (($m.hf_id + '_' + $m.task) -replace '[^A-Za-z0-9]', '_')
$key = $slug
$suffix = 2
while ($matrix.ContainsKey($key)) {
$key = "${slug}_${suffix}"
$suffix++
}
$matrix[$key] = @{
hf_id = [string]$m.hf_id
hf_task = [string]$m.task
Expand All @@ -84,9 +110,11 @@ jobs:
displayName: 'Create matrix variables'

- job: EvalModel
displayName: 'Run Model Eval'
displayName: 'Eval'
dependsOn: Prepare
timeoutInMinutes: 2400
condition: and(succeeded(), ne(dependencies.Prepare.outputs['set_matrix.skipEval'], 'true'))
timeoutInMinutes: 90
cancelTimeoutInMinutes: 2
pool:
name: modelkit-selfhost-pool
demands:
Expand Down Expand Up @@ -121,16 +149,18 @@ jobs:
"--verbose",
"--timeout", "1800",
"--no-report",
"--clean-hf-cache"
"--clean-cache"
)
if ("$(hf_task)") {
$uvArgs += @("--task", "$(hf_task)")
}

& uv @uvArgs
if ($LASTEXITCODE -ne 0) {
throw "Model evaluation failed for $(hf_id) / $(hf_task)"
$evalExit = $LASTEXITCODE
if ($evalExit -ne 0) {
Write-Warning "Model eval exited with code $evalExit for $(hf_id) / $(hf_task) (model failure — non-blocking)"
}
exit 0
workingDirectory: $(Build.SourcesDirectory)
displayName: 'Run eval for current model'

Expand Down Expand Up @@ -162,3 +192,9 @@ jobs:
--input-dir $(EVAL_DIR)
workingDirectory: $(Build.SourcesDirectory)
displayName: 'Generate evaluation report'

- task: PublishPipelineArtifact@1
inputs:
targetPath: $(EVAL_DIR)
artifactName: EvalReport
displayName: 'Publish eval results as artifact'
140 changes: 98 additions & 42 deletions scripts/e2e_eval/run_eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import shutil
import subprocess
import sys
import tempfile
import threading
import time
from datetime import date, datetime, timezone
Expand Down Expand Up @@ -116,6 +117,8 @@ def _get_timeout_skip_reason(hf_id: str, task: str) -> str:

_HF_CACHE = Path.home() / ".cache" / "huggingface"
_WML_CACHE = Path.home() / ".cache" / "winml"
_TEMP_DIR = Path(os.environ.get("TEMP", os.environ.get("TMP", tempfile.gettempdir())))
_TEMP_PREFIXES = ("wmk_", "modelkit_compat_")


def _is_no_space_error(proc: dict) -> bool:
Expand All @@ -125,17 +128,43 @@ def _is_no_space_error(proc: dict) -> bool:


def _clear_disk_caches() -> None:
"""Delete HuggingFace and WML cache directories to free disk space."""
"""Delete HuggingFace, WML cache directories and leaked temp files."""
for cache_dir in (_HF_CACHE, _WML_CACHE):
if cache_dir.exists():
safe_print(f" [disk-full] Removing cache: {cache_dir}")
safe_print(f" [cleanup] Removing cache: {cache_dir}")
try:
shutil.rmtree(cache_dir)
safe_print(f" [disk-full] Removed: {cache_dir}")
safe_print(f" [cleanup] Removed: {cache_dir}")
except OSError as exc:
safe_print(f" [disk-full] Warning: could not remove {cache_dir}: {exc}")
else:
safe_print(f" [disk-full] Cache not found (skipping): {cache_dir}")
safe_print(f" [cleanup] Warning: could not remove {cache_dir}: {exc}")

# Clean leaked temp directories/files (wmk_*, modelkit_compat_*, tmp*.onnx*)
if _TEMP_DIR.is_dir():
cleaned = 0
for entry in _TEMP_DIR.iterdir():
name = entry.name
should_clean = False
if any(name.startswith(p) for p in _TEMP_PREFIXES):
should_clean = (
entry.is_dir()
or entry.suffix in (".onnx", ".out", ".err")
or name.endswith(".onnx.data")
)
elif name.startswith("tmp") and name.endswith((".onnx", ".onnx.data")):
# Python tempfile creates tmp* prefixed files; only clean ONNX artifacts
should_clean = True
if should_clean:
safe_print(f" [cleanup] Leaked temp: {entry}")
try:
if entry.is_dir():
shutil.rmtree(entry)
else:
entry.unlink()
cleaned += 1
except OSError:
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
pass # Best-effort cleanup; ignore if file is locked or already removed
if cleaned:
safe_print(f" [cleanup] Removed {cleaned} leaked temp entries from {_TEMP_DIR}")


def safe_print(text: str) -> None:
Expand Down Expand Up @@ -822,15 +851,30 @@ def _get_disk_free_gb() -> float:
return shutil.disk_usage(anchor).free / (1024**3)


def _clean_model_hf_cache(hf_id: str) -> None:
"""Delete cached HuggingFace files for a specific model."""
slug = f"models--{hf_id.replace('/', '--')}"
cache_path = HF_CACHE_DIR / slug
if not cache_path.exists():
return
size_mb = sum(f.stat().st_size for f in cache_path.rglob("*") if f.is_file()) / (1024 * 1024)
shutil.rmtree(cache_path, ignore_errors=True)
safe_print(f" [cache] Cleaned {slug} ({size_mb:.0f} MB freed)")
def _should_skip_existing(existing: dict, retry_types: set[str] | None, eval_type: str) -> bool:
"""Return True if an existing eval_result should be skipped (not re-run).

Used by both --list-json and the main eval loop to share continue/retry logic.
"""
if retry_types is None:
return True # --continue without --retry-failed: skip all existing

perf = existing.get("perf") or {}
acc = existing.get("accuracy")

# Check perf failure (only when perf ran)
if eval_type != "accuracy" and not perf.get("passed"):
cls = classify_result(existing) or "UNKNOWN"
if not retry_types or cls in retry_types:
return False # Should retry

# Check accuracy verdict
if acc is not None and not acc.get("skipped"):
verdict = derive_verdict(acc).value
if not retry_types or verdict in retry_types:
return False # Should retry

return True # No retry criteria matched — skip


def model_result_dir(output_dir: Path, hf_id: str, task: str = "") -> Path:
Expand Down Expand Up @@ -876,9 +920,10 @@ def parse_args() -> argparse.Namespace:
"--timeout", type=int, default=600, help="Per-subprocess timeout in seconds (default: 600)"
)
parser.add_argument(
"--clean-hf-cache",
"--clean-cache",
dest="clean_cache",
action="store_true",
help="Delete HuggingFace hub cache for each model after evaluation (saves disk space)",
help="Delete caches and leaked temp files after each model evaluation (saves disk space)",
)
parser.add_argument("--list", action="store_true", help="List filtered models and exit")
parser.add_argument(
Expand Down Expand Up @@ -984,6 +1029,34 @@ def main() -> None:

# --list-json mode: write machine-readable JSON and exit
if args.list_json:
# --continue / --retry-failed: filter out already-evaluated models
if args.continue_run or args.retry_failed is not None:
output_dir = args.output_dir or Path(f"eval_results/{date.today().isoformat()}")
retry_types: set[str] | None = None
if args.retry_failed is not None:
args.continue_run = True
retry_types = {t.upper() for t in args.retry_failed} if args.retry_failed else set()

filtered: list[ModelEntry] = []
skipped_count = 0
for e in entries:
result_path = model_result_dir(output_dir, e.hf_id, e.task) / "eval_result.json"
if args.continue_run and result_path.exists():
try:
existing = load_result_json(result_path)
if _should_skip_existing(existing, retry_types, args.eval_type):
skipped_count += 1
continue
except Exception:
pass # Corrupt result file — include model for re-evaluation
filtered.append(e)
if skipped_count:
safe_print(
f"--continue: skipped {skipped_count} already-evaluated models "
f"(output_dir: {output_dir})"
)
entries = filtered

model_list = [
{
"hf_id": e.hf_id,
Expand Down Expand Up @@ -1025,8 +1098,8 @@ def main() -> None:
safe_print(f"E2E Evaluation: {len(entries)} models -> {output_dir}")
safe_print(f"Device: {args.device} | Timeout: {args.timeout}s | Eval: {args.eval_type}")
safe_print(f"Disk free: {_get_disk_free_gb():.1f} GB")
if args.clean_hf_cache:
safe_print("Cache cleanup: ON (HF cache cleaned after each model)")
if args.clean_cache:
safe_print("Cache cleanup: ON (caches + temp files cleaned after each model)")
if retry_types is not None:
if retry_types:
safe_print(f"Retry mode: {', '.join(sorted(retry_types))}")
Expand Down Expand Up @@ -1077,29 +1150,12 @@ def main() -> None:
if args.continue_run and result_path.exists():
try:
existing = load_result_json(result_path)
skip = True

perf = existing.get("perf") or {}
acc = existing.get("accuracy")

# Derive current classification / verdict to check against retry types
if retry_types is not None:
should_retry = False
# Check perf failure (only when perf ran)
if args.eval_type != "accuracy" and not perf.get("passed"):
cls = classify_result(existing) or "UNKNOWN"
if not retry_types or cls in retry_types:
should_retry = True
# Check accuracy verdict
if not should_retry and acc is not None and not acc.get("skipped"):
verdict = derive_verdict(acc).value
if not retry_types or verdict in retry_types:
should_retry = True
skip = not should_retry

if skip:

if _should_skip_existing(existing, retry_types, args.eval_type):
results.append(existing)
skipped += 1
perf = existing.get("perf") or {}
acc = existing.get("accuracy")
perf_cls = classify_result(existing) or "UNKNOWN"
perf_tag = "PASS" if perf.get("passed") else f"FAIL/{perf_cls}"
acc_tag = ""
Expand Down Expand Up @@ -1224,8 +1280,8 @@ def main() -> None:
else:
safe_print(f" [acc only]{acc_tag}")

if args.clean_hf_cache:
_clean_model_hf_cache(entry.hf_id)
if args.clean_cache:
_clear_disk_caches()

run_duration = time.perf_counter() - run_start

Expand Down
Loading