diff --git a/nemo_skills/pipeline/eval.py b/nemo_skills/pipeline/eval.py index 1f1557a4f9..17e9337c5e 100644 --- a/nemo_skills/pipeline/eval.py +++ b/nemo_skills/pipeline/eval.py @@ -27,7 +27,12 @@ from nemo_skills.pipeline.app import app, typer_unpacker from nemo_skills.pipeline.generate import generate as _generate from nemo_skills.pipeline.utils import kwargs_to_string, parse_kwargs -from nemo_skills.pipeline.utils.eval import combine_cmds, prepare_eval_commands +from nemo_skills.pipeline.utils.declarative import Command, CommandGroup, HardwareConfig, Pipeline +from nemo_skills.pipeline.utils.eval import ( + EvalGenerationUnit, + prepare_eval_commands, +) +from nemo_skills.pipeline.utils.scripts import EvalClientScript, SandboxScript, ServerScript from nemo_skills.utils import ( get_logger_name, setup_logging, @@ -133,7 +138,6 @@ def _create_nvembed_judge_tasks( judge_pipeline_args, rerun_done, log_dir, - server_parameters, cluster_config, judge_server_gpus, judge_server_nodes, @@ -315,16 +319,38 @@ def eval( "If not specified, will use the registered generation module for the " "generation type (which is required in this case).", ), - model: str = typer.Option(None, help="Path to the model to be evaluated"), - server_address: str = typer.Option(None, help="Address of the server hosting the model"), - server_type: pipeline_utils.SupportedServers = typer.Option(..., help="Type of server to use"), - server_gpus: int = typer.Option(None, help="Number of GPUs to use if hosting the model"), - server_nodes: int = typer.Option(1, help="Number of nodes to use if hosting the model"), - server_args: str = typer.Option("", help="Additional arguments for the server"), - server_entrypoint: str = typer.Option( + model: List[str] = typer.Option( + None, + help="Path to the model(s) to be evaluated. CLI: space-separated. Python API: string or list. " + "Single value broadcasts to all models for multi-model evaluation.", + ), + server_address: List[str] = typer.Option( + None, + help="Server address(es). CLI: space-separated. Python API: string or list. Single value broadcasts to all models.", + ), + server_type: List[pipeline_utils.SupportedServers] = typer.Option( + ..., + help="Server type(s). CLI: space-separated. Python API: string or list. Single value broadcasts to all models.", + ), + server_gpus: List[int] = typer.Option( + None, + help="Number of GPUs per model if hosting. CLI: space-separated ints. Python API: int or list. " + "Single value broadcasts to all models.", + ), + server_nodes: List[int] = typer.Option( + [1], + help="Number of nodes per model. CLI: space-separated ints. Python API: int or list. " + "Single value broadcasts to all models.", + ), + server_args: List[str] = typer.Option( + [""], + help="Server arguments per model. CLI: space-separated. Python API: string or list. " + "Single value broadcasts to all models.", + ), + server_entrypoint: List[str] = typer.Option( None, help="Path to the entrypoint of the server. " - "If not specified, will use the default entrypoint for the server type.", + "CLI: space-separated. Python API: string or list. Single value broadcasts to all models.", ), judge_type: str = typer.Option("llm", help="Type of judge to use: 'llm' (default) or 'nvembed'"), judge_model: str = typer.Option(None, help="Path to the model to be used as a judge (if applicable)"), @@ -349,8 +375,10 @@ def eval( "If not specified, will use the registered generation module for the " "generation type.", ), - server_container: str = typer.Option( - None, help="Override container image for the hosted server (if server_gpus is set)" + server_container: List[str] = typer.Option( + None, + help="Override container image(s) for the hosted server(s) (if server_gpus is set). " + "CLI: space-separated. Python API: string or list. Single value broadcasts to all models.", ), extra_judge_args: str = typer.Option( "", help="Additional arguments for judge (passed to generate script, so should start with ++)" @@ -481,10 +509,14 @@ def eval( LOG.info("Starting evaluation job") LOG.info("Extra arguments that will be passed to the underlying script: %s", extra_arguments) - try: - server_type = server_type.value - except AttributeError: - pass + # Convert server_type enum values to strings + def convert_server_type_to_string(st): + return st.value if hasattr(st, "value") else st + + if isinstance(server_type, list): + server_type = [convert_server_type_to_string(st) for st in server_type] + else: + server_type = convert_server_type_to_string(server_type) try: extra_datasets_type = extra_datasets_type.value except AttributeError: @@ -508,6 +540,26 @@ def eval( else: wandb_parameters = None + # Normalize model configuration to list + models_list = pipeline_utils.normalize_models_config(model) + num_models = len(models_list) + + LOG.info(f"Number of models: {num_models}") + for model_idx, model_name in enumerate(models_list): + LOG.info(f" Model {model_idx}: {model_name}") + + server_types_list = pipeline_utils.normalize_parameter(server_type, num_models, "server_type") + server_gpus_list = pipeline_utils.normalize_parameter(server_gpus, num_models, "server_gpus") + server_nodes_list = pipeline_utils.normalize_parameter(server_nodes, num_models, "server_nodes") + server_args_list = pipeline_utils.normalize_parameter(server_args, num_models, "server_args") + server_entrypoints_list = pipeline_utils.normalize_parameter(server_entrypoint, num_models, "server_entrypoint") + server_containers_list = pipeline_utils.normalize_parameter(server_container, num_models, "server_container") + + if server_address is not None: + server_addresses_list = pipeline_utils.normalize_parameter(server_address, num_models, "server_address") + else: + server_addresses_list = [None] * num_models + server_parameters = { "model": model, "server_type": server_type, @@ -518,6 +570,7 @@ def eval( "server_entrypoint": server_entrypoint, "server_container": server_container, } + cli_judge_pipeline_args = { "model": judge_model, "server_type": judge_server_type, @@ -559,25 +612,25 @@ def eval( if " " in str(benchmarks): raise ValueError("benchmarks should be separated with commas") - benchmarks_dict, job_batches = prepare_eval_commands( - cluster_config, - benchmarks, - split, - extra_datasets, - num_jobs, - starting_seed, - output_dir, - num_chunks, - chunk_ids, - rerun_done, - server_parameters, - extra_arguments, - data_dir, - extra_datasets_type, - exclusive, - with_sandbox, - keep_mounts_for_sandbox, - wandb_parameters, + # Use a single shared code path for both single-model and multi-model eval: + # build structured "eval units" and run via declarative Pipeline (like ns generate). + benchmarks_dict, job_batches_units = prepare_eval_commands( + cluster_config=cluster_config, + benchmarks_or_groups=benchmarks, + split=split, + extra_datasets=extra_datasets, + num_jobs=num_jobs, + starting_seed=starting_seed, + output_dir=output_dir, + num_chunks=num_chunks, + chunk_ids=chunk_ids, + rerun_done=rerun_done, + extra_arguments=extra_arguments, + data_dir=data_dir, + extra_datasets_type=extra_datasets_type, + with_sandbox=with_sandbox, + keep_mounts_for_sandbox=keep_mounts_for_sandbox, + wandb_parameters=wandb_parameters, eval_requires_judge=eval_requires_judge, generation_type=generation_type, generation_module=generation_module, @@ -585,7 +638,6 @@ def eval( sbatch_kwargs = parse_kwargs(sbatch_kwargs, exclusive=exclusive, qos=qos, time_min=time_min) - get_random_port = pipeline_utils.should_get_random_port(server_gpus, exclusive) should_package_extra_datasets = extra_datasets and extra_datasets_type == ExtraDatasetType.local has_tasks = False job_id_to_tasks = {} @@ -595,50 +647,220 @@ def eval( _task_dependencies = [] with pipeline_utils.get_exp(expname, cluster_config, _reuse_exp) as exp: # scheduling main eval jobs - for idx, job_args in enumerate(job_batches): + has_tasks = True + + # Validate that pre-hosted models have server addresses (applies to both single & multi-model) + for model_idx in range(num_models): + if not (server_gpus_list[model_idx] is not None and int(server_gpus_list[model_idx] or 0) > 0): + if not server_addresses_list[model_idx]: + raise ValueError( + f"Model {model_idx} is not self-hosted (server_gpus=0/None) but server_address is missing. " + "Please provide --server-address (one per model, or a single value to broadcast)." + ) + + jobs = [] + job_names = [] + job_batch_to_last_job_name = {} + + # Pipeline-level: local/none executors should run sequentially + sequential = True if cluster_config["executor"] in ["local", "none"] else False + + for job_idx, job_args in enumerate(job_batches_units): ( - cmds, + units, job_benchmarks, job_needs_sandbox, job_needs_sandbox_to_keep_mounts, - job_server_config, - job_server_address, - job_server_command, job_sandbox_env_overrides, ) = job_args - prev_tasks = _task_dependencies - for _ in range(dependent_jobs + 1): - has_tasks = True - new_task = pipeline_utils.add_task( - exp, - cmd=pipeline_utils.wrap_python_path(cmd=combine_cmds(cmds, single_node_mode)), - task_name=f"{expname}-{'-'.join(job_benchmarks)}", - log_dir=log_dir, - container=cluster_config["containers"]["nemo-skills"], + task_name = f"{expname}-{'-'.join(job_benchmarks)}" + + # Build server scripts list (one per model, None if pre-hosted) + server_scripts: list[ServerScript | None] = [] + for model_idx in range(num_models): + if server_gpus_list[model_idx] is not None and int(server_gpus_list[model_idx] or 0) > 0: + server_scripts.append( + ServerScript( + server_type=server_types_list[model_idx], + model_path=models_list[model_idx], + cluster_config=cluster_config, + num_gpus=server_gpus_list[model_idx], + num_nodes=server_nodes_list[model_idx], + server_args=server_args_list[model_idx] or "", + server_entrypoint=server_entrypoints_list[model_idx], + port=None, + allocate_port=True, + ) + ) + else: + server_scripts.append(None) + + sandbox_script = None + sandbox_enabled = (job_needs_sandbox or with_sandbox) is True + if sandbox_enabled: + sandbox_script = SandboxScript( cluster_config=cluster_config, - partition=partition, - server_config=job_server_config, - with_sandbox=job_needs_sandbox or with_sandbox, - keep_mounts_for_sandbox=job_needs_sandbox_to_keep_mounts or keep_mounts_for_sandbox, - sandbox_port=None if get_random_port else 6000, - sandbox_env_overrides=job_sandbox_env_overrides, - run_after=run_after, - reuse_code_exp=reuse_code_exp, - reuse_code=reuse_code, - task_dependencies=( - prev_tasks if cluster_config["executor"] == "slurm" else all_tasks + _task_dependencies + keep_mounts=job_needs_sandbox_to_keep_mounts or keep_mounts_for_sandbox, + allocate_port=True, + env_overrides=job_sandbox_env_overrides, + ) + # Ensure sandbox runs on all nodes in the group (like the server does) + # This is critical for multi-node setups where client tasks need local sandbox access + sandbox_script.span_group_nodes = True + + # Convert units to dict payloads for EvalClientScript + unit_dicts = [] + for u in units: + if isinstance(u, EvalGenerationUnit): + unit_dicts.append( + { + "input_file": u.input_file, + "output_dir": u.output_dir, + "extra_arguments": u.extra_arguments, + "random_seed": u.random_seed, + "chunk_id": u.chunk_id, + "num_chunks": u.num_chunks, + "script": u.script, + "requirements": u.requirements, + "wandb_parameters": u.wandb_parameters, + "with_sandbox": u.with_sandbox, + } + ) + else: + unit_dicts.append(dict(u)) + + client_script = EvalClientScript( + units=unit_dicts, + single_node_mode=single_node_mode, + with_sandbox=sandbox_enabled, + servers=server_scripts, + server_addresses_prehosted=server_addresses_list, + model_names=models_list, + server_types=server_types_list, + sandbox=sandbox_script, + installation_command=installation_command, + ) + + # Build groups: group0 = (optional server0) + (optional sandbox) + client + groups = [] + + group0_components = [] + group0_server = server_scripts[0] if server_scripts else None + group_gpus = 0 + group_nodes = 1 + group_tasks = 1 + + if group0_server is not None: + group0_components.append( + Command( + script=group0_server, + container=server_containers_list[0] or cluster_config["containers"][server_types_list[0]], + name=f"{task_name}_model_0_server", + ) + ) + group_gpus = int(server_gpus_list[0]) + group_nodes = int(server_nodes_list[0]) + group_tasks = int(group0_server.num_tasks) + + if sandbox_script is not None: + group0_components.append( + Command( + script=sandbox_script, + container=cluster_config["containers"]["sandbox"], + name=f"{task_name}_sandbox", + ) + ) + + group0_components.append( + Command( + script=client_script, + container=cluster_config["containers"]["nemo-skills"], + name=f"{task_name}", + ) + ) + + groups.append( + CommandGroup( + commands=group0_components, + hardware=HardwareConfig( + partition=partition, + num_gpus=group_gpus, + num_nodes=group_nodes, + num_tasks=group_tasks, + sbatch_kwargs=sbatch_kwargs, ), - get_server_command=job_server_command, - extra_package_dirs=[extra_datasets] if should_package_extra_datasets else None, - sbatch_kwargs=sbatch_kwargs, - installation_command=installation_command, - skip_hf_home_check=skip_hf_home_check, + name=f"{task_name}_group0", + log_dir=log_dir, + ) + ) + + # Additional groups for hosted models 1..N-1 + for model_idx in range(1, num_models): + srv = server_scripts[model_idx] + if srv is None: + continue + groups.append( + CommandGroup( + commands=[ + Command( + script=srv, + container=server_containers_list[model_idx] + or cluster_config["containers"][server_types_list[model_idx]], + name=f"{task_name}_model_{model_idx}_server", + ) + ], + hardware=HardwareConfig( + partition=partition, + num_gpus=int(server_gpus_list[model_idx]), + num_nodes=int(server_nodes_list[model_idx]), + num_tasks=int(srv.num_tasks), + sbatch_kwargs=sbatch_kwargs, + ), + name=f"{task_name}_model_{model_idx}_group", + log_dir=log_dir, + ) ) - prev_tasks = [new_task] - all_tasks.append(new_task) - # only last dependent job will be here, which is what we want - job_id_to_tasks[idx] = prev_tasks + + base_deps = list(_task_dependencies or []) + if run_after: + base_deps.extend(run_after if isinstance(run_after, list) else [run_after]) + + prev_job = None + for dep_idx in range(dependent_jobs + 1): + internal_job_name = f"{task_name}-dep{dep_idx}" if dep_idx > 0 else task_name + if dep_idx == 0: + job_deps = base_deps if base_deps else None + else: + job_deps = [prev_job] + + job_spec = {"name": internal_job_name, "dependencies": job_deps} + if len(groups) > 1: + job_spec["groups"] = groups + else: + job_spec["group"] = groups[0] + + jobs.append(job_spec) + job_names.append(internal_job_name) + prev_job = job_spec + + job_batch_to_last_job_name[job_idx] = internal_job_name + + if jobs: + pipeline = Pipeline( + name=expname, + cluster_config=cluster_config, + jobs=jobs, + reuse_code=reuse_code, + reuse_code_exp=reuse_code_exp, + skip_hf_home_check=skip_hf_home_check, + extra_package_dirs=[extra_datasets] if should_package_extra_datasets else None, + ) + handles = pipeline.run(dry_run=dry_run, _reuse_exp=exp, sequential=sequential) + job_name_to_handle = dict(zip(job_names, handles)) + for job_idx, last_job_name in job_batch_to_last_job_name.items(): + job_id_to_tasks[job_idx] = [job_name_to_handle[last_job_name]] + all_tasks.append(job_name_to_handle[last_job_name]) # scheduling judge jobs if needed for idx, (benchmark, benchmark_args) in enumerate(benchmarks_dict.items()): if not eval_requires_judge and not benchmark_args.requires_judge: @@ -674,7 +896,6 @@ def eval( judge_pipeline_args=judge_pipeline_args, rerun_done=rerun_done, log_dir=log_dir, - server_parameters=server_parameters, cluster_config=cluster_config, judge_server_gpus=judge_server_gpus, judge_server_nodes=judge_server_nodes, diff --git a/nemo_skills/pipeline/utils/eval.py b/nemo_skills/pipeline/utils/eval.py index 9750659e5b..68661c1270 100644 --- a/nemo_skills/pipeline/utils/eval.py +++ b/nemo_skills/pipeline/utils/eval.py @@ -54,15 +54,24 @@ def requires_judge(self): return bool(self.judge_args or self.judge_pipeline_args) -def combine_cmds(cmds: list[str], single_node_mode: str) -> str: - """Combine multiple eval commands into a single eval cmd.""" - if single_node_mode == "sequential": - return " && ".join(cmds) - elif single_node_mode == "parallel": - if len(cmds) == 1: - return cmds[0] - return " & ".join(f"( {cmd} )" for cmd in cmds) + " & wait " - raise ValueError(f"Unknown single_node_mode: {single_node_mode}") +@dataclass +class EvalGenerationUnit: + """Parameters for a single generation invocation used during eval batching. + + This is intentionally kept as a simple dataclass so scripts can build the final + commands at runtime (after heterogeneous hostname refs are resolved). + """ + + output_dir: str + input_file: str + extra_arguments: str + random_seed: int | None + chunk_id: int | None + num_chunks: int | None + script: str + requirements: list[str] | None + wandb_parameters: dict | None + with_sandbox: bool def get_arg_from_module_or_dict(module, arg_name, default_value=None, override_dict=None): @@ -256,11 +265,9 @@ def prepare_eval_commands( num_chunks, chunk_ids, rerun_done, - server_parameters, extra_arguments, data_dir, extra_datasets_type, - exclusive, with_sandbox, keep_mounts_for_sandbox, wandb_parameters, @@ -268,14 +275,16 @@ def prepare_eval_commands( generation_type=None, generation_module=None, ): - # TODO: there is a bit too much code duplication here and logic is quite dense, should try to refactor - - # TODO: should we allow setting num chunks per benchmark when not using groups? Maybe benchmark:rs_num:num_chunks? + """Prepare per-job eval generation units for evaluation batching. + Returns: + benchmarks_dict: Mapping benchmark name -> BenchmarkArgs (includes job_ids, remaining_jobs, etc.) + job_batches: List of tuples: + (job_units, job_benchmarks, job_needs_sandbox, job_needs_sandbox_to_keep_mounts, job_sandbox_env_overrides) + """ if generation_type is not None: if generation_module is not None: raise ValueError("Cannot specify both generation_module and generation_type. ") - generation_module = GENERATION_MODULE_MAP[generation_type] benchmarks_or_groups = { @@ -286,12 +295,11 @@ def prepare_eval_commands( if num_jobs is None: if cluster_config["executor"] == "slurm": - num_jobs = -1 # -1 means run all benchmarks in parallel + num_jobs = -1 else: - # for local executor, it makes no sense to use other values num_jobs = 1 - benchmarks_dict = {} # benchmark_name -> benchmark_args + benchmarks_dict = {} for benchmark_or_group, rs_num in benchmarks_or_groups.items(): cur_benchmarks = add_default_args( cluster_config, @@ -341,7 +349,6 @@ def prepare_eval_commands( if num_chunks: benchmark_args.num_chunks = num_chunks if benchmark_args.num_chunks is not None: - # TODO: currently using global chunk_ids but local num_chunks. That's not ideal benchmark_chunk_ids = compute_chunk_ids(chunk_ids, benchmark_args.num_chunks) if benchmark_chunk_ids is None: benchmark_chunk_ids = [None] @@ -353,11 +360,10 @@ def prepare_eval_commands( chunk_ids=benchmark_chunk_ids, rerun_done=rerun_done, ) - for seed_idx, (seed, benchmark_chunk_ids) in enumerate(benchmark_args.remaining_jobs.items()): + for _, benchmark_chunk_ids in benchmark_args.remaining_jobs.items(): total_evals += len(benchmark_chunk_ids) if num_jobs < 0: - # if num_jobs is -1, we run all benchmarks in parallel num_jobs = total_evals if num_jobs == 0: @@ -371,16 +377,9 @@ def prepare_eval_commands( eval_to_job_map.extend([i] * count) cur_job_idx = 0 - get_random_port = pipeline_utils.should_get_random_port(server_parameters["server_gpus"], exclusive) - job_server_config, job_server_address, job_extra_arguments = pipeline_utils.configure_client( - **server_parameters, - extra_arguments=extra_arguments, - get_random_port=get_random_port, - ) - cur_eval = 0 job_batches = [] - job_cmds = [] + job_units: list[EvalGenerationUnit] = [] job_benchmarks = set() for benchmark, benchmark_args in benchmarks_dict.items(): @@ -392,7 +391,6 @@ def prepare_eval_commands( benchmark_output_dir = f"{output_dir}/{benchmark_args.eval_subfolder}" for seed_idx, (seed, benchmark_chunk_ids) in enumerate(benchmark_args.remaining_jobs.items()): if wandb_parameters: - # no need for chunks as it will run after merging wandb_parameters["samples_file"] = pipeline_utils.get_chunked_rs_filename( benchmark_output_dir, random_seed=seed, @@ -428,30 +426,30 @@ def prepare_eval_commands( full_extra_arguments = ( f"{generation_task.get_generation_default_args()} " f"{benchmark_args.generation_args} " - f"{job_extra_arguments} " + f"{extra_arguments} " ) - cmd = pipeline_utils.get_generation_cmd( - input_file=benchmark_args.input_file, - output_dir=benchmark_output_dir, - extra_arguments=full_extra_arguments, - random_seed=seed, - chunk_id=chunk_id, - num_chunks=benchmark_args.num_chunks, - script=generation_module or benchmark_args.generation_module, - requirements=requirements, - # only logging for the first seed - wandb_parameters=wandb_parameters if seed_idx == 0 else None, - with_sandbox=benchmark_args.requires_sandbox, + job_units.append( + EvalGenerationUnit( + input_file=benchmark_args.input_file, + output_dir=benchmark_output_dir, + extra_arguments=full_extra_arguments, + random_seed=seed, + chunk_id=chunk_id, + num_chunks=benchmark_args.num_chunks, + script=generation_module or benchmark_args.generation_module, + requirements=requirements, + wandb_parameters=wandb_parameters if seed_idx == 0 else None, + with_sandbox=benchmark_args.requires_sandbox, + ) ) - job_cmds.append(cmd) if cur_eval == total_evals - 1 or cur_job_idx != eval_to_job_map[cur_eval + 1]: job_needs_sandbox = any(benchmarks_dict[b].requires_sandbox for b in job_benchmarks) job_needs_sandbox_to_keep_mounts = any( benchmarks_dict[b].keep_mounts_for_sandbox for b in job_benchmarks ) - # Aggregate per-job sandbox env overrides from participating benchmarks (first key wins) + ordered_benchmarks = [b for b in benchmarks_dict.keys() if b in job_benchmarks] env_map: Dict[str, str] = {} env_source: Dict[str, str] = {} @@ -468,29 +466,19 @@ def prepare_eval_commands( env_source[key] = b job_sandbox_env_overrides = [f"{k}={v}" for k, v in env_map.items()] - # TODO: move to a dataclass job_batches.append( ( - job_cmds, + job_units, job_benchmarks, job_needs_sandbox, job_needs_sandbox_to_keep_mounts, - job_server_config, - job_server_address, - # a check above guarantees that this is the same for all tasks in a job - generation_task.get_server_command_fn(), job_sandbox_env_overrides, ) ) - job_server_config, job_server_address, job_extra_arguments = pipeline_utils.configure_client( - **server_parameters, - extra_arguments=extra_arguments, - get_random_port=get_random_port, - ) for job_benchmark in job_benchmarks: benchmarks_dict[job_benchmark].job_ids.append(cur_job_idx) cur_job_idx += 1 - job_cmds = [] + job_units = [] job_benchmarks = set() cur_eval += 1 diff --git a/nemo_skills/pipeline/utils/scripts.py b/nemo_skills/pipeline/utils/scripts.py index 3e74e438ef..cd1485effd 100644 --- a/nemo_skills/pipeline/utils/scripts.py +++ b/nemo_skills/pipeline/utils/scripts.py @@ -49,7 +49,7 @@ from nemo_skills.pipeline.utils.commands import sandbox_command from nemo_skills.pipeline.utils.exp import install_packages_wrap from nemo_skills.pipeline.utils.generation import get_generation_cmd -from nemo_skills.pipeline.utils.server import get_free_port, get_server_command +from nemo_skills.pipeline.utils.server import get_free_port, get_server_command, wrap_python_path from nemo_skills.utils import get_logger_name if TYPE_CHECKING: @@ -432,3 +432,145 @@ def build_cmd() -> Tuple[str, Dict]: # Always use lazy command building self.set_inline(build_cmd) super().__post_init__() + + +def _combine_cmds(cmds: List[str], single_node_mode: str) -> str: + """Combine multiple eval commands into a single shell command. + + Duplicated (intentionally) from pipeline/utils/eval.py to avoid import cycles. + """ + if single_node_mode == "sequential": + return " && ".join(cmds) + if single_node_mode == "parallel": + if len(cmds) == 1: + return cmds[0] + return " & ".join(f"( {cmd} )" for cmd in cmds) + " & wait " + raise ValueError(f"Unknown single_node_mode: {single_node_mode}") + + +def _inject_if_missing(extra_arguments: str, needle: str, insertion: str) -> str: + """Prepend insertion if needle isn't already present.""" + if needle in extra_arguments: + return extra_arguments + return f"{insertion}{extra_arguments}" + + +def _inject_single_server_overrides( + *, + extra_arguments: str, + server_type: str, + model_name: str, + host: str | None = None, + port: int | None = None, + base_url: str | None = None, +) -> str: + """Inject single-model server config overrides (like configure_client does). + + We do this at runtime for EvalClientScript so hostname refs can resolve in heterogeneous jobs. + """ + # Respect user-provided overrides + extra_arguments = _inject_if_missing( + extra_arguments, "++server.server_type=", f"++server.server_type={server_type} " + ) + extra_arguments = _inject_if_missing(extra_arguments, "++server.model=", f"++server.model={model_name} ") + + if host is not None and port is not None: + extra_arguments = _inject_if_missing(extra_arguments, "++server.host=", f"++server.host={host} ") + extra_arguments = _inject_if_missing(extra_arguments, "++server.port=", f"++server.port={port} ") + return extra_arguments + + if base_url is not None: + extra_arguments = _inject_if_missing(extra_arguments, "++server.base_url=", f"++server.base_url={base_url} ") + return extra_arguments + + return extra_arguments + + +@dataclass(kw_only=True) +class EvalClientScript(BaseJobScript): + """Script for running evaluation generation commands (possibly multiple) in one job. + + Unlike GenerationClientScript (which builds a single generation run), + this script builds N generation commands and combines them with sequential/parallel mode. + + It supports multi-model server references by resolving a list of server addresses + at runtime (hostname refs in heterogeneous jobs). + """ + + # A list of generation "units" (dict payload to keep utils->scripts import simple) + # Each unit must contain args accepted by get_generation_cmd(). + units: List[Dict] + single_node_mode: str = "parallel" + + # Multi-model cross-component references + servers: Optional[List[Optional["ServerScript"]]] = None + server_addresses_prehosted: Optional[List[str]] = None + model_names: Optional[List[str]] = None + server_types: Optional[List[str]] = None + sandbox: Optional["SandboxScript"] = None + with_sandbox: bool = False + + log_prefix: str = field(default="main", init=False) + + def __post_init__(self): + def build_cmd() -> Tuple[str, Dict]: + env_vars: Dict[str, str] = {} + + # Add sandbox port to environment if sandbox is referenced + if self.sandbox: + env_vars["NEMO_SKILLS_SANDBOX_PORT"] = str(self.sandbox.port) + + # Build server addresses if servers are provided + server_addresses = None + if self.servers is not None: + server_addresses = [] + for server_idx, server_script in enumerate(self.servers): + if server_script is not None: + addr = f"{server_script.hostname_ref()}:{server_script.port}" + else: + addr = self.server_addresses_prehosted[server_idx] + server_addresses.append(addr) + + cmds: List[str] = [] + is_multi_model = bool(self.model_names) and len(self.model_names) > 1 + for unit in self.units: + unit = dict(unit) + + # Single-model: get_generation_cmd doesn't emit list-style ++server.* overrides, + # so we must inject the single-server config here (mirrors configure_client behavior). + if self.model_names and len(self.model_names) == 1: + server_type = self.server_types[0] if self.server_types else "" + model_name = self.model_names[0] + if self.servers is not None and self.servers[0] is not None: + srv = self.servers[0] + unit["extra_arguments"] = _inject_single_server_overrides( + extra_arguments=unit.get("extra_arguments", ""), + server_type=server_type, + model_name=model_name, + host=srv.hostname_ref(), + port=srv.port, + ) + else: + unit["extra_arguments"] = _inject_single_server_overrides( + extra_arguments=unit.get("extra_arguments", ""), + server_type=server_type, + model_name=model_name, + base_url=server_addresses[0] if server_addresses else None, + ) + + cmds.append( + get_generation_cmd( + **unit, + # Multi-model parameters (only for num_models > 1) + server_addresses=server_addresses if is_multi_model else None, + model_names=self.model_names if is_multi_model else None, + server_types=self.server_types if is_multi_model else None, + ) + ) + + combined = _combine_cmds(cmds, self.single_node_mode) + combined = wrap_python_path(combined) + return combined, {"environment": env_vars} + + self.set_inline(build_cmd) + super().__post_init__()