diff --git a/nemo_skills/dataset/eval_kit/__init__.py b/nemo_skills/dataset/eval_kit/__init__.py new file mode 100644 index 0000000000..624305bf50 --- /dev/null +++ b/nemo_skills/dataset/eval_kit/__init__.py @@ -0,0 +1,42 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# VLMEvalKit integration module. +# Benchmarks are referenced as eval_kit., e.g. eval_kit.MMBench_DEV_EN +# The sub-benchmark name after eval_kit. is dynamically resolved and passed to VLMEvalKit. + +GENERATION_MODULE = "nemo_skills.inference.eval.eval_kit" +METRICS_TYPE = "eval_kit" +GENERATION_ARGS = "" +NUM_SAMPLES = 0 # VLMEvalKit inference is deterministic; no random seeds + +# No JSONL input file; VLMEvalKit manages its own data via build_dataset() +SKIP_INPUT_FILE = True + +# Note: SELF_CONTAINED_TASK is NOT set here because it depends on model_type. +# For mcore mode (Megatron in-process), the pipeline sets self_contained_task=True +# at runtime based on ++model_type=mcore in extra_arguments. +# For vllm mode, the standard NeMo Skills server/client flow is used. + + +def get_extra_generation_args(benchmark): + """Return extra generation args for the given benchmark name. + + Extracts the VLMEvalKit dataset name from the dotted benchmark name + (e.g. eval_kit.MMBench_DEV_EN -> ++vlm_dataset=MMBench_DEV_EN). + """ + if "." 
in benchmark: + sub = benchmark.split(".", 1)[1] + return f" ++vlm_dataset={sub} " + return "" diff --git a/nemo_skills/dataset/utils.py b/nemo_skills/dataset/utils.py index 0b59a2b7db..4962749bd6 100644 --- a/nemo_skills/dataset/utils.py +++ b/nemo_skills/dataset/utils.py @@ -88,7 +88,21 @@ def _get_dataset_module_from_cluster(cluster_config, mounted_path): def get_default_dataset_module(dataset, data_dir=None, cluster_config=None): + """Return (dataset_module, data_path, is_on_cluster).""" is_on_cluster = False + + # For dotted names like eval_kit.MMBench_DEV_EN, import the parent package. + # The sub-benchmark part is handled by the module's get_extra_generation_args(). + if dataset.startswith("eval_kit."): + dataset_module = importlib.import_module("nemo_skills.dataset.eval_kit") + if data_dir is None: + data_path = "/nemo_run/code/nemo_skills/dataset" + else: + data_path = data_dir + if cluster_config is not None and cluster_config["executor"] == "slurm": + is_on_cluster = True + return dataset_module, data_path, is_on_cluster + if data_dir is None: data_path = "/nemo_run/code/nemo_skills/dataset" dataset_module = importlib.import_module(f"nemo_skills.dataset.{dataset}") @@ -121,9 +135,11 @@ def get_dataset_module(dataset, data_dir=None, cluster_config=None, extra_datase 1. data_dir (or `nemo_skills.dataset` if None) folder 3. extra_datasets parameter if defined 4. `NEMO_SKILLS_EXTRA_DATASETS` environment variable + + Returns (module, data_path, is_on_cluster). 
""" try: - dataset_module, data_path, is_on_cluster = get_default_dataset_module(dataset, data_dir, cluster_config) + return get_default_dataset_module(dataset, data_dir, cluster_config) except ModuleNotFoundError: try: dataset = dataset.replace(".", "/") diff --git a/nemo_skills/evaluation/evaluator/__init__.py b/nemo_skills/evaluation/evaluator/__init__.py index 269bff939c..dfe82f93cc 100644 --- a/nemo_skills/evaluation/evaluator/__init__.py +++ b/nemo_skills/evaluation/evaluator/__init__.py @@ -27,7 +27,10 @@ eval_livebench_coding, eval_livecodebench_pro, ) -from nemo_skills.evaluation.evaluator.compute_eval import ComputeEvalEvaluator +try: + from nemo_skills.evaluation.evaluator.compute_eval import ComputeEvalEvaluator +except ImportError: + ComputeEvalEvaluator = None from nemo_skills.evaluation.evaluator.icpc import ICPCEvaluator from nemo_skills.evaluation.evaluator.ifbench import eval_ifbench from nemo_skills.evaluation.evaluator.ifeval import eval_if @@ -71,8 +74,9 @@ "icpc": ICPCEvaluator, "audio": AudioEvaluator, "bird": BirdEvaluator, - "compute-eval": ComputeEvalEvaluator, } +if ComputeEvalEvaluator is not None: + EVALUATOR_CLASS_MAP["compute-eval"] = ComputeEvalEvaluator # Validation: Ensure no overlap between class and function maps _class_types = set(EVALUATOR_CLASS_MAP.keys()) diff --git a/nemo_skills/evaluation/evaluator/audio.py b/nemo_skills/evaluation/evaluator/audio.py index 2ebaafe4ea..992abbeb5f 100644 --- a/nemo_skills/evaluation/evaluator/audio.py +++ b/nemo_skills/evaluation/evaluator/audio.py @@ -496,16 +496,23 @@ def evaluate_sample(sample: dict[str, Any], config: AudioEvaluatorConfig) -> dic if config.strip_helpful_prefixes: generation = strip_helpful_prefixes(generation) - if task_type in ["ASR", "ASR-PC", "ASR_LEADERBOARD", "AST", "Translation", "CER"] and not generation: + # Normalise AudioBench speech-translation task types (ST-EN-ZH → Translation) + _ASR_TYPES = {"ASR", "ASR-ZH", "ASR-PC", "ASR_LEADERBOARD"} + _TRANSLATION_TYPES 
= {"AST", "Translation"} + # AudioBench speech translation types: ST-{src}-{tgt} + if task_type.startswith("ST-"): + _TRANSLATION_TYPES.add(task_type) + + if task_type in (_ASR_TYPES | _TRANSLATION_TYPES | {"CER"}) and not generation: base = { "is_correct": False, "error": "missing_generation", } - if task_type in ["AST", "Translation"]: + if task_type in _TRANSLATION_TYPES: return {**base, "bleu": 0.0} if task_type == "CER": return {**base, "cer": 1.0} - # ASR / ASR-PC + # ASR / ASR-PC / ASR-ZH return {**base, "wer": 1.0} if task_type == "ASR-PC": @@ -518,7 +525,7 @@ def evaluate_sample(sample: dict[str, Any], config: AudioEvaluatorConfig) -> dic ) updates.update(metrics) - elif task_type == "ASR": + elif task_type in {"ASR", "ASR-ZH"}: mode = config.normalization_mode if config.apply_whisper_normalization else "none" metrics = evaluate_asr(expected_answer, generation, normalization_mode=mode) updates.update(metrics) @@ -530,7 +537,7 @@ def evaluate_sample(sample: dict[str, Any], config: AudioEvaluatorConfig) -> dic metrics = evaluate_asr(expected_answer, generation, normalization_mode=mode) updates.update(metrics) - elif task_type in ["AST", "Translation"]: + elif task_type in _TRANSLATION_TYPES: metrics = evaluate_translation(expected_answer, generation) updates.update(metrics) @@ -547,6 +554,13 @@ def evaluate_sample(sample: dict[str, Any], config: AudioEvaluatorConfig) -> dic metrics = evaluate_pc_rate(expected_answer, generation) updates.update(metrics) + elif task_type == "MathQA": + # AudioBench MathQA: exact string match after normalization + gen_norm = generation.strip().lower() + ref_norm = expected_answer.strip().lower() + updates["is_correct"] = gen_norm == ref_norm + updates["predicted_answer"] = generation + else: if "requires_judge" not in sample: updates["requires_judge"] = True diff --git a/nemo_skills/evaluation/metrics/compute_metrics.py b/nemo_skills/evaluation/metrics/compute_metrics.py index 12d5c70470..debe4ccafc 100644 --- 
a/nemo_skills/evaluation/metrics/compute_metrics.py +++ b/nemo_skills/evaluation/metrics/compute_metrics.py @@ -38,7 +38,7 @@ def __init__( self.metric_type = metric_type self.max_seq_len = max_seq_len if self.metric_type is None: - benchmark_module, _, _ = get_dataset_module( + benchmark_module, *_ = get_dataset_module( benchmark, data_dir=data_dir, cluster_config=cluster_config, diff --git a/nemo_skills/evaluation/metrics/eval_kit_metrics.py b/nemo_skills/evaluation/metrics/eval_kit_metrics.py new file mode 100644 index 0000000000..bb664f58c3 --- /dev/null +++ b/nemo_skills/evaluation/metrics/eval_kit_metrics.py @@ -0,0 +1,93 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from collections import defaultdict +from pathlib import Path + +from nemo_skills.evaluation.metrics.base import BaseMetrics + + +class EvalKitMetrics(BaseMetrics): + """Metrics class for VLMEvalKit benchmarks. + + VLMEvalKit computes its own aggregate metrics during evaluation. + This class reads pre-computed aggregates from eval_kit_metrics.json + (written by EvalKitGenerationTask) rather than computing per-sample metrics. + The per-sample JSONL is still read by ComputeMetrics for the update() loop, + but we only count entries here -- the real metrics come from the JSON file. + + Note: ComputeMetrics only calls setup() on the "_all_" calculator. 
When + the data contains ``subset_for_metrics``, additional per-subset calculator + instances are created but never receive a setup() call. We use a + class-level ``_shared_metrics_file`` so that those subset instances can + still locate the eval_kit_metrics.json discovered by the "_all_" instance. + """ + + # Shared across all instances so subset calculators can find the file + # even though only the "_all_" calculator receives setup(). + _shared_metrics_file: Path | None = None + + def __init__(self, **kwargs): + super().__init__(compute_no_answer=False) + self.eval_kit_metrics_file = None + + def setup(self, input_files): + """Find the eval_kit_metrics.json in the same directory as the input files.""" + if input_files: + # input_files are like ['/path/to/eval-results/eval_kit.MMBench_DEV_EN/output.jsonl'] + metrics_dir = Path(input_files[0]).parent + candidate = metrics_dir / "eval_kit_metrics.json" + if candidate.exists(): + self.eval_kit_metrics_file = candidate + EvalKitMetrics._shared_metrics_file = candidate + + def update(self, predictions): + """Count entries but don't compute per-sample metrics.""" + self.total += 1 + + def get_metrics(self): + """Return pre-computed VLMEvalKit aggregate metrics.""" + metrics_dict = {} + + # Load pre-computed metrics from VLMEvalKit. + # Fall back to the class-level shared file for subset calculators + # that never received a setup() call. 
+ eval_kit_results = {} + effective_file = self.eval_kit_metrics_file or EvalKitMetrics._shared_metrics_file + if effective_file and effective_file.exists(): + with open(effective_file, "rt", encoding="utf-8") as f: + eval_kit_results = json.load(f) + + # Build the metrics in NeMo Skills format + agg_dict = {"num_entries": self.total} + + # Flatten VLMEvalKit results into the metrics dict + for key, value in eval_kit_results.items(): + if isinstance(value, dict): + # Nested results (e.g., per-category scores) + for sub_key, sub_value in value.items(): + if isinstance(sub_value, (int, float)): + agg_dict[f"{key}_{sub_key}"] = sub_value + elif isinstance(value, (int, float)): + agg_dict[key] = value + + metrics_dict["greedy"] = agg_dict + return metrics_dict + + def metrics_to_print(self): + return None + + def evaluations_to_print(self): + return ["greedy"] diff --git a/nemo_skills/evaluation/metrics/map_metrics.py b/nemo_skills/evaluation/metrics/map_metrics.py index ad3e367d0d..30e3cec718 100644 --- a/nemo_skills/evaluation/metrics/map_metrics.py +++ b/nemo_skills/evaluation/metrics/map_metrics.py @@ -31,6 +31,7 @@ SciCodeMetrics, SweBenchMetrics, ) +from nemo_skills.evaluation.metrics.eval_kit_metrics import EvalKitMetrics from nemo_skills.evaluation.metrics.gradingbench_metrics import GradingBenchMetrics from nemo_skills.evaluation.metrics.hleaa_metrics import HLEAAMetrics from nemo_skills.evaluation.metrics.icpc_metrics import ICPCMetrics @@ -84,6 +85,7 @@ "omniscience": OmniMetrics, "compute-eval": ComputeEvalMetrics, "gradingbench": GradingBenchMetrics, + "eval_kit": EvalKitMetrics, } diff --git a/nemo_skills/evaluation/metrics/translation_metrics.py b/nemo_skills/evaluation/metrics/translation_metrics.py index 5a819152cd..8f5be0bdeb 100644 --- a/nemo_skills/evaluation/metrics/translation_metrics.py +++ b/nemo_skills/evaluation/metrics/translation_metrics.py @@ -16,7 +16,6 @@ from collections import defaultdict import numpy as np -from sacrebleu import 
corpus_bleu from nemo_skills.evaluation.metrics.base import BaseMetrics, as_float @@ -35,6 +34,8 @@ class TranslationMetrics(BaseMetrics): # TODO: add support for other translation metrics, such as MetricX def get_metrics(self): + from sacrebleu import corpus_bleu + metrics_dict = {} for key in self.translation_dict: src_lang, tgt_lang = key.split("->") diff --git a/nemo_skills/inference/eval/eval_kit.py b/nemo_skills/inference/eval/eval_kit.py new file mode 100644 index 0000000000..227c7b037d --- /dev/null +++ b/nemo_skills/inference/eval/eval_kit.py @@ -0,0 +1,511 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""VLMEvalKit integration for NeMo Skills. + +This module implements a self-contained generation task that uses VLMEvalKit's +inference and evaluation pipeline. Two modes are supported: + +1. Megatron in-process (model_type=mcore): VLMEvalKit's MultiModalMCore loads + and runs the Megatron model directly. No NeMo Skills server is started. + +2. vLLM client (model_type=vllm): NeMo Skills starts a vLLM server normally, + and VLMEvalKit's VLLMLocal connects to it as a client. + +Benchmarks are referenced as eval_kit. in NeMo Skills, +e.g. 
--benchmarks eval_kit.MMBench_DEV_EN +""" + +import json +import logging +import os +import pickle +import threading +from dataclasses import field +from pathlib import Path + +import hydra +from omegaconf import MISSING + +try: + from nemo_skills.inference.generate import GenerationTask +except ImportError: + # On the cluster, GenerationTask may not be importable due to missing deps + # (nemo_run, litellm, etc.). The inheritance is only needed for the pipeline's + # __func__ check which runs locally. On the cluster we just need a base class. + GenerationTask = object + +from nemo_skills.utils import get_logger_name, nested_dataclass + +LOG = logging.getLogger(get_logger_name(__file__)) + +# VLMEvalKit (vlmeval) is packaged alongside Skills code by nemo_run when +# NEMO_SKILLS_VLMEVALKIT_PATH is set (see eval.py extra_package_dirs logic). +# It lands at /nemo_run/code/vlmeval/ on the cluster, importable via PYTHONPATH. +# func-timeout is installed at job start via --installation_command in the run script. +# No venv-based requirements are needed (get_generation_requirements returns None). 
+ + +@nested_dataclass(kw_only=True) +class EvalKitConfig: + """Configuration for VLMEvalKit generation task.""" + + # VLMEvalKit dataset name (injected by pipeline from benchmark name) + vlm_dataset: str = MISSING + + # Model configuration + model_type: str = "mcore" # "mcore" or "vllm" + model_config: str | None = None # Path to YAML config for mcore + load_dir: str | None = None # Checkpoint directory for mcore + load_ckpt: str | None = None # Specific checkpoint for mcore + server_url: str | None = None # URL for vLLM server (vllm mode) + model_name: str | None = None # Model name for vLLM + + # Inference parameters + reasoning: bool = False + temperature: float = 1.0 + top_k: int = 1 + top_p: float = 0.95 + + # Video dataset parameters + nframe: int = 16 + fps: int = -1 + nframe_max: int = -1 + use_subtitle: bool = False + media_dir: str = "./" + + # Evaluation parameters + eval_mode: str = "all" # "all", "infer", or "eval" + judge: str | None = None + judge_nproc: int = 4 + judge_retry: int = 3 + + # Output configuration (populated by the pipeline) + work_dir: str = "./outputs" + output_file: str = "" + skip_filled: bool = False # Accepted from pipeline but unused (VLMEvalKit has its own resume) + + # Fields accepted from pipeline but unused by eval_kit (avoids Hydra errors from common_args) + eval_config: dict = field(default_factory=dict) + + +cs = hydra.core.config_store.ConfigStore.instance() +cs.store(name="base_eval_kit_config", node=EvalKitConfig) + + +class EvalKitGenerationTask(GenerationTask): + """Generation task using VLMEvalKit. + + Supports two modes: + - mcore: Self-contained, no NeMo Skills server. Pipeline sets self_contained_task=True. + - vllm: NeMo Skills starts a vLLM server, VLMEvalKit connects as client. + + Inherits get_server_command_fn from GenerationTask (returns the default + get_server_command). For mcore mode, the pipeline sets server_config=None + so no server is started. For vllm mode, the standard vLLM server command is used. 
+ """ + + # --- Declarative pipeline attributes (read generically by pipeline/eval.py) --- + CONTAINER_KEY = "eval_kit" + USE_TORCHRUN = True + + @classmethod + def is_self_contained(cls, extra_arguments: str = "") -> bool: + """Self-contained only in mcore mode (Megatron in-process).""" + return "++model_type=mcore" in extra_arguments + + @classmethod + def get_env_prefix(cls) -> str: + """Shell env setup prepended before the main command (Megatron/VLMEvalKit needs).""" + return ( + 'export LMUData="${LMUData:-${LMUDATA:-}}" && ' + 'export LD_LIBRARY_PATH=/opt/hpcx/ucx/lib:${LD_LIBRARY_PATH:-} && ' + 'export MKL_THREADING_LAYER=GNU && ' + 'export OMP_NUM_THREADS=1 && ' + 'export MKL_NUM_THREADS=1 && ' + 'ldconfig && ' + # Create empty .env so VLMEvalKit's load_env() doesn't emit ERROR logs. + 'touch /nemo_run/code/.env 2>/dev/null; ' + ) + + @classmethod + def get_extra_package_dirs(cls) -> list[str]: + """Directories to package alongside nemo_run code (VLMEvalKit vlmeval/).""" + vlmevalkit_path = os.environ.get("NEMO_SKILLS_VLMEVALKIT_PATH") + if vlmevalkit_path: + pkg = os.path.join(vlmevalkit_path, "vlmeval") + if os.path.isdir(pkg): + return [pkg] + return [] + + @classmethod + def get_generation_default_args(cls): + return "" + + @classmethod + def get_generation_requirements(cls): + # VLMEvalKit is installed via --installation_command (pip install from mounted source). + # No additional venv-based requirements needed. + return None + + def __init__(self, cfg: EvalKitConfig): + self.cfg = cfg + + # Validate environment + lmu_data = os.environ.get("LMUData") + if not lmu_data: + raise ValueError( + "LMUData environment variable must be set for eval_kit benchmarks. " + "Add LMUData=/mounted/path to your cluster config env_vars." + ) + + # Build model FIRST so that initialize_megatron() sets up the + # distributed process group before we need dist.barrier() for + # rank-0-first dataset download. 
+ if cfg.model_type == "mcore": + from vlmeval.vlm.multimodal_mcore.model import MultiModalMCore + + if not cfg.model_config: + raise ValueError("model_config is required for mcore model_type.") + self.model = MultiModalMCore( + model_config=cfg.model_config, + load_dir=cfg.load_dir, + load_ckpt=cfg.load_ckpt, + reasoning=cfg.reasoning, + ) + self.model_name = f"mcore_{Path(cfg.model_config).stem}" + elif cfg.model_type == "vllm": + from vlmeval.vlm.vllm_local import VLLMLocal + + if not cfg.server_url: + raise ValueError("server_url is required for vllm model_type.") + self.model = VLLMLocal( + vllm_url=cfg.server_url, + autospawn=False, + model_name=cfg.model_name or "vllm_local", + reasoning_mode=cfg.reasoning, + temperature=cfg.temperature, + top_k=cfg.top_k, + top_p=cfg.top_p, + ) + self.model_name = cfg.model_name or "vllm_local" + else: + raise ValueError(f"Unknown model_type: {cfg.model_type}. Must be 'mcore' or 'vllm'.") + + # Build dataset after model so the distributed process group is available + # for the rank-0-first download pattern (run.py:428-433). 
+ from vlmeval.dataset import build_dataset + + dataset_kwargs = self._build_dataset_kwargs() + rank = int(os.environ.get("RANK", 0)) + world_size = int(os.environ.get("WORLD_SIZE", 1)) + + if world_size > 1: + import torch.distributed as dist + + if rank == 0: + build_dataset(cfg.vlm_dataset, **dataset_kwargs) + dist.barrier() + + self.dataset = build_dataset(cfg.vlm_dataset, **dataset_kwargs) + if self.dataset is None: + raise ValueError(f"VLMEvalKit dataset '{cfg.vlm_dataset}' is not valid.") + + self.work_dir = os.path.join(cfg.work_dir, "eval_kit_work", cfg.vlm_dataset) + os.makedirs(self.work_dir, exist_ok=True) + + # Async JSONL writer state + self._async_stop = threading.Event() + self._async_written_indices = set() + self._async_lock = threading.Lock() + self._async_thread = None + + # ------------------------------------------------------------------ + # Incremental JSONL writer (mirrors NeMo Skills' -async pattern) + # ------------------------------------------------------------------ + + def _build_index_to_meta(self): + """Build a lookup from dataset index -> {question, answer} for JSONL rows.""" + meta = {} + df = self.dataset.data + for _, row in df.iterrows(): + idx = row["index"] + meta[idx] = { + "question": str(row["question"]) if "question" in row.index else "", + "expected_answer": str(row["answer"]) if "answer" in row.index else "", + } + return meta + + def _pkl_to_prediction(self, value): + """Extract the prediction string from a pkl entry (str or dict).""" + if isinstance(value, dict) and "prediction" in value: + return str(value["prediction"]) + return str(value) + + def _async_writer_loop(self, pkl_path, index_meta, output_path, poll_interval=5): + """Background thread: poll the pkl file and append new entries to JSONL.""" + while not self._async_stop.is_set(): + self._flush_pkl_to_jsonl(pkl_path, index_meta, output_path) + self._async_stop.wait(timeout=poll_interval) + # Final flush after inference signals stop + 
self._flush_pkl_to_jsonl(pkl_path, index_meta, output_path) + + def _flush_pkl_to_jsonl(self, pkl_path, index_meta, output_path): + """Read the pkl, find new entries, append them to the JSONL file.""" + if not os.path.exists(pkl_path): + return + try: + with open(pkl_path, "rb") as f: + data = pickle.load(f) + except Exception: + # pkl may be mid-write; skip this cycle + return + if not isinstance(data, dict): + return + + new_entries = [] + with self._async_lock: + for idx, value in data.items(): + if idx not in self._async_written_indices: + self._async_written_indices.add(idx) + meta = index_meta.get(idx, {}) + new_entries.append({ + "generation": self._pkl_to_prediction(value), + "expected_answer": meta.get("expected_answer", ""), + "question": meta.get("question", ""), + }) + + if new_entries: + with open(output_path, "a", encoding="utf-8") as f: + for entry in new_entries: + f.write(json.dumps(entry) + "\n") + LOG.info("Async JSONL: flushed %d new entries (total %d)", + len(new_entries), len(self._async_written_indices)) + + def _start_async_writer(self): + """Start the background JSONL writer if output_file is configured.""" + if not self.cfg.output_file: + return + rank = int(os.environ.get("RANK", 0)) + if rank != 0: + return + + world_size = int(os.environ.get("WORLD_SIZE", 1)) + ds_name = self.dataset.dataset_name + pkl_path = os.path.join(self.work_dir, f"0{world_size}_{ds_name}.pkl") + + output_dir = Path(self.cfg.output_file).parent + output_dir.mkdir(parents=True, exist_ok=True) + + # Clear any previous async file + async_path = self.cfg.output_file + if os.path.exists(async_path): + os.remove(async_path) + + index_meta = self._build_index_to_meta() + + self._async_stop.clear() + self._async_written_indices.clear() + self._async_thread = threading.Thread( + target=self._async_writer_loop, + args=(pkl_path, index_meta, async_path), + daemon=True, + ) + self._async_thread.start() + LOG.info("Started async JSONL writer, monitoring %s", pkl_path) + + def 
_stop_async_writer(self): + """Stop the background writer and wait for final flush.""" + if self._async_thread is None: + return + self._async_stop.set() + self._async_thread.join(timeout=30) + self._async_thread = None + LOG.info("Async JSONL writer stopped (%d entries written)", len(self._async_written_indices)) + + def _build_dataset_kwargs(self): + """Build dataset kwargs mirroring VLMEvalKit's run.py:390-425.""" + from vlmeval.smp import listinstr + + kwargs = {} + ds = self.cfg.vlm_dataset + + if ds in ['MMLongBench_DOC', 'DUDE', 'DUDE_MINI', 'SLIDEVQA', 'SLIDEVQA_MINI']: + kwargs['model'] = self.cfg.model_name or self.cfg.model_config or "" + + if ds in ('Video-MME', 'Video-MME-With-Audio', 'WorldSense-AVLM', 'MetropolisVideoDataset', 'WorldSense', 'avqa_val'): + kwargs['use_subtitle'] = self.cfg.use_subtitle + if ds in ('Video-MME', 'MetropolisVideoDataset', 'MLVU', 'LongVideoBench', 'MMBench-Video', 'MVBench', 'MLVU_MCQ', 'PAI-Bench-U'): + kwargs['nframe'] = self.cfg.nframe + if ds in ['Video-MME', 'MLVU', 'LongVideoBench', 'WorldSense', 'avqa_val', 'MMBench-Video', 'MVBench', 'MLVU_MCQ', 'PAI-Bench-U']: + kwargs['fps'] = self.cfg.fps + if ds in ['Video-MME', 'MLVU', 'LongVideoBench', 'WorldSense', 'avqa_val', 'MLVU_MCQ', 'MMBench-Video', 'PAI-Bench-U']: + kwargs['nframe_max'] = self.cfg.nframe_max + if ds in ['ANet-RTL', 'Charades-STA']: + kwargs['nframe'] = self.cfg.nframe + + if listinstr(['Video-MME-With-Audio', 'DailyOmni', 'WorldSense-AVLM', 'JensenKeyNote'], ds): + kwargs['media_dir'] = self.cfg.media_dir + + return kwargs + + def generate(self): + """Run VLMEvalKit inference and evaluation.""" + from vlmeval.inference import infer_data_job + from vlmeval.inference_mt import infer_data_job_mt + from vlmeval.inference_video import infer_data_job_video + from vlmeval.smp import get_pred_file_format + + dataset = self.dataset + ds_name = dataset.dataset_name + pred_format = get_pred_file_format() + result_file_base = 
f"{self.model_name}_{ds_name}.{pred_format}" + + rank = int(os.environ.get("RANK", 0)) + + # Start incremental JSONL writer before inference begins + self._start_async_writer() + + # Dispatch to correct inference function (mirrors run.py:453-488) + try: + if self.cfg.eval_mode != "eval": + if dataset.MODALITY == 'VIDEO': + self.model = infer_data_job_video( + model=self.model, + work_dir=self.work_dir, + model_name=self.model_name, + dataset=dataset, + result_file_name=result_file_base, + strip_think=not self.cfg.reasoning, + reasoning_flag=self.cfg.reasoning, + ) + elif dataset.TYPE == 'MT': + self.model = infer_data_job_mt( + model=self.model, + work_dir=self.work_dir, + model_name=self.model_name, + dataset=dataset, + ) + else: + self.model = infer_data_job( + model=self.model, + work_dir=self.work_dir, + model_name=self.model_name, + dataset=dataset, + strip_think=not self.cfg.reasoning, + reasoning_flag=self.cfg.reasoning, + ) + finally: + self._stop_async_writer() + + # Evaluate (mirrors run.py:490-548) + eval_result = {} + if self.cfg.eval_mode != "infer" and rank == 0: + from vlmeval.smp import get_pred_file_path + + result_file = get_pred_file_path( + self.work_dir, self.model_name, ds_name, use_env_format=True + ) + judge_kwargs = { + 'nproc': self.cfg.judge_nproc, + 'verbose': False, + 'retry': self.cfg.judge_retry, + } + if self.cfg.judge: + judge_kwargs['model'] = self.cfg.judge + + if os.path.exists(result_file): + try: + eval_result = dataset.evaluate(result_file, **judge_kwargs) + except KeyError as e: + if e.args and e.args[0] == 'model': + LOG.warning( + "Dataset %s requires a judge model for evaluation (e.g. MathVista). " + "Skipping evaluation. Set ++judge= (e.g. gpt-4o) to enable. 
" + "Inference output was still written.", + ds_name, + ) + eval_result = {} + else: + raise + if eval_result is None: + eval_result = {} + + # Convert to NeMo Skills format and write outputs (rank 0 only) + if rank == 0: + self._convert_to_nemo_skills_format(eval_result) + + # Write .done file for pipeline tracking + if self.cfg.output_file: + Path(f"{self.cfg.output_file}.done").touch() + + def _convert_to_nemo_skills_format(self, eval_result): + """Rewrite the final ordered JSONL output and eval_kit_metrics.json. + + The async writer has already been producing incremental JSONL during + inference. Here we overwrite with the authoritative, properly-ordered + result that VLMEvalKit merged from all ranks. + """ + if not self.cfg.output_file: + return + + from vlmeval.smp import get_pred_file_path + from vlmeval.smp import load as vlm_load + + output_dir = Path(self.cfg.output_file).parent + output_dir.mkdir(parents=True, exist_ok=True) + + # Write JSONL (required by summarize_results to find output*jsonl files) + result_file = get_pred_file_path( + self.work_dir, + self.model_name, + self.dataset.dataset_name, + use_env_format=True, + ) + if os.path.exists(result_file): + df = vlm_load(result_file) + with open(self.cfg.output_file, 'w', encoding='utf-8') as f: + for _, row in df.iterrows(): + entry = { + "generation": str(row["prediction"]) if "prediction" in row.index else "", + "expected_answer": str(row["answer"]) if "answer" in row.index else "", + "question": str(row["question"]) if "question" in row.index else "", + } + f.write(json.dumps(entry) + "\n") + LOG.info("Wrote final ordered JSONL to %s (%d entries)", self.cfg.output_file, len(df)) + else: + LOG.warning("VLMEvalKit result file not found: %s", result_file) + + # Write aggregate metrics for EvalKitMetrics to read + # eval_result can be a dict or a pandas DataFrame (e.g. 
ASR); avoid "if eval_result" for DataFrame + if eval_result is not None: + metrics_data = eval_result if isinstance(eval_result, dict) else {"result": str(eval_result)} + metrics_path = output_dir / "eval_kit_metrics.json" + with open(metrics_path, 'w', encoding='utf-8') as f: + json.dump(metrics_data, f, indent=2, default=str) + LOG.info("Wrote eval_kit metrics to %s", metrics_path) + + +GENERATION_TASK_CLASS = EvalKitGenerationTask + + +@hydra.main(version_base=None, config_name="base_eval_kit_config") +def main(cfg: EvalKitConfig): + cfg = EvalKitConfig(_init_nested=True, **cfg) + task = EvalKitGenerationTask(cfg) + task.generate() + + +if __name__ == "__main__": + main() diff --git a/nemo_skills/inference/factory.py b/nemo_skills/inference/factory.py index cd29bbd2c5..93f5bbe193 100644 --- a/nemo_skills/inference/factory.py +++ b/nemo_skills/inference/factory.py @@ -19,10 +19,12 @@ class GenerationType(str, Enum): generate = "generate" math_judge = "math_judge" check_contamination = "check_contamination" + mcore_skills = "mcore_skills" GENERATION_MODULE_MAP = { GenerationType.generate: "nemo_skills.inference.generate", GenerationType.math_judge: "nemo_skills.inference.llm_math_judge", GenerationType.check_contamination: "nemo_skills.inference.check_contamination", + GenerationType.mcore_skills: "nemo_skills.inference.mcore_skills", } diff --git a/nemo_skills/inference/generate.py b/nemo_skills/inference/generate.py index b3f0094c1a..ab76739c21 100644 --- a/nemo_skills/inference/generate.py +++ b/nemo_skills/inference/generate.py @@ -273,6 +273,38 @@ def _get_disallowed_params(self): class GenerationTask: + # --- Declarative pipeline attributes --- + # Subclasses can override to declare their runtime needs generically. + # The pipeline reads these instead of hardcoding module-name checks. + + # Container key in cluster_config["containers"]; None means use "nemo-skills" default. 
+ CONTAINER_KEY: str | None = None + + # Whether to wrap the command with torchrun for multi-GPU data-parallel inference. + USE_TORCHRUN: bool = False + + @classmethod + def is_self_contained(cls, extra_arguments: str = "") -> bool: + """Whether this task manages its own model (no NeMo Skills server). + + Override in subclasses. *extra_arguments* is the raw CLI extra args string + so that the decision can depend on runtime flags (e.g. model_type). + """ + return False + + @classmethod + def get_env_prefix(cls) -> str: + """Shell commands prepended before the main command (e.g. env exports). + + Return an empty string if no special environment is needed. + """ + return "" + + @classmethod + def get_extra_package_dirs(cls) -> list[str]: + """Extra directories to package alongside nemo_run code.""" + return [] + @classmethod def get_generation_default_args(cls) -> str: """ diff --git a/nemo_skills/inference/mcore_skills.py b/nemo_skills/inference/mcore_skills.py new file mode 100644 index 0000000000..d6eccd9bd8 --- /dev/null +++ b/nemo_skills/inference/mcore_skills.py @@ -0,0 +1,542 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""NeMo Skills generation via VLMEvalKit MultiModalMCore in-process. + +This module implements Option A from the plan: read NeMo Skills JSONL, fill +prompts with NeMo Skills prompt config, run inference through MultiModalMCore +synchronously, write NeMo Skills-format JSONL. 
No HTTP server; evaluation +remains NeMo Skills metrics on the output. + +When run with torchrun (multi-GPU), all ranks participate in model.generate(); +only rank 0 performs file I/O. +""" + +import json +import logging +import os +import re +from dataclasses import field +from pathlib import Path + +import hydra +from omegaconf import MISSING +from tqdm import tqdm +from transformers import AutoTokenizer + +from nemo_skills.prompt.utils import get_prompt +from nemo_skills.utils import chunk_data, get_logger_name, nested_dataclass + +LOG = logging.getLogger(get_logger_name(__file__)) + +try: + from nemo_skills.inference.generate import GenerationTask +except ImportError: + GenerationTask = None + +if GenerationTask is not None: + _get_server_command_fn = GenerationTask.get_server_command_fn +else: + + @classmethod + def _get_server_command_fn(cls): + from nemo_skills.pipeline.utils import get_server_command + + return get_server_command + + +@nested_dataclass(kw_only=True) +class MegatronMCoreConfig: + """Configuration for MegatronMCore NeMo Skills generation.""" + + input_file: str = MISSING + output_file: str = MISSING + + # Prompt config for text-only data (used by fill_prompt). Not needed when the + # input JSONL already contains OpenAI-format 'messages' (e.g. asr-leaderboard). + prompt_config: str | None = None + + # Tokenizer for prompt filling (format_as_string=True). HF model name or path. + # Required when prompt_config is set; optional for messages-only data. 
+ tokenizer: str | None = None + + # MultiModalMCore model + model_config: str = MISSING + load_dir: str | None = None + load_ckpt: str | None = None + reasoning: bool = False + + # Prompt options (mirror GenerationTaskConfig where needed) + code_tags: str | None = None + examples_type: str | None = None + system_message: str | None = None + start_assistant_response_key: str | None = None + chat_template_kwargs: dict = field(default_factory=dict) + + # Base directory to resolve relative audio/image paths (e.g. NEMO_SKILLS_DATA_DIR). + data_dir: str = "" + + # Generation limits and resume + max_samples: int = -1 + skip_filled: bool = False + num_chunks: int | None = None + chunk_id: int | None = None + + # Output + generation_key: str = "generation" + add_generation_stats: bool = True + async_position_key: str = "_async_position" + + dry_run: bool = False + + # Dataset name passed to MultiModalMCore.generate() — used by VLMEvalKit internally + # for dataset-specific logic (e.g. video tile config). Defaults to "nemo_skills". + dataset_name: str = "nemo_skills" + + # Accepted from pipeline/dataset modules but unused by mcore_skills (avoid Hydra errors). + # These come via ++key=value overrides from dataset modules (e.g. asr-leaderboard). 
+ eval_config: dict = field(default_factory=dict) + eval_type: str | None = None + prompt_format: str | None = None + enable_audio: bool = False + + +def _make_mcore_model(cfg: MegatronMCoreConfig): + from vlmeval.vlm.multimodal_mcore.model import MultiModalMCore + + return MultiModalMCore( + model_config=cfg.model_config, + load_dir=cfg.load_dir, + load_ckpt=cfg.load_ckpt, + reasoning=cfg.reasoning, + ) + + +class MegatronMCoreGenerationTask: + """Generation task using NeMo Skills data + prompts and VLMEvalKit MultiModalMCore in-process.""" + + get_server_command_fn = _get_server_command_fn + + # --- Declarative pipeline attributes (read generically by pipeline/eval.py) --- + CONTAINER_KEY = "eval_kit" + USE_TORCHRUN = True + # Metrics are computed by VLMEvalKit (asr_wer etc.) and saved as + # eval_kit_metrics.json — tell the summarize step to use EvalKitMetrics. + METRICS_TYPE_OVERRIDE = "eval_kit" + + @classmethod + def is_self_contained(cls, extra_arguments: str = "") -> bool: + """Always self-contained (in-process MultiModalMCore, no HTTP server).""" + return True + + @classmethod + def get_env_prefix(cls) -> str: + """Shell env setup prepended before the main command (Megatron/VLMEvalKit needs).""" + return ( + 'export LMUData="${LMUData:-${LMUDATA:-}}" && ' + 'export LD_LIBRARY_PATH=/opt/hpcx/ucx/lib:${LD_LIBRARY_PATH:-} && ' + 'export MKL_THREADING_LAYER=GNU && ' + 'export OMP_NUM_THREADS=1 && ' + 'export MKL_NUM_THREADS=1 && ' + 'ldconfig && ' + # Create empty .env so VLMEvalKit's load_env() doesn't emit ERROR logs. 
+ 'touch /nemo_run/code/.env 2>/dev/null; ' + ) + + @classmethod + def get_extra_package_dirs(cls) -> list[str]: + """Directories to package alongside nemo_run code (VLMEvalKit vlmeval/).""" + vlmevalkit_path = os.environ.get("NEMO_SKILLS_VLMEVALKIT_PATH") + if vlmevalkit_path: + pkg = os.path.join(vlmevalkit_path, "vlmeval") + if os.path.isdir(pkg): + return [pkg] + return [] + + @classmethod + def get_generation_default_args(cls): + return "" + + @classmethod + def get_generation_requirements(cls): + return None + + def __init__(self, cfg: MegatronMCoreConfig): + self.cfg = cfg + # Prompt is only needed for text-only data (no 'messages' field). + # For multimodal data with OpenAI-format messages, _build_mcore_messages + # extracts content directly — no prompt template required. + if cfg.prompt_config: + self.prompt = get_prompt( + prompt_config=cfg.prompt_config, + tokenizer=cfg.tokenizer, + code_tags=cfg.code_tags, + examples_type=cfg.examples_type, + system_message=cfg.system_message, + ) + else: + self.prompt = None + self.model = _make_mcore_model(cfg) + + def load_data(self): + data = [] + with open(self.cfg.input_file, "rt", encoding="utf-8") as fin: + for line in fin: + data.append(json.loads(line)) + if self.cfg.num_chunks is not None and self.cfg.chunk_id is not None: + data, self.cfg.output_file = chunk_data( + data, self.cfg.output_file, self.cfg.chunk_id, self.cfg.num_chunks + ) + LOG.info( + "Chunking: %d chunks, processing chunk %d; samples in chunk: %d", + self.cfg.num_chunks, + self.cfg.chunk_id, + len(data), + ) + if self.cfg.max_samples > 0: + data = data[: self.cfg.max_samples] + return data + + def skip_completed_samples(self, data: list) -> list: + if not self.cfg.skip_filled or not Path(self.cfg.output_file).exists(): + return data + filled = 0 + with open(self.cfg.output_file, "rt", encoding="utf-8") as fin: + for _ in fin: + filled += 1 + if filled >= len(data): + return [] + return data[filled:] + + def fill_prompt(self, data_point: dict, 
data: list) -> str: + from copy import deepcopy + + data_point = deepcopy(data_point) + filled = self.prompt.fill( + data_point, + start_assistant_response_key=self.cfg.start_assistant_response_key, + chat_template_kwargs=self.cfg.chat_template_kwargs or {}, + format_as_string=True, + ) + return filled if isinstance(filled, str) else str(filled) + + def _get_data_dir(self) -> str: + """Return the effective data_dir from cfg or eval_config.""" + data_dir = getattr(self.cfg, "data_dir", None) or "" + if not data_dir and getattr(self.cfg, "eval_config", None): + data_dir = self.cfg.eval_config.get("data_dir") or "" + return data_dir + + def _resolve_path(self, path: str) -> str: + """Resolve a media file path, handling relative paths and mount mismatches. + + 1. Relative paths are joined with data_dir. + 2. Absolute paths that don't exist on disk are retried relative to data_dir + (handles mount mismatches, e.g. JSONL has /dataset/... but data is at /data/...). + """ + if not path: + return path + data_dir = self._get_data_dir() + if not os.path.isabs(path): + if data_dir: + return os.path.join(data_dir, path) + return path + # Absolute path — use as-is if it exists + if os.path.exists(path): + return path + # Absolute path doesn't exist — try stripping the first directory component + # and re-rooting under data_dir (e.g. /dataset/asr-leaderboard/... → /data/asr-leaderboard/...) + if data_dir: + # Strip leading /mount_name/ to get the relative portion + parts = path.strip("/").split("/", 1) + if len(parts) == 2: + relative = parts[1] + candidate = os.path.join(data_dir, relative) + if os.path.exists(candidate): + return candidate + return path + + def _build_mcore_messages(self, data_point: dict) -> list | None: + """Convert a NeMo Skills data point into MultiModalMCore message list. + + If the data point has a 'messages' field (OpenAI format), converts it to + list[dict] with types: "text", "image", "sound". 
+ + Only user/assistant message text is included — system messages are skipped + because MultiModalMCore's generate_inner() builds its own prompt template + with system/user roles internally. + + If no 'messages' field, returns None (caller should use fill_prompt for text-only). + """ + messages = data_point.get("messages") + if not messages: + return None + + mcore: list[dict] = [] + text_parts: list[str] = [] + + for msg in messages: + if not isinstance(msg, dict): + continue + role = msg.get("role", "") + content = msg.get("content", "") + + # Skip system messages — generate_inner builds its own system prompt. + if role == "system": + continue + + # Audio: single or multiple + if "audio" in msg: + audio = msg["audio"] + if isinstance(audio, dict) and "path" in audio: + path = self._resolve_path(audio["path"]) + mcore.append({"type": "sound", "value": path, "sample_rate": 16000}) + if "audios" in msg: + for audio in msg["audios"]: + if isinstance(audio, dict) and "path" in audio: + path = self._resolve_path(audio["path"]) + mcore.append({"type": "sound", "value": path, "sample_rate": 16000}) + + # Content: str or list of content items (text, image_url) + if isinstance(content, str): + if content.strip(): + text_parts.append(content.strip()) + elif isinstance(content, list): + for item in content: + if isinstance(item, dict): + if item.get("type") == "text" and "text" in item: + text_parts.append(item["text"].strip()) + elif item.get("type") == "image_url": + image_url = item.get("image_url") or {} + url = image_url.get("url", "") + if url.startswith("file://"): + path = url[7:] + else: + path = url + if path: + path = self._resolve_path(path) + mcore.append({"type": "image", "value": path}) + + combined_text = "\n".join(t for t in text_parts if t) + if combined_text: + mcore.append({"type": "text", "value": combined_text}) + + if not mcore: + return None + return mcore + + def dump_outputs(self, outputs: list, fout): + for out in outputs: + 
fout.write(json.dumps(out) + "\n")
+
+    @staticmethod
+    def _strip_thinking_tags(text: str) -> str:
+        """Strip <think>...</think> tags (including empty ones) from model output."""
+        return re.sub(r'<think>.*?</think>', '', text, flags=re.DOTALL).strip()
+
+    def _generate_for_sample(self, data_point: dict, data: list) -> str:
+        """Run model inference for a single data point. Returns generated text."""
+        message_list = self._build_mcore_messages(data_point)
+        if message_list is not None:
+            raw = self.model.generate(message_list, dataset=self.cfg.dataset_name)
+            return self._strip_thinking_tags(raw)
+        if self.prompt is None:
+            raise ValueError(
+                "Data point has no 'messages' field and prompt_config is not set. "
+                "Either provide ++prompt_config for text-only data or ensure "
+                "the input JSONL contains OpenAI-format 'messages'."
+            )
+        prompt_str = self.fill_prompt(data_point, data)
+        raw = self.model.generate(
+            [{"type": "text", "value": prompt_str}],
+            dataset=self.cfg.dataset_name,
+        )
+        return self._strip_thinking_tags(raw)
+
+    def generate(self):
+        import sys
+
+        # Use Megatron DP rank/size for data sharding (matches VLMEvalKit pattern).
+        # With data_parallel=True in generate_and_post_process, each DP rank runs
+        # generation independently on its shard while TP ranks synchronise internally.
+        dp_rank = self.model.get_dp_rank()
+        dp_size = self.model.get_dp_size()
+
+        output_dir = Path(self.cfg.output_file).absolute().parent
+        if dp_rank == 0:
+            output_dir.mkdir(parents=True, exist_ok=True)
+
+        data = self.load_data()
+        data = self.skip_completed_samples(data)
+        if not data:
+            if dp_rank == 0:
+                LOG.info("No data to process, skipping generation")
+            return
+        if self.cfg.dry_run:
+            if dp_rank == 0:
+                LOG.info("Dry run: would process %d samples", len(data))
+            return
+
+        # Round-robin shard by dp_rank (same strategy as VLMEvalKit infer_data).
+ my_indices = list(range(dp_rank, len(data), dp_size)) + my_data = [data[i] for i in my_indices] + + if dp_rank == 0: + LOG.info( + "Data parallelism: dp_size=%d, total=%d, this rank=%d samples", + dp_size, len(data), len(my_data), + ) + + # Per-rank output file — visible during the run so progress can be + # monitored (e.g. ``wc -l output_rank*.jsonl``). Contains a + # ``_dp_global_idx`` field used for ordered merging at the end. + rank_file = output_dir / f"output_rank{dp_rank}.jsonl" + + # Suppress VLMEvalKit's per-sample print() on non-primary DP ranks to + # avoid 8x duplicate output in logs. + _real_stdout = sys.stdout + if dp_rank != 0: + sys.stdout = open(os.devnull, "w") + + try: + with open(rank_file, "w", encoding="utf-8") as fout: + iterator = tqdm(my_data, desc=f"mcore_skills[dp{dp_rank}]") if dp_rank == 0 else my_data + for local_idx, data_point in enumerate(iterator): + global_idx = my_indices[local_idx] + gen = self._generate_for_sample(data_point, data) + output = { + "_dp_global_idx": global_idx, + self.cfg.generation_key: gen, + **{k: v for k, v in data_point.items() if k != self.cfg.async_position_key}, + } + fout.write(json.dumps(output) + "\n") + fout.flush() + finally: + if dp_rank != 0: + sys.stdout.close() + sys.stdout = _real_stdout + + # Barrier: wait for all DP ranks to finish writing. + import torch.distributed as dist + + if dist.is_initialized(): + dist.barrier() + + # Rank 0 merges per-rank files into the final ordered output. 
+ if dp_rank == 0: + all_results: dict[int, str] = {} + for r in range(dp_size): + rf = output_dir / f"output_rank{r}.jsonl" + if rf.exists() and rf.stat().st_size > 0: + with open(rf, "rt", encoding="utf-8") as fin: + for line in fin: + entry = json.loads(line) + idx = entry.pop("_dp_global_idx") + all_results[idx] = json.dumps(entry) + + mode = "a" if self.cfg.skip_filled and Path(self.cfg.output_file).exists() else "w" + merged_lines = [all_results[idx] + "\n" for idx in sorted(all_results.keys())] + with open(self.cfg.output_file, mode, encoding="utf-8") as fout: + fout.writelines(merged_lines) + LOG.info( + "Merged %d results from %d DP ranks into %s", + len(all_results), dp_size, self.cfg.output_file, + ) + + # Clean up per-rank files after successful merge. + for r in range(dp_size): + rf = output_dir / f"output_rank{r}.jsonl" + rf.unlink(missing_ok=True) + + Path(f"{self.cfg.output_file}.done").touch() + + # Evaluate using VLMEvalKit (same as eval_kit.py does). + self._evaluate_results() + + def _evaluate_results(self): + """Compute metrics using VLMEvalKit's evaluation functions. + + Uses the same asr_wer() that eval_kit.py calls via dataset.evaluate(), + so metrics are identical. Saves eval_kit_metrics.json (consumed by + EvalKitMetrics in the summarize step). 
+ """ + output_file = self.cfg.output_file + if not output_file or not Path(output_file).exists(): + return + + output_path = Path(output_file) + + try: + from vlmeval.dataset.avlm.utils import asr_wer + + # Read entries and build VLMEvalKit-format results list + entries = [] + results = [] + with open(output_file, "rt", encoding="utf-8") as fin: + for line in fin: + entry = json.loads(line) + # Strip leftover tags (older runs may have them) + gen = entry.get("generation", "") + cleaned = self._strip_thinking_tags(gen) + if cleaned != gen: + entry["generation"] = cleaned + entries.append(entry) + results.append({ + "gt": entry.get("expected_answer", ""), + "pred": entry["generation"], + }) + + # Re-write output.jsonl with cleaned generations + with open(output_file, "w", encoding="utf-8") as fout: + for entry in entries: + fout.write(json.dumps(entry) + "\n") + + # Compute WER using VLMEvalKit (same function as eval_kit path) + wer_score = asr_wer(results) + LOG.info("ASR WER: %.2f%%", wer_score) + + # Save as eval_kit_metrics.json (same format eval_kit.py writes) + metrics = {"wer": wer_score} + metrics_file = output_path.parent / "eval_kit_metrics.json" + with open(metrics_file, "w", encoding="utf-8") as f: + json.dump(metrics, f, indent=2) + LOG.info("Metrics saved to %s", metrics_file) + + except ImportError: + LOG.warning( + "VLMEvalKit asr_wer not available — skipping eval-kit-style metrics. " + "The summarize_results job will compute metrics separately." 
+ ) + except Exception: + LOG.exception("Inline metrics computation failed") + + +GENERATION_TASK_CLASS = MegatronMCoreGenerationTask + +cs = hydra.core.config_store.ConfigStore.instance() +cs.store(name="base_mcore_skills_config", node=MegatronMCoreConfig) + + +@hydra.main(version_base=None, config_name="base_mcore_skills_config") +def main(cfg: MegatronMCoreConfig): + cfg = MegatronMCoreConfig(_init_nested=True, **cfg) + task = MegatronMCoreGenerationTask(cfg) + task.generate() + + +if __name__ == "__main__": + import nemo_skills.utils as utils + + utils.setup_logging() + main() diff --git a/nemo_skills/pipeline/eval.py b/nemo_skills/pipeline/eval.py index 1f1557a4f9..7f2b6faaec 100644 --- a/nemo_skills/pipeline/eval.py +++ b/nemo_skills/pipeline/eval.py @@ -37,6 +37,36 @@ LOG = logging.getLogger(get_logger_name(__file__)) +def _apply_task_overrides(combined_cmd, task_classes, job_num_gpus, cluster_config): + """Apply env/torchrun/container overrides declared by generation task classes. + + Returns (modified_cmd, container). 
+ """ + # Environment prefix (first non-empty wins; jobs are not mixed across task types) + for tc in task_classes: + prefix = tc.get_env_prefix() if hasattr(tc, "get_env_prefix") else "" + if prefix: + combined_cmd = f'{prefix}{combined_cmd}' + break + + # Torchrun for multi-GPU data-parallel inference + if any(getattr(tc, "USE_TORCHRUN", False) for tc in task_classes): + if job_num_gpus and int(job_num_gpus) > 1: + combined_cmd = combined_cmd.replace( + 'python -m ', f'torchrun --nproc_per_node {job_num_gpus} -m ', 1 + ) + + # Container selection (task class CONTAINER_KEY, falling back to nemo-skills default) + container = cluster_config["containers"]["nemo-skills"] + for tc in task_classes: + key = getattr(tc, "CONTAINER_KEY", None) + if key and key in cluster_config.get("containers", {}): + container = cluster_config["containers"][key] + break + + return combined_cmd, container + + class SingleNodeMode(str, enum.Enum): sequential = "sequential" parallel = "parallel" @@ -587,6 +617,22 @@ def eval( get_random_port = pipeline_utils.should_get_random_port(server_gpus, exclusive) should_package_extra_datasets = extra_datasets and extra_datasets_type == ExtraDatasetType.local + + # Build extra_package_dirs: include extra_datasets + any dirs declared by generation task classes + extra_pkg_dirs = [] + if should_package_extra_datasets: + extra_pkg_dirs.append(extra_datasets) + seen_pkg_dirs = set() + for ba in benchmarks_dict.values(): + task_cls = ba.generation_task_class + if task_cls is not None and hasattr(task_cls, "get_extra_package_dirs"): + for pkg_dir in task_cls.get_extra_package_dirs(): + if pkg_dir not in seen_pkg_dirs: + seen_pkg_dirs.add(pkg_dir) + extra_pkg_dirs.append(pkg_dir) + LOG.info("Packaging extra dir from %s: %s", task_cls.__name__, pkg_dir) + extra_pkg_dirs = extra_pkg_dirs or None + has_tasks = False job_id_to_tasks = {} benchmark_to_judge_tasks = {} @@ -605,19 +651,33 @@ def eval( job_server_address, job_server_command, 
job_sandbox_env_overrides, + job_num_gpus, ) = job_args prev_tasks = _task_dependencies for _ in range(dependent_jobs + 1): has_tasks = True + combined_cmd = pipeline_utils.wrap_python_path(cmd=combine_cmds(cmds, single_node_mode)) + + # Apply env/torchrun/container overrides from generation task classes + job_task_classes = [ + benchmarks_dict[b].generation_task_class + for b in job_benchmarks + if benchmarks_dict[b].generation_task_class is not None + ] + combined_cmd, job_container = _apply_task_overrides( + combined_cmd, job_task_classes, job_num_gpus, cluster_config + ) + new_task = pipeline_utils.add_task( exp, - cmd=pipeline_utils.wrap_python_path(cmd=combine_cmds(cmds, single_node_mode)), + cmd=combined_cmd, task_name=f"{expname}-{'-'.join(job_benchmarks)}", log_dir=log_dir, - container=cluster_config["containers"]["nemo-skills"], + container=job_container, cluster_config=cluster_config, partition=partition, + num_gpus=job_num_gpus, server_config=job_server_config, with_sandbox=job_needs_sandbox or with_sandbox, keep_mounts_for_sandbox=job_needs_sandbox_to_keep_mounts or keep_mounts_for_sandbox, @@ -630,7 +690,7 @@ def eval( prev_tasks if cluster_config["executor"] == "slurm" else all_tasks + _task_dependencies ), get_server_command=job_server_command, - extra_package_dirs=[extra_datasets] if should_package_extra_datasets else None, + extra_package_dirs=extra_pkg_dirs, sbatch_kwargs=sbatch_kwargs, installation_command=installation_command, skip_hf_home_check=skip_hf_home_check, @@ -777,8 +837,10 @@ def eval( command += f" --wandb_project={wandb_project} " if data_dir: command += f" --data_dir={data_dir} " - if metric_type: - command += f" --metric_type={metric_type} " + # Use per-benchmark metric_type if available, otherwise fall back to global + effective_metric_type = benchmark_args.metric_type or metric_type + if effective_metric_type: + command += f" --metric_type={effective_metric_type} " if metrics_kwargs: command += f" 
--metrics_kwargs='{kwargs_to_string(metrics_kwargs)}' " diff --git a/nemo_skills/pipeline/utils/eval.py b/nemo_skills/pipeline/utils/eval.py index 9750659e5b..1f10cf28f7 100644 --- a/nemo_skills/pipeline/utils/eval.py +++ b/nemo_skills/pipeline/utils/eval.py @@ -29,10 +29,23 @@ LOG = logging.getLogger(get_logger_name(__file__)) +def _resolve_generation_task_class(module_name: str): + """Import a generation module and return its GENERATION_TASK_CLASS, or None.""" + try: + if module_name.endswith(".py") or os.sep in module_name: + path_suffix = ".py" if not module_name.endswith(".py") else "" + mod = import_from_path(module_name + path_suffix) + else: + mod = importlib.import_module(module_name) + return getattr(mod, "GENERATION_TASK_CLASS", None) + except Exception: + return None + + @dataclass class BenchmarkArgs: name: str - input_file: str + input_file: str | None generation_args: str judge_args: str judge_pipeline_args: dict @@ -44,10 +57,15 @@ class BenchmarkArgs: eval_subfolder: str benchmark_group: str | None = None score_module: str | None = None + metric_type: str | None = None + self_contained_task: bool = False + num_gpus: int | None = None # For self-contained tasks that need GPU allocation on the main task job_ids: list[int] = field(default_factory=list) remaining_jobs: list[dict] = field(default_factory=list) # Per-benchmark sandbox environment overrides in KEY=VALUE form sandbox_env_overrides: list[str] = field(default_factory=list) + # Resolved GENERATION_TASK_CLASS (populated by prepare_eval_commands) + generation_task_class: type | None = None @property def requires_judge(self): @@ -87,39 +105,44 @@ def get_benchmark_args_from_module( benchmark_group=None, override_dict=None, ): + skip_input_file = getattr(benchmark_module, "SKIP_INPUT_FILE", False) + self_contained_task = getattr(benchmark_module, "SELF_CONTAINED_TASK", False) + if split is None: split = get_arg_from_module_or_dict(benchmark_module, "EVAL_SPLIT", "test", override_dict) - if not 
is_on_cluster: - if pipeline_utils.is_mounted_filepath(cluster_config, data_path) or cluster_config["executor"] == "none": + input_file = None + if not skip_input_file: + if not is_on_cluster: + if pipeline_utils.is_mounted_filepath(cluster_config, data_path) or cluster_config["executor"] == "none": + input_file = f"{data_path}/{benchmark.replace('.', '/')}/{split}.jsonl" + unmounted_input_file = pipeline_utils.get_unmounted_path(cluster_config, input_file) + unmounted_path = str(Path(__file__).parents[3] / unmounted_input_file.replace("/nemo_run/code/", "")) + else: + # will be copied over in this case as it must come from extra datasets + input_file = f"/nemo_run/code/{Path(data_path).name}/{benchmark.replace('.', '/')}/{split}.jsonl" + unmounted_path = Path(data_path) / benchmark.replace(".", "/") / f"{split}.jsonl" + else: + # on cluster we will always use the mounted path input_file = f"{data_path}/{benchmark.replace('.', '/')}/{split}.jsonl" - unmounted_input_file = pipeline_utils.get_unmounted_path(cluster_config, input_file) - unmounted_path = str(Path(__file__).parents[3] / unmounted_input_file.replace("/nemo_run/code/", "")) + unmounted_path = pipeline_utils.get_unmounted_path(cluster_config, input_file) + + unmounted_path = str(unmounted_path) + # checking if data file exists (can check locally as well) + if is_on_cluster: + if not pipeline_utils.cluster_path_exists(cluster_config, unmounted_path): + raise ValueError( + f"Data file {unmounted_path} does not exist on cluster. " + "Please check the benchmark and split parameters. " + "Did you forget to run prepare data commands or add data_dir argument?" 
+ ) else: - # will be copied over in this case as it must come from extra datasets - input_file = f"/nemo_run/code/{Path(data_path).name}/{benchmark.replace('.', '/')}/{split}.jsonl" - unmounted_path = Path(data_path) / benchmark.replace(".", "/") / f"{split}.jsonl" - else: - # on cluster we will always use the mounted path - input_file = f"{data_path}/{benchmark.replace('.', '/')}/{split}.jsonl" - unmounted_path = pipeline_utils.get_unmounted_path(cluster_config, input_file) - - unmounted_path = str(unmounted_path) - # checking if data file exists (can check locally as well) - if is_on_cluster: - if not pipeline_utils.cluster_path_exists(cluster_config, unmounted_path): - raise ValueError( - f"Data file {unmounted_path} does not exist on cluster. " - "Please check the benchmark and split parameters. " - "Did you forget to run prepare data commands or add data_dir argument?" - ) - else: - if not Path(unmounted_path).exists(): - raise ValueError( - f"Data file {unmounted_path} does not exist locally. " - "Please check the benchmark and split parameters. " - "Did you forget to run prepare data commands or add data_dir argument?" - ) + if not Path(unmounted_path).exists(): + raise ValueError( + f"Data file {unmounted_path} does not exist locally. " + "Please check the benchmark and split parameters. " + "Did you forget to run prepare data commands or add data_dir argument?" + ) # this is deprecated, should remove in the future prompt_config = get_arg_from_module_or_dict(benchmark_module, "PROMPT_CONFIG", "", override_dict=override_dict) @@ -131,6 +154,11 @@ def get_benchmark_args_from_module( if eval_args: generation_args = f"{eval_args} {generation_args}" generation_args += f" ++eval_config.split={split} " + + # Let the dataset module inject extra generation args (e.g. 
++vlm_dataset=) + if hasattr(benchmark_module, "get_extra_generation_args"): + generation_args += benchmark_module.get_extra_generation_args(benchmark) + requires_sandbox = get_arg_from_module_or_dict(benchmark_module, "REQUIRES_SANDBOX", False, override_dict) keep_mounts_for_sandbox = get_arg_from_module_or_dict( benchmark_module, "KEEP_MOUNTS_FOR_SANDBOX", False, override_dict @@ -153,6 +181,8 @@ def get_benchmark_args_from_module( if num_chunks == 0: num_chunks = None + metric_type = getattr(benchmark_module, "METRICS_TYPE", None) + if judge_args or judge_pipeline_args or eval_requires_judge: # setting to a tmp folder for judge and then the judged outputs will be in main eval-results folder eval_subfolder = "tmp-eval-results/" @@ -184,6 +214,8 @@ def get_benchmark_args_from_module( eval_subfolder=eval_subfolder, benchmark_group=benchmark_group, sandbox_env_overrides=sandbox_env_overrides, + metric_type=metric_type, + self_contained_task=self_contained_task, ) @@ -330,6 +362,24 @@ def prepare_eval_commands( ): LOG.warning("Found benchmark (%s) which requires sandbox to keep mounts, enabling it.", benchmark) + # Resolve GENERATION_TASK_CLASS for each benchmark and query declarative attributes. + # Each task class declares its own is_self_contained(), get_env_prefix(), etc. + for ba in benchmarks_dict.values(): + effective_module = generation_module or ba.generation_module + task_cls = _resolve_generation_task_class(effective_module) + ba.generation_task_class = task_cls + if task_cls is not None and hasattr(task_cls, "is_self_contained"): + if task_cls.is_self_contained(extra_arguments): + ba.self_contained_task = True + if server_parameters["server_gpus"]: + ba.num_gpus = server_parameters["server_gpus"] + # Allow task class to override metric_type (e.g. mcore_skills uses + # VLMEvalKit evaluation and writes eval_kit_metrics.json). 
+ if task_cls is not None and hasattr(task_cls, "METRICS_TYPE_OVERRIDE"): + ba.metric_type = task_cls.METRICS_TYPE_OVERRIDE + + has_self_contained = any(ba.self_contained_task for ba in benchmarks_dict.values()) + total_evals = 0 for benchmark, benchmark_args in benchmarks_dict.items(): if benchmark_args.num_samples == 0: @@ -360,6 +410,12 @@ def prepare_eval_commands( # if num_jobs is -1, we run all benchmarks in parallel num_jobs = total_evals + # Self-contained tasks (e.g., eval_kit mcore mode) bypass the server/client split + # and manage their own GPU allocation, so each benchmark must get its own job (no grouping). + if has_self_contained and num_jobs != total_evals: + LOG.info("Self-contained tasks detected, forcing num_jobs = total_evals (no job grouping).") + num_jobs = total_evals + if num_jobs == 0: return benchmarks_dict, [] @@ -372,6 +428,7 @@ def prepare_eval_commands( cur_job_idx = 0 get_random_port = pipeline_utils.should_get_random_port(server_parameters["server_gpus"], exclusive) + job_server_config, job_server_address, job_extra_arguments = pipeline_utils.configure_client( **server_parameters, extra_arguments=extra_arguments, @@ -425,10 +482,13 @@ def prepare_eval_commands( "which is not supported for evaluation when grouping jobs." 
) + # Self-contained tasks don't use NeMo Skills server, so skip + # server-related args that configure_client adds to job_extra_arguments + effective_extra_args = extra_arguments if benchmark_args.self_contained_task else job_extra_arguments full_extra_arguments = ( f"{generation_task.get_generation_default_args()} " f"{benchmark_args.generation_args} " - f"{job_extra_arguments} " + f"{effective_extra_args} " ) cmd = pipeline_utils.get_generation_cmd( @@ -468,6 +528,18 @@ def prepare_eval_commands( env_source[key] = b job_sandbox_env_overrides = [f"{k}={v}" for k, v in env_map.items()] + # For self-contained tasks, override server config and get num_gpus + job_num_gpus = None + is_self_contained_job = any(benchmarks_dict[b].self_contained_task for b in job_benchmarks) + if is_self_contained_job: + effective_server_config = None + for b in job_benchmarks: + if benchmarks_dict[b].num_gpus is not None: + job_num_gpus = benchmarks_dict[b].num_gpus + break + else: + effective_server_config = job_server_config + # TODO: move to a dataclass job_batches.append( ( @@ -475,23 +547,24 @@ def prepare_eval_commands( job_benchmarks, job_needs_sandbox, job_needs_sandbox_to_keep_mounts, - job_server_config, + effective_server_config, job_server_address, # a check above guarantees that this is the same for all tasks in a job generation_task.get_server_command_fn(), job_sandbox_env_overrides, + job_num_gpus, ) ) - job_server_config, job_server_address, job_extra_arguments = pipeline_utils.configure_client( - **server_parameters, - extra_arguments=extra_arguments, - get_random_port=get_random_port, - ) for job_benchmark in job_benchmarks: benchmarks_dict[job_benchmark].job_ids.append(cur_job_idx) cur_job_idx += 1 job_cmds = [] job_benchmarks = set() + job_server_config, job_server_address, job_extra_arguments = pipeline_utils.configure_client( + **server_parameters, + extra_arguments=extra_arguments, + get_random_port=get_random_port, + ) cur_eval += 1 diff --git 
a/nemo_skills/pipeline/utils/generation.py b/nemo_skills/pipeline/utils/generation.py index 4432181cff..63a83a5b94 100644 --- a/nemo_skills/pipeline/utils/generation.py +++ b/nemo_skills/pipeline/utils/generation.py @@ -136,9 +136,15 @@ def build_requirements_venv_cmd(requirements: list[str]) -> str: 'mkdir -p "$VENV_ROOT" && ' 'if [ ! -f "$READY_FILE" ]; then ' ' if mkdir "$LOCK_DIR" 2>/dev/null; then ' - ' if ! uv venv --system-site-packages "$VENV_DIR"; then rmdir "$LOCK_DIR"; exit 1; fi; ' - ' . "$VENV_DIR/bin/activate"; ' - ' if ! uv pip install -r "$REQS_FILE"; then rmdir "$LOCK_DIR"; exit 1; fi; ' + ' if command -v uv >/dev/null 2>&1; then ' + ' if ! uv venv --system-site-packages "$VENV_DIR"; then rmdir "$LOCK_DIR"; exit 1; fi; ' + ' . "$VENV_DIR/bin/activate"; ' + ' if ! uv pip install -r "$REQS_FILE"; then rmdir "$LOCK_DIR"; exit 1; fi; ' + " else " + ' if ! python3 -m venv --system-site-packages "$VENV_DIR"; then rmdir "$LOCK_DIR"; exit 1; fi; ' + ' . "$VENV_DIR/bin/activate"; ' + ' if ! python3 -m pip install -r "$REQS_FILE"; then rmdir "$LOCK_DIR"; exit 1; fi; ' + " fi; " ' touch "$READY_FILE"; ' ' rmdir "$LOCK_DIR"; ' " else " @@ -434,8 +440,6 @@ def get_generation_cmd( If requirements are provided, a per-requirements uv venv is prepared and activated before running the generation command. 
""" - if input_file is None and input_dir is None: - raise ValueError("Either input_file or input_dir must be provided.") if input_file is not None and input_dir is not None: raise ValueError("Please provide either input_file or input_dir, not both.") @@ -458,7 +462,9 @@ def get_generation_cmd( hydra_config_args, override_args = separate_hydra_args(extra_arguments) # Handle file paths vs module names - common_args = f"++skip_filled=True ++input_file={input_file} ++output_file={output_file}" + common_args = f"++skip_filled=True ++output_file={output_file}" + if input_file is not None: + common_args += f" ++input_file={input_file}" if script.endswith(".py") or os.sep in script: # It's a file path, run it directly with .py extension script_path = script if script.endswith(".py") else f"{script}.py" diff --git a/nemo_skills/training/nemo_rl/start_grpo.py b/nemo_skills/training/nemo_rl/start_grpo.py index 82e34b2dda..26588d2250 100644 --- a/nemo_skills/training/nemo_rl/start_grpo.py +++ b/nemo_skills/training/nemo_rl/start_grpo.py @@ -230,7 +230,9 @@ def setup_data( "env_cls", "nemo_skills.training.nemo_rl.environments.math_environment.MathEnvironment", ) - ACTOR_ENVIRONMENT_REGISTRY[env_cls_path] = PY_EXECUTABLES.SYSTEM + py_executable_str = env_configs["math"].get("py_executable", "system") + py_executable = getattr(PY_EXECUTABLES, py_executable_str.upper()) + ACTOR_ENVIRONMENT_REGISTRY[env_cls_path] = py_executable module_name, class_name = env_cls_path.rsplit(".", 1) env_module = importlib.import_module(module_name) @@ -315,18 +317,37 @@ def main() -> None: val_task_to_env, ) = setup_data(tokenizer, config["data"], config["env"]) - ( - policy, - policy_generation, - cluster, - dataloader, - val_dataloader, - loss_fn, - logger, - checkpointer, - grpo_state, - master_config, - ) = setup(config, tokenizer, dataset, val_dataset) + setup_result = setup(config, tokenizer, dataset, val_dataset) + + if len(setup_result) == 10: # Nemo-RL main branch + ( + policy, + 
policy_generation, + cluster, + dataloader, + val_dataloader, + loss_fn, + logger, + checkpointer, + grpo_state, + master_config, + ) = setup_result + elif len(setup_result) == 11: # Nemo-RL Nano/Super branch (extra nemo_gym_actor) + ( + policy, + policy_generation, + nemo_gym_actor, + cluster, + dataloader, + val_dataloader, + loss_fn, + logger, + checkpointer, + grpo_state, + master_config, + ) = setup_result + else: + raise ValueError(f"Expected 10 or 11 elements in setup_result, got {len(setup_result)}") # Check if async mode is enabled if "async_grpo" in config["grpo"] and config["grpo"]["async_grpo"]["enabled"]: diff --git a/requirements/eval-kit.txt b/requirements/eval-kit.txt new file mode 100644 index 0000000000..dcfc99670b --- /dev/null +++ b/requirements/eval-kit.txt @@ -0,0 +1,3 @@ +# VLMEvalKit (vlmeval) is installed at job start via --installation_command +# in run_eval_kit.sh (pip install from mounted cluster source). +# This file is kept as a placeholder; no venv-based requirements are needed.