def fetch_logs(
    self,
    job_id: str,
    stream: bool,
    stderr: Optional[bool] = None,
    stdout: Optional[bool] = None,
) -> Iterable[str]:
    """Yield a job's log lines by running ``tail`` over its per-node log files.

    The call blocks until ``self.status(job_id)`` reports
    ``DGXCloudState.RUNNING`` and until at least ``self.nodes`` files match
    ``<pvc_job_dir>/logs/output-*.log``. It then runs ``tail`` (``tail -f``
    when ``stream`` is true) on those files and yields the output.

    As a side effect ``self.pvc_job_dir`` is (re)computed from
    ``self.job_dir`` and ``self.pvc_nemo_run_dir``.

    Args:
        job_id: Identifier handed to ``self.status``.
        stream: When true, follow the files and yield every non-empty line
            including its trailing newline. When false, yield the output of
            a plain ``tail`` with the trailing newline removed.
        stderr: Not referenced by this implementation.
        stdout: Not referenced by this implementation.

    Yields:
        Log lines, without the ``==> <file> <==`` banners that ``tail``
        prints when it is given more than one file.
    """
    # NOTE(review): this loop has no exit for jobs that reach a terminal
    # state without ever being observed as RUNNING -- confirm whether
    # callers need a timeout here.
    while self.status(job_id) != DGXCloudState.RUNNING:
        logger.info("Waiting for job to start...")
        time.sleep(15)

    cmd = ["tail"]
    if stream:
        cmd.append("-f")

    # Re-root the job directory from the local NeMo-Run home onto the PVC.
    nemo_run_home = get_nemorun_home()
    job_subdir = self.job_dir[len(nemo_run_home) + 1 :]  # +1 drops the leading "/"
    self.pvc_job_dir = os.path.join(self.pvc_nemo_run_dir, job_subdir)

    # Poll until every node has created its log file. The check happens
    # before the sleep so that no time is wasted once the files exist.
    log_glob = f"{self.pvc_job_dir}/logs/output-*.log"
    while True:
        files = sorted(glob.glob(log_glob))
        missing = self.nodes - len(files)
        if missing <= 0:
            break
        logger.info(f"Waiting for {missing} log files to be created...")
        time.sleep(3)

    cmd.extend(files)

    logger.info(f"Attempting to stream logs with command: {cmd}")

    # errors="replace" keeps a stray undecodable byte in a log file from
    # raising out of readline() and ending the stream.
    proc = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, text=True, bufsize=1, errors="replace"
    )

    try:
        # Read until EOF. EOF (not proc.poll()) is the end condition because
        # tail may already have exited while output is still buffered in
        # the pipe. With ``tail -f`` EOF only arrives if tail itself exits,
        # so in stream mode the generator normally ends when the consumer
        # closes it.
        for line in iter(proc.stdout.readline, ""):
            text = line.rstrip("\n")
            if text.startswith("==> ") and text.endswith(".log <=="):
                continue  # banner emitted by tail between files
            if not stream:
                yield text
            elif text:
                yield line
    finally:
        # Runs on normal exhaustion, on errors, and when the consumer closes
        # the generator, so the tail child is never left behind.
        proc.terminate()
        try:
            proc.wait(timeout=2)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()
@patch("glob.glob")
@patch("subprocess.Popen")
@patch("time.sleep")
def test_fetch_logs_streaming(self, mock_sleep, mock_popen, mock_glob):
    """Streaming mode runs ``tail -f`` on every log file and yields its lines."""
    set_nemorun_home("/nemo_home")

    log_files = [
        "/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-0.log",
        "/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-1.log",
    ]
    mock_glob.return_value = log_files

    # Fake tail process: two lines, then end of stream.
    mock_process = MagicMock()
    mock_process.stdout.readline.side_effect = [
        "Log line 1\n",
        "Log line 2\n",
        "",
    ]
    mock_process.poll.return_value = None
    mock_popen.return_value = mock_process
    mock_sleep.return_value = None

    executor = DGXCloudExecutor(
        base_url="https://dgxapi.example.com",
        kube_apiserver_url="https://127.0.0.1:443",
        app_id="test_app_id",
        app_secret="test_app_secret",
        project_name="test_project",
        container_image="nvcr.io/nvidia/test:latest",
        pvc_nemo_run_dir="/workspace/nemo_run",
        nodes=2,
    )
    executor.job_dir = "/nemo_home/experiments/exp1/task1"

    with patch.object(executor, "status", return_value=DGXCloudState.RUNNING):
        logs_iter = executor.fetch_logs("job123", stream=True)

        # Only the two real lines are consumed; the mock has nothing more.
        log1 = next(logs_iter)
        log2 = next(logs_iter)

        # Finish the generator deterministically instead of relying on GC.
        logs_iter.close()

    assert "Log line 1" in log1
    assert "Log line 2" in log2

    # tail must follow (-f) exactly the files reported by glob.
    mock_popen.assert_called_once()
    call_args = mock_popen.call_args[0][0]
    assert call_args[:2] == ["tail", "-f"]
    assert sorted(call_args[2:]) == log_files


@patch("glob.glob")
@patch("subprocess.Popen")
@patch("time.sleep")
def test_fetch_logs_non_streaming(self, mock_sleep, mock_popen, mock_glob):
    """Non-streaming mode runs plain ``tail`` and strips trailing newlines."""
    set_nemorun_home("/nemo_home")

    log_file = "/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-0.log"
    mock_glob.return_value = [log_file]

    # Fake tail process: two lines, then end of stream.
    mock_process = MagicMock()
    mock_process.stdout.readline.side_effect = [
        "Log line 1\n",
        "Log line 2\n",
        "",
    ]
    mock_process.poll.return_value = None
    mock_popen.return_value = mock_process
    mock_sleep.return_value = None

    executor = DGXCloudExecutor(
        base_url="https://dgxapi.example.com",
        kube_apiserver_url="https://127.0.0.1:443",
        app_id="test_app_id",
        app_secret="test_app_secret",
        project_name="test_project",
        container_image="nvcr.io/nvidia/test:latest",
        pvc_nemo_run_dir="/workspace/nemo_run",
        nodes=1,
    )
    executor.job_dir = "/nemo_home/experiments/exp1/task1"

    with patch.object(executor, "status", return_value=DGXCloudState.RUNNING):
        logs = list(executor.fetch_logs("job123", stream=False))

    assert logs == ["Log line 1", "Log line 2"]

    # Plain tail (no -f) on exactly the one log file.
    mock_popen.assert_called_once()
    call_args = mock_popen.call_args[0][0]
    assert call_args == ["tail", log_file]

    # The child process must be cleaned up once the output is exhausted.
    mock_process.terminate.assert_called_once()
    mock_process.wait.assert_called_once()


@patch("time.sleep")
@patch("glob.glob")
def test_fetch_logs_waits_for_running_status(self, mock_glob, mock_sleep):
    """fetch_logs polls ``status`` every 15 s until the job is RUNNING."""
    set_nemorun_home("/nemo_home")

    executor = DGXCloudExecutor(
        base_url="https://dgxapi.example.com",
        kube_apiserver_url="https://127.0.0.1:443",
        app_id="test_app_id",
        app_secret="test_app_secret",
        project_name="test_project",
        container_image="nvcr.io/nvidia/test:latest",
        pvc_nemo_run_dir="/workspace/nemo_run",
        nodes=1,
    )
    executor.job_dir = "/nemo_home/experiments/exp1/task1"

    # Two PENDING polls, then RUNNING.
    status_values = [DGXCloudState.PENDING, DGXCloudState.PENDING, DGXCloudState.RUNNING]
    mock_sleep.return_value = None

    # One matching file for the single node, so the file wait does not block.
    mock_glob.return_value = [
        "/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-0.log"
    ]

    with patch.object(executor, "status", side_effect=status_values) as mock_status:
        with patch("subprocess.Popen") as mock_popen:
            mock_process = MagicMock()
            mock_process.stdout.readline.return_value = ""
            mock_process.poll.return_value = 0
            mock_popen.return_value = mock_process

            # The generator body only runs once it is consumed.
            list(executor.fetch_logs("job123", stream=False))

    # Exactly one status poll per entry, and one 15 s sleep per PENDING
    # result. Counting only the 15 s sleeps keeps the check independent of
    # sleeps performed while waiting for log files.
    assert mock_status.call_count == 3
    sleep_args = [c[0] for c in mock_sleep.call_args_list]
    assert sleep_args.count((15,)) == 2


@patch("time.sleep")
@patch("glob.glob")
@patch("subprocess.Popen")
def test_fetch_logs_waits_for_log_files(self, mock_popen, mock_glob, mock_sleep):
    """fetch_logs keeps globbing until one log file per node exists."""
    set_nemorun_home("/nemo_home")

    log_files = [
        "/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-0.log",
        "/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-1.log",
    ]
    # No files, then 1 of 2, then both.
    mock_glob.side_effect = [[], log_files[:1], list(log_files)]

    mock_process = MagicMock()
    mock_process.stdout.readline.return_value = ""
    mock_process.poll.return_value = 0
    mock_popen.return_value = mock_process
    mock_sleep.return_value = None

    executor = DGXCloudExecutor(
        base_url="https://dgxapi.example.com",
        kube_apiserver_url="https://127.0.0.1:443",
        app_id="test_app_id",
        app_secret="test_app_secret",
        project_name="test_project",
        container_image="nvcr.io/nvidia/test:latest",
        pvc_nemo_run_dir="/workspace/nemo_run",
        nodes=2,  # two log files expected
    )
    executor.job_dir = "/nemo_home/experiments/exp1/task1"

    with patch.object(executor, "status", return_value=DGXCloudState.RUNNING):
        list(executor.fetch_logs("job123", stream=False))

    # One glob per polling round, with a 3 s pause after each incomplete round.
    assert mock_glob.call_count == 3
    sleep_args = [c[0] for c in mock_sleep.call_args_list]
    assert sleep_args.count((3,)) >= 2

    # tail is started only once, on the complete set of files.
    mock_popen.assert_called_once()
    call_args = mock_popen.call_args[0][0]
    assert call_args[0] == "tail"
    assert sorted(call_args[1:]) == log_files