Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/nemo-evaluator/src/nemo_evaluator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
ScoreStats,
TaskResult,
)
from nemo_evaluator.gym_harness import GymHarness
from nemo_evaluator.package_info import (
__contact_emails__,
__contact_names__,
Expand All @@ -39,6 +40,7 @@
)

__all__ = [
"GymHarness",
"ApiEndpoint",
"ConfigParams",
"EndpointType",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Generate a minimal nemo-gym YAML config that points ``simple_agent`` at an
already-running ``NeMoEvaluatorResourcesServer`` and its auto-generated
JSONL dataset.

The generated config can be fed to ``ng_run`` or ``ng_collect_rollouts``.
"""

from __future__ import annotations

import os
import tempfile
from pathlib import Path
from typing import Any, Optional

import yaml


def generate_gym_config(
    resource_server_host: str,
    resource_server_port: int,
    data_dir: str,
    eval_type: str,
    model_server_url: str,
    model_id: str,
    model_server_name: str = "policy_model",
) -> dict[str, Any]:
    """Assemble a nemo-gym config dict suitable for ``ng_collect_rollouts``.

    The resulting mapping wires together three pieces:

    - the already-running ``NeMoEvaluatorResourcesServer`` (resource server),
    - a model server (the adapter / real endpoint) for the policy model,
    - the dataset JSONL written by the resource server at startup.
    """
    dataset_path = str(Path(data_dir) / f"{eval_type}.jsonl")

    # nemo-gym's global config is keyed by "config path" at the top level;
    # each entry nests {server_type: {server_name: {actual config}}}.
    # See resources_servers/mcqa/configs/mcqa.yaml for the canonical format.
    resources_section: dict[str, Any] = {
        "resources_servers": {
            "nemo_evaluator": {
                "entrypoint": "app.py",
                "host": resource_server_host,
                "port": resource_server_port,
            },
        },
    }

    agent_section: dict[str, Any] = {
        "responses_api_agents": {
            "simple_agent": {
                "entrypoint": "app.py",
                "resources_server": {
                    "type": "resources_servers",
                    "name": "nemo_evaluator",
                },
                "model_server": {
                    "type": "responses_api_models",
                    "name": model_server_name,
                },
                "datasets": [
                    {
                        "name": "eval",
                        "type": "eval",
                        "jsonl_fpath": dataset_path,
                    },
                ],
            },
        },
    }

    model_section: dict[str, Any] = {
        "responses_api_models": {
            model_server_name: {
                "entrypoint": "app.py",
                "url": model_server_url,
                "model": model_id,
            },
        },
    }

    return {
        "nemo_evaluator": resources_section,
        "nemo_evaluator_simple_agent": agent_section,
        model_server_name: model_section,
    }


def write_gym_config(
    config: dict[str, Any],
    output_path: Optional[str] = None,
) -> str:
    """Serialize *config* to a YAML file and return the file path.

    Args:
        config: Gym config mapping, e.g. from :func:`generate_gym_config`.
        output_path: Destination file. When ``None``, a unique temporary
            file (``nemo_gym_eval_*.yaml``) is created instead.

    Returns:
        The path to the written YAML file.
    """
    if output_path is None:
        # mkstemp both reserves a unique name and creates the file; close the
        # OS-level handle immediately since we reopen the path below.
        fd, output_path = tempfile.mkstemp(suffix=".yaml", prefix="nemo_gym_eval_")
        os.close(fd)

    # Explicit encoding avoids depending on the platform's locale default.
    with open(output_path, "w", encoding="utf-8") as f:
        yaml.safe_dump(config, f, default_flow_style=False)

    return output_path
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Context manager for spawning the generic NeMo Evaluator resource server
(``ne_resource_server``) as a subprocess.

Usage::

with ResourceServerProcess(
eval_type="AIME_2025",
framework_config=framework_config,
) as rsp:
print(rsp.url) # http://localhost:12345
print(rsp.data_dir) # /tmp/nemo_evaluator_data
"""

from __future__ import annotations

import json
import shutil
import socket
import subprocess
import time
from typing import Any, Optional

from nemo_evaluator.logging import get_logger

logger = get_logger(__name__)

# Interface the spawned resource server binds to by default.
DEFAULT_HOST = "localhost"
# Directory where the resource server writes its auto-generated JSONL dataset.
DEFAULT_DATA_DIR = "/tmp/nemo_evaluator_data"


def _find_free_port(host: str = DEFAULT_HOST) -> int:
    """Ask the OS for a currently unused TCP port on *host* and return it."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # Binding to port 0 lets the kernel pick an ephemeral free port.
        sock.bind((host, 0))
        return sock.getsockname()[1]
    finally:
        sock.close()


def _wait_for_server(
host: str, port: int, max_wait: float = 120, interval: float = 0.5
) -> bool:
deadline = time.time() + max_wait
while time.time() < deadline:
try:
with socket.create_connection((host, port), timeout=1):
return True
except OSError:
time.sleep(interval)
return False


class ResourceServerProcess:
    """Spawn ``ne_resource_server`` / ``nemo_evaluator_resource_server`` as a subprocess.

    The process is started in ``__enter__`` and terminated in ``__exit__``.

    Configuration is derived from the harness's ``framework.yml``.
    """

    def __init__(
        self,
        eval_type: str,
        framework_config: dict[str, Any],
        host: str = DEFAULT_HOST,
        port: Optional[int] = None,
        data_dir: str = DEFAULT_DATA_DIR,
    ) -> None:
        """Configure (but do not start) the resource server subprocess.

        Args:
            eval_type: Benchmark identifier passed to the resource server.
            framework_config: Parsed ``framework.yml``; must contain
                ``framework.pkg_name``.
            host: Interface the server binds to.
            port: Fixed port; when ``None`` a free port is auto-selected.
            data_dir: Directory the server writes its dataset into.

        Raises:
            ValueError: If ``framework_config['framework']['pkg_name']`` is missing.
        """
        self.eval_type = eval_type
        self.host = host
        # NOTE(review): picking a free port here and binding it later in the
        # child process is a TOCTOU race; another process can grab the port
        # first. Callers needing robustness should retry with a fresh port
        # when auto-assigned startup fails.
        self.port = port or _find_free_port(host)
        self.data_dir = data_dir

        # Derive harness_package from framework.yml.
        fw = framework_config.get("framework", {})
        self.harness_package = fw.get("pkg_name")
        if not self.harness_package:
            raise ValueError("framework_config['framework']['pkg_name'] is required")

        # Build judge config from defaults (if present).
        defaults = framework_config.get("defaults", {})
        judge_defaults = (
            defaults.get("config", {})
            .get("params", {})
            .get("extra", {})
            .get("judge", {})
        )
        self.harness_kwargs: dict[str, Any] = {}
        if judge_defaults and judge_defaults.get("url"):
            self.harness_kwargs["judge_config"] = {
                "backend": judge_defaults.get("backend", "openai"),
                "model": judge_defaults.get("model_id"),
                "url": judge_defaults.get("url"),
            }

        self._process: Optional[subprocess.Popen] = None

    @property
    def url(self) -> str:
        """Base URL the spawned resource server listens on."""
        return f"http://{self.host}:{self.port}"

    def __enter__(self) -> ResourceServerProcess:
        """Start the subprocess and block until its TCP port accepts connections.

        Raises:
            RuntimeError: If no server executable is found on PATH, or the
                server does not come up within the wait budget.
        """
        exe = shutil.which("ne_resource_server") or shutil.which(
            "nemo_evaluator_resource_server"
        )
        if exe is None:
            raise RuntimeError(
                "Cannot find ne_resource_server or nemo_evaluator_resource_server on PATH. "
                "Is nemo-gym installed?"
            )

        cmd = [
            exe,
            "--harness_package",
            self.harness_package,
            "--eval_type",
            self.eval_type,
            "--host",
            self.host,
            "--port",
            str(self.port),
            "--data_dir",
            self.data_dir,
        ]
        if self.harness_kwargs:
            cmd += ["--harness_kwargs_json", json.dumps(self.harness_kwargs)]

        # Deliberately do NOT log the raw command: harness kwargs may embed
        # sensitive data (e.g. credentials inside judge URLs).
        logger.info(
            "Spawning resource server (exe=%s, host=%s, port=%s, "
            "harness_package=%s, has_harness_kwargs=%s)",
            exe,
            self.host,
            self.port,
            self.harness_package,
            bool(self.harness_kwargs),
        )
        self._process = subprocess.Popen(cmd)

        if _wait_for_server(self.host, self.port):
            logger.info("Resource server ready at %s", self.url)
            return self

        # Startup failed: ensure the child cannot be orphaned, mirroring the
        # __exit__ cleanup (terminate, then force-kill if it hangs).
        self._process.terminate()
        try:
            self._process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            logger.warning(
                "Force-killing resource server (pid=%s)", self._process.pid
            )
            self._process.kill()
            self._process.wait()
        raise RuntimeError(f"Resource server failed to start at {self.url}")

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        """Terminate the subprocess; exceptions are never suppressed."""
        if self._process is not None:
            self._process.terminate()
            try:
                self._process.wait(timeout=10)
            except subprocess.TimeoutExpired:
                logger.warning(
                    "Force-killing resource server (pid=%s)", self._process.pid
                )
                self._process.kill()
                self._process.wait()
        return False
77 changes: 77 additions & 0 deletions packages/nemo-evaluator/src/nemo_evaluator/gym_harness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
GymHarness interface for integrating evaluation harnesses with nemo-gym.

Each harness package (e.g., simple-evals, lm-eval) implements this interface
in a module named ``gym_harness.py`` at the package root, exporting a class
named ``Harness``.

The generic resource server in nemo-gym discovers implementations via::

import {pkg_name}.gym_harness
harness = {pkg_name}.gym_harness.Harness(eval_type=..., **kwargs)

``pkg_name`` is read from the harness's existing ``framework.yml``.
"""

from abc import ABC, abstractmethod
from typing import Any


class GymHarness(ABC):
"""Interface for making any eval harness available as a nemo-gym resource server.

Subclasses must implement :meth:`get_dataset` (load benchmark data and
format prompts) and :meth:`verify` (extract and score a model response).

Implementations should be exported as ``Harness`` in
``{pkg_name}/gym_harness.py`` so the resource server can discover them
by convention.
"""

def __init__(self, eval_type: str, **kwargs: Any) -> None:
self.eval_type = eval_type

@abstractmethod
def get_dataset(self) -> list[dict]:
"""Load benchmark data, format prompts, return JSONL-ready rows.

Each row must contain at minimum:

- ``responses_create_params``: ``{"input": [{"role": "user", "content": "..."}]}``
- ``expected_answer``: ``str``

Additional fields are passed through to :meth:`verify` as keyword
arguments during evaluation.
"""
...

@abstractmethod
async def verify(
self,
response_text: str,
expected_answer: str,
**kwargs: Any,
) -> tuple[float, str | None]:
"""Extract an answer from *response_text* and score it against *expected_answer*.

Returns ``(reward, extracted_answer)``. ``extracted_answer`` may be
``None`` if extraction failed.

This method is async to support judge-model fallback calls.
"""
...
Loading
Loading