diff --git a/nemo_gym/__init__.py b/nemo_gym/__init__.py index 844bb1e6b..041db3f28 100644 --- a/nemo_gym/__init__.py +++ b/nemo_gym/__init__.py @@ -36,7 +36,9 @@ # environ["HF_HOME"] = join(CACHE_DIR, "huggingface") # UV caching directory overrides to local folders. -environ["UV_CACHE_DIR"] = str(CACHE_DIR / "uv") +# Use setdefault so a pre-warmed cache (e.g., in a container) is not overridden. +environ.setdefault("UV_CACHE_DIR", str(CACHE_DIR / "uv")) +environ.setdefault("UV_LINK_MODE", "symlink") # Turn off Gradio analytics environ["GRADIO_ANALYTICS_ENABLED"] = "False" diff --git a/nemo_gym/cli.py b/nemo_gym/cli.py index af3558253..d2c003670 100644 --- a/nemo_gym/cli.py +++ b/nemo_gym/cli.py @@ -62,10 +62,21 @@ ) -def _setup_env_command(dir_path: Path, global_config_dict: DictConfig) -> str: # pragma: no cover +def _setup_env_command(dir_path: Path, server_name: str, global_config_dict: DictConfig) -> str: # pragma: no cover head_server_deps = global_config_dict[HEAD_SERVER_DEPS_KEY_NAME] - uv_venv_cmd = f"uv venv --seed --allow-existing --python {global_config_dict[PYTHON_VERSION_KEY_NAME]} .venv" + # Use centralized venv directory if NEMO_GYM_VENV_DIR is set, otherwise fall back to per-directory .venv + venv_dir = environ.get("NEMO_GYM_VENV_DIR") + if venv_dir: + venv_path = f"{venv_dir}/{server_name}" + else: + venv_path = ".venv" + + # Force rebuild: remove the existing venv before recreating it + force_rebuild = environ.get("NEMO_GYM_FORCE_REBUILD_VENVS", "false").lower() == "true" + force_rebuild_cmd = f"rm -rf {venv_path} && " if force_rebuild else "" + + uv_venv_cmd = f"{force_rebuild_cmd}uv venv --seed --allow-existing --python {global_config_dict[PYTHON_VERSION_KEY_NAME]} {venv_path}" has_pyproject_toml = exists(f"{dir_path / 'pyproject.toml'}") has_requirements_txt = exists(f"{dir_path / 'requirements.txt'}") @@ -74,7 +85,7 @@ def _setup_env_command(dir_path: Path, global_config_dict: DictConfig) -> str: # not needed for most clusters. should be safe in all scenarios, but only minimally tested outside of colab. # see discussion and examples here: https://github.com/NVIDIA-NeMo/Gym/pull/526#issuecomment-3676230383 uv_pip_set_python = global_config_dict.get(UV_PIP_SET_PYTHON_KEY_NAME, False) - uv_pip_python_flag = "--python .venv/bin/python " if uv_pip_set_python else "" + uv_pip_python_flag = f"--python {venv_path}/bin/python " if uv_pip_set_python else "" verbose_flag = "-v " if global_config_dict.get(PIP_INSTALL_VERBOSE_KEY_NAME) else "" @@ -93,7 +104,7 @@ def _setup_env_command(dir_path: Path, global_config_dict: DictConfig) -> str: cmd = f"""cd {dir_path} \\ && {uv_venv_cmd} \\ - && source .venv/bin/activate \\ + && source {venv_path}/bin/activate \\ && {install_cmd} \\ """ @@ -213,7 +224,7 @@ def start(self, global_config_dict_parser_config: GlobalConfigDictParserConfig) dir_path = PARENT_DIR / Path(first_key, second_key) - command = f"""{_setup_env_command(dir_path, global_config_dict)} \\ + command = f"""{_setup_env_command(dir_path, second_key, global_config_dict)} \\ && {NEMO_GYM_CONFIG_DICT_ENV_VAR_NAME}={escaped_config_dict_yaml_str} \\ {NEMO_GYM_CONFIG_PATH_ENV_VAR_NAME}={shlex.quote(top_level_path)} \\ python {str(entrypoint_fpath)}""" @@ -496,7 +507,9 @@ def _validate_data_single(test_config: TestConfig) -> None: # pragma: no cover def _test_single(test_config: TestConfig, global_config_dict: DictConfig) -> Popen: # pragma: no cover # Eventually we may want more sophisticated testing here, but this is sufficient for now. 
- command = f"""{_setup_env_command(test_config.dir_path, global_config_dict)} && pytest""" + command = ( + f"""{_setup_env_command(test_config.dir_path, test_config.dir_path.name, global_config_dict)} && pytest""" + ) return _run_command(command, test_config.dir_path) diff --git a/nemo_gym/prefetch_gym_venvs.py b/nemo_gym/prefetch_gym_venvs.py new file mode 100755 index 000000000..33a5feab8 --- /dev/null +++ b/nemo_gym/prefetch_gym_venvs.py @@ -0,0 +1,201 @@ +#!/usr/bin/env python3 +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Pre-build all NeMo Gym server virtual environments. + +This script discovers all server directories (responses_api_models, resources_servers, +responses_api_agents) that have a pyproject.toml or requirements.txt, and creates a +venv for each in $NEMO_GYM_VENV_DIR. + +Usage: + python nemo_gym/prefetch_gym_venvs.py /opt/gym_venvs + + # Only prefetch specific servers: + python nemo_gym/prefetch_gym_venvs.py /opt/gym_venvs --filter vllm_model simple_agent + + # Or via environment variable: + NEMO_GYM_VENV_DIR=/opt/gym_venvs python nemo_gym/prefetch_gym_venvs.py + +Environment variables: + NEMO_GYM_VENV_DIR Fallback for venv_dir if not passed as argument. + NEMO_GYM_FORCE_REBUILD_VENVS Set to 'true' to force rebuild (same as --force-rebuild). + UV_LINK_MODE Optional. Set to 'symlink' to share packages via UV cache (recommended). + PYTHON_VERSION Optional. Python version for venvs (default: current interpreter version). 
+""" + +import argparse +import os +import shutil +import subprocess +import sys +import time +from pathlib import Path + + +GYM_ROOT = Path(__file__).resolve().parent.parent +SERVER_TYPE_DIRS = ["responses_api_models", "resources_servers", "responses_api_agents"] +FORCE_REBUILD_ENV_VAR = "NEMO_GYM_FORCE_REBUILD_VENVS" + + +def get_head_server_deps() -> list[str]: + """Get the head server dependency pins (ray and openai versions).""" + deps = [] + try: + import ray + + deps.append(f"ray[default]=={ray.__version__}") + except ImportError: + print(" WARNING: ray not importable, skipping ray pin") + try: + import openai + + deps.append(f"openai=={openai.__version__}") + except ImportError: + print(" WARNING: openai not importable, skipping openai pin") + return deps + + +def discover_servers(filters: list[str] | None = None) -> list[tuple[str, Path]]: + """Discover all server directories with pyproject.toml or requirements.txt.""" + servers = [] + for server_type in SERVER_TYPE_DIRS: + type_dir = GYM_ROOT / server_type + if not type_dir.exists(): + continue + for server_dir in sorted(type_dir.iterdir()): + if not server_dir.is_dir(): + continue + has_deps = (server_dir / "pyproject.toml").exists() or (server_dir / "requirements.txt").exists() + if not has_deps: + continue + + server_name = server_dir.name + if filters and server_name not in filters: + continue + + servers.append((f"{server_type}/{server_name}", server_dir)) + return servers + + +def prefetch_venv( + server_label: str, + server_dir: Path, + venv_dir: str, + python_version: str, + head_server_deps: list[str], + force_rebuild: bool = False, +) -> bool: + """Create and populate a venv for one server. Returns True on success.""" + server_name = server_dir.name + venv_path = os.path.join(venv_dir, server_name) + + if force_rebuild and os.path.exists(venv_path): + shutil.rmtree(venv_path) + + # Create venv + result = subprocess.run( + ["uv", "venv", "--seed", "--allow-existing", "--python", python_version, venv_path], + capture_output=True, + text=True, + ) + if result.returncode != 0: + print(f" ERROR creating venv: {result.stderr}") + return False + + # Determine install command + install_cmd = ["uv", "pip", "install", "--python", f"{venv_path}/bin/python"] + if (server_dir / "pyproject.toml").exists(): + install_cmd.extend(["-e", str(server_dir)]) + elif (server_dir / "requirements.txt").exists(): + install_cmd.extend(["-r", str(server_dir / "requirements.txt")]) + install_cmd.extend(head_server_deps) + + # Install deps + result = subprocess.run( + install_cmd, + capture_output=True, + text=True, + cwd=str(server_dir), + ) + if result.returncode != 0: + print(f" ERROR installing deps: {result.stderr}") + return False + + return True + + +def main(): + parser = argparse.ArgumentParser(description="Pre-build NeMo Gym server venvs") + parser.add_argument( + "venv_dir", + nargs="?", + default=None, + help="Directory where venvs will be created. Falls back to NEMO_GYM_VENV_DIR env var if not provided.", + ) + parser.add_argument( + "--filter", + nargs="+", + default=None, + help="Only prefetch servers whose directory name exactly matches one of these strings (e.g., vllm_model simple_agent)", + ) + parser.add_argument( + "--force-rebuild", + action="store_true", + default=False, + help="Force rebuild all venvs from scratch, even if they already exist. 
" + f"Can also be set via {FORCE_REBUILD_ENV_VAR}=true", + ) + args = parser.parse_args() + + venv_dir = args.venv_dir or os.environ.get("NEMO_GYM_VENV_DIR") + if not venv_dir: + parser.error("venv_dir is required. Pass as argument or set NEMO_GYM_VENV_DIR.") + + python_version = os.environ.get("PYTHON_VERSION", f"{sys.version_info.major}.{sys.version_info.minor}") + force_rebuild = args.force_rebuild or os.environ.get(FORCE_REBUILD_ENV_VAR, "false").lower() == "true" + + os.makedirs(venv_dir, exist_ok=True) + + servers = discover_servers(args.filter) + if not servers: + print("No servers found to prefetch.") + sys.exit(0) + + head_server_deps = get_head_server_deps() + + print(f"Prefetching {len(servers)} server venvs into {venv_dir}") + + prefetched = [] + failed = [] + total_start = time.time() + + for label, server_dir in servers: + t0 = time.time() + if prefetch_venv(label, server_dir, venv_dir, python_version, head_server_deps, force_rebuild=force_rebuild): + print(f" ✓ {label} ({time.time() - t0:.1f}s)") + prefetched.append(label) + else: + print(f" ✗ {label} ({time.time() - t0:.1f}s)") + failed.append(label) + + print(f"Done: {len(prefetched)} ok, {len(failed)} failed in {time.time() - total_start:.1f}s") + + if failed: + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/tests/unit_tests/test_prefetch_gym_venvs.py b/tests/unit_tests/test_prefetch_gym_venvs.py new file mode 100644 index 000000000..3db39dfd7 --- /dev/null +++ b/tests/unit_tests/test_prefetch_gym_venvs.py @@ -0,0 +1,346 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+from pathlib import Path +from subprocess import CompletedProcess +from unittest.mock import patch + +import pytest + +from nemo_gym.prefetch_gym_venvs import discover_servers, get_head_server_deps, prefetch_venv + + +@pytest.fixture +def fake_gym_root(tmp_path: Path) -> Path: + """Create a minimal fake Gym directory structure for testing discovery.""" + # responses_api_models/vllm_model (pyproject.toml) + (tmp_path / "responses_api_models" / "vllm_model").mkdir(parents=True) + (tmp_path / "responses_api_models" / "vllm_model" / "pyproject.toml").write_text("[project]\nname='vllm-model'\n") + + # responses_api_models/openai_model (requirements.txt) + (tmp_path / "responses_api_models" / "openai_model").mkdir(parents=True) + (tmp_path / "responses_api_models" / "openai_model" / "requirements.txt").write_text("openai\n") + + # resources_servers/math_with_judge (requirements.txt) + (tmp_path / "resources_servers" / "math_with_judge").mkdir(parents=True) + (tmp_path / "resources_servers" / "math_with_judge" / "requirements.txt").write_text("math-verify\n") + + # resources_servers/no_deps_server (no pyproject.toml or requirements.txt — should be skipped) + (tmp_path / "resources_servers" / "no_deps_server").mkdir(parents=True) + + # responses_api_agents/simple_agent (requirements.txt) + (tmp_path / "responses_api_agents" / "simple_agent").mkdir(parents=True) + (tmp_path / "responses_api_agents" / "simple_agent" / "requirements.txt").write_text("nemo-gym\n") + + # A regular file that should be ignored (not a directory) + (tmp_path / "responses_api_models" / "README.md").write_text("# Models\n") + + return tmp_path + + +class TestDiscoverServers: + def test_discovers_all_servers(self, fake_gym_root: Path) -> None: + with patch("nemo_gym.prefetch_gym_venvs.GYM_ROOT", fake_gym_root): + servers = discover_servers() + + labels = [label for label, _ in servers] + assert len(servers) == 4 + assert "responses_api_models/vllm_model" in labels + assert "responses_api_models/openai_model" in labels + assert "resources_servers/math_with_judge" in labels + assert "responses_api_agents/simple_agent" in labels + + def test_skips_dirs_without_deps(self, fake_gym_root: Path) -> None: + with patch("nemo_gym.prefetch_gym_venvs.GYM_ROOT", fake_gym_root): + servers = discover_servers() + + labels = [label for label, _ in servers] + assert "resources_servers/no_deps_server" not in labels + + def test_skips_non_directories(self, fake_gym_root: Path) -> None: + with patch("nemo_gym.prefetch_gym_venvs.GYM_ROOT", fake_gym_root): + servers = discover_servers() + + # README.md under responses_api_models should not appear + labels = [label for label, _ in servers] + assert not any("README" in label for label in labels) + + def test_filter_single(self, fake_gym_root: Path) -> None: + with patch("nemo_gym.prefetch_gym_venvs.GYM_ROOT", fake_gym_root): + servers = discover_servers(filters=["vllm_model"]) + + assert len(servers) == 1 + assert servers[0][0] == "responses_api_models/vllm_model" + + def test_filter_multiple(self, fake_gym_root: Path) -> None: + with patch("nemo_gym.prefetch_gym_venvs.GYM_ROOT", fake_gym_root): + servers = discover_servers(filters=["vllm_model", "simple_agent"]) + + labels = [label for label, _ in servers] + assert len(servers) == 2 + assert "responses_api_models/vllm_model" in labels + assert "responses_api_agents/simple_agent" in labels + + def test_filter_no_match(self, fake_gym_root: Path) -> None: + with patch("nemo_gym.prefetch_gym_venvs.GYM_ROOT", fake_gym_root): + servers = 
discover_servers(filters=["nonexistent"]) + + assert len(servers) == 0 + + def test_filter_is_exact_match(self, fake_gym_root: Path) -> None: + """Filters match the exact server directory name, not substrings.""" + # Add a local_vllm_model to ensure "vllm_model" doesn't match it + (fake_gym_root / "responses_api_models" / "local_vllm_model").mkdir(parents=True) + (fake_gym_root / "responses_api_models" / "local_vllm_model" / "requirements.txt").write_text("vllm\n") + + with patch("nemo_gym.prefetch_gym_venvs.GYM_ROOT", fake_gym_root): + servers = discover_servers(filters=["vllm_model"]) + + labels = [label for label, _ in servers] + assert len(servers) == 1 + assert "responses_api_models/vllm_model" in labels + assert "responses_api_models/local_vllm_model" not in labels + + def test_filter_is_case_sensitive(self, fake_gym_root: Path) -> None: + with patch("nemo_gym.prefetch_gym_venvs.GYM_ROOT", fake_gym_root): + servers = discover_servers(filters=["VLLM_MODEL"]) + + assert len(servers) == 0 + + def test_missing_server_type_dir(self, fake_gym_root: Path) -> None: + """If a server type directory doesn't exist, it's silently skipped.""" + import shutil + + shutil.rmtree(fake_gym_root / "responses_api_agents") + with patch("nemo_gym.prefetch_gym_venvs.GYM_ROOT", fake_gym_root): + servers = discover_servers() + + labels = [label for label, _ in servers] + assert len(servers) == 3 + assert not any("responses_api_agents" in label for label in labels) + + def test_results_are_sorted(self, fake_gym_root: Path) -> None: + with patch("nemo_gym.prefetch_gym_venvs.GYM_ROOT", fake_gym_root): + servers = discover_servers() + + # Within each server type, names should be sorted + model_servers = [label for label, _ in servers if label.startswith("responses_api_models/")] + assert model_servers == sorted(model_servers) + + +class TestPrefetchVenv: + def test_success_with_pyproject(self, tmp_path: Path) -> None: + server_dir = tmp_path / "vllm_model" + server_dir.mkdir() + (server_dir / "pyproject.toml").write_text("[project]\nname='test'\n") + + with patch("nemo_gym.prefetch_gym_venvs.subprocess.run") as mock_run: + mock_run.return_value = CompletedProcess(args=[], returncode=0, stdout="", stderr="") + + result = prefetch_venv( + "responses_api_models/vllm_model", + server_dir, + str(tmp_path / "venvs"), + "3.12", + ["ray[default]==2.46.0"], + ) + + assert result is True + assert mock_run.call_count == 2 + + # First call: uv venv + venv_call = mock_run.call_args_list[0] + assert "uv" in venv_call[0][0] + assert "venv" in venv_call[0][0] + assert str(tmp_path / "venvs" / "vllm_model") in venv_call[0][0] + + # Second call: uv pip install -e . 
+ pip_call = mock_run.call_args_list[1] + assert "-e" in pip_call[0][0] + assert "ray[default]==2.46.0" in pip_call[0][0] + + def test_success_with_requirements(self, tmp_path: Path) -> None: + server_dir = tmp_path / "math_with_judge" + server_dir.mkdir() + (server_dir / "requirements.txt").write_text("math-verify\n") + + with patch("nemo_gym.prefetch_gym_venvs.subprocess.run") as mock_run: + mock_run.return_value = CompletedProcess(args=[], returncode=0, stdout="", stderr="") + + result = prefetch_venv( + "resources_servers/math_with_judge", + server_dir, + str(tmp_path / "venvs"), + "3.12", + [], + ) + + assert result is True + pip_call = mock_run.call_args_list[1] + assert "-r" in pip_call[0][0] + + def test_venv_creation_failure(self, tmp_path: Path) -> None: + server_dir = tmp_path / "broken_server" + server_dir.mkdir() + (server_dir / "requirements.txt").write_text("some-pkg\n") + + with patch("nemo_gym.prefetch_gym_venvs.subprocess.run") as mock_run: + mock_run.return_value = CompletedProcess( + args=[], returncode=1, stdout="", stderr="error: python 99.99 not found" + ) + + result = prefetch_venv( + "resources_servers/broken_server", + server_dir, + str(tmp_path / "venvs"), + "99.99", + [], + ) + + assert result is False + # Should stop after venv creation failure, not attempt pip install + assert mock_run.call_count == 1 + + def test_pip_install_failure(self, tmp_path: Path) -> None: + server_dir = tmp_path / "bad_deps" + server_dir.mkdir() + (server_dir / "requirements.txt").write_text("nonexistent-pkg-xyz\n") + + with patch("nemo_gym.prefetch_gym_venvs.subprocess.run") as mock_run: + mock_run.side_effect = [ + CompletedProcess(args=[], returncode=0, stdout="", stderr=""), # venv succeeds + CompletedProcess(args=[], returncode=1, stdout="", stderr="No matching distribution"), # pip fails + ] + + result = prefetch_venv( + "resources_servers/bad_deps", + server_dir, + str(tmp_path / "venvs"), + "3.12", + [], + ) + + assert result is False + assert mock_run.call_count == 2 + + def test_force_rebuild_removes_existing_venv(self, tmp_path: Path) -> None: + server_dir = tmp_path / "stale_server" + server_dir.mkdir() + (server_dir / "requirements.txt").write_text("some-pkg\n") + + # Create a pre-existing venv directory to simulate a stale venv + venvs_dir = tmp_path / "venvs" + existing_venv = venvs_dir / "stale_server" + existing_venv.mkdir(parents=True) + (existing_venv / "marker.txt").write_text("i should be deleted") + + with patch("nemo_gym.prefetch_gym_venvs.subprocess.run") as mock_run: + mock_run.return_value = CompletedProcess(args=[], returncode=0, stdout="", stderr="") + + result = prefetch_venv( + "resources_servers/stale_server", + server_dir, + str(venvs_dir), + "3.12", + [], + force_rebuild=True, + ) + + assert result is True + # The old venv directory should have been removed (shutil.rmtree) + # then recreated by uv venv, so the marker file should be gone + assert not (existing_venv / "marker.txt").exists() + + def test_force_rebuild_false_keeps_existing_venv(self, tmp_path: Path) -> None: + server_dir = tmp_path / "good_server" + server_dir.mkdir() + (server_dir / "requirements.txt").write_text("some-pkg\n") + + # Create a pre-existing venv directory + venvs_dir = tmp_path / "venvs" + existing_venv = venvs_dir / "good_server" + existing_venv.mkdir(parents=True) + (existing_venv / "marker.txt").write_text("i should survive") + + with patch("nemo_gym.prefetch_gym_venvs.subprocess.run") as mock_run: + mock_run.return_value = CompletedProcess(args=[], returncode=0, 
stdout="", stderr="") + + result = prefetch_venv( + "resources_servers/good_server", + server_dir, + str(venvs_dir), + "3.12", + [], + force_rebuild=False, + ) + + assert result is True + # The marker file should still be there + assert (existing_venv / "marker.txt").exists() + + def test_empty_head_server_deps(self, tmp_path: Path) -> None: + server_dir = tmp_path / "minimal" + server_dir.mkdir() + (server_dir / "requirements.txt").write_text("fastapi\n") + + with patch("nemo_gym.prefetch_gym_venvs.subprocess.run") as mock_run: + mock_run.return_value = CompletedProcess(args=[], returncode=0, stdout="", stderr="") + + result = prefetch_venv( + "resources_servers/minimal", + server_dir, + str(tmp_path / "venvs"), + "3.12", + [], # no head_server_deps + ) + + assert result is True + pip_call = mock_run.call_args_list[1] + # Should not contain any extra deps beyond -r requirements.txt + pip_args = pip_call[0][0] + assert pip_args[-1] == str(server_dir / "requirements.txt") + + +class TestGetHeadServerDeps: + def test_with_ray_and_openai(self) -> None: + with ( + patch.dict("sys.modules", {"ray": type("ray", (), {"__version__": "2.46.0"})()}), + patch.dict("sys.modules", {"openai": type("openai", (), {"__version__": "2.6.1"})()}), + ): + # Need to reimport to pick up the mocked modules + import importlib + + import nemo_gym.prefetch_gym_venvs as mod + + importlib.reload(mod) + deps = mod.get_head_server_deps() + + assert "ray[default]==2.46.0" in deps + assert "openai==2.6.1" in deps + + def test_without_ray_or_openai(self) -> None: + """When ray/openai aren't importable, returns empty list.""" + import builtins + + original_import = builtins.__import__ + + def mock_import(name, *args, **kwargs): + if name in ("ray", "openai"): + raise ImportError(f"No module named '{name}'") + return original_import(name, *args, **kwargs) + + with patch("builtins.__import__", side_effect=mock_import): + deps = get_head_server_deps() + + assert deps == []