diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 8eb0edff..13596ebf 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -323,7 +323,7 @@ def launch(self, name: str, cmd: list[str]) -> tuple[str, str]: ln -s {self.pvc_job_dir}/ /nemo_run cd /nemo_run/code mkdir -p {self.pvc_job_dir}/logs -{" ".join(cmd)} 2>&1 | tee -a {self.pvc_job_dir}/logs/output-$HOSTNAME.log +{" ".join(cmd)} 2>&1 | tee -a {self.pvc_job_dir}/log_$HOSTNAME.out {self.pvc_job_dir}/log-allranks_0.out """ with open(os.path.join(self.job_dir, "launch_script.sh"), "w+") as f: f.write(launch_script) @@ -394,8 +394,11 @@ def fetch_logs( files = [] while len(files) < self.nodes: - files = list(glob.glob(f"{self.pvc_job_dir}/logs/output-*.log")) - logger.info(f"Waiting for {self.nodes - len(files)} log files to be created...") + files = list(glob.glob(f"{self.pvc_job_dir}/log_*.out")) + files = [f for f in files if "log-allranks_0" not in f] + logger.info( + f"Waiting for {self.nodes + 1 - len(files)} log files to be created in {self.pvc_job_dir}..." + ) time.sleep(3) cmd.extend(files) @@ -410,7 +413,7 @@ def fetch_logs( for line in iter(proc.stdout.readline, ""): if ( line - and not line.rstrip("\n").endswith(".log <==") + and not line.rstrip("\n").endswith(".out <==") and line.rstrip("\n") != "" ): yield f"{line}" diff --git a/test/core/execution/test_dgxcloud.py b/test/core/execution/test_dgxcloud.py index e8042f68..9098b5c5 100644 --- a/test/core/execution/test_dgxcloud.py +++ b/test/core/execution/test_dgxcloud.py @@ -110,8 +110,8 @@ def test_fetch_logs_streaming(self, mock_sleep, mock_popen, mock_glob): # Mock log files mock_glob.return_value = [ - "/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-0.log", - "/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-1.log", + "/workspace/nemo_run/experiments/exp1/task1/log_worker-0.out", + "/workspace/nemo_run/experiments/exp1/task1/log_worker-1.out", ] # Mock process that yields log lines @@ -162,7 +162,7 @@ def test_fetch_logs_non_streaming(self, mock_sleep, mock_popen, mock_glob): # Mock log files mock_glob.return_value = [ - "/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-0.log", + "/workspace/nemo_run/experiments/exp1/task1/log_worker-0.out", ] # Mock process that yields log lines @@ -232,7 +232,7 @@ def test_fetch_logs_waits_for_running_status(self, mock_glob, mock_sleep): with patch.object(executor, "status", side_effect=status_values): # Mock glob to prevent it from blocking - mock_glob.return_value = ["/workspace/nemo_run/logs/output.log"] + mock_glob.return_value = ["/workspace/nemo_run/logs/outputlog_"] with patch("subprocess.Popen") as mock_popen: mock_process = MagicMock() @@ -257,10 +257,10 @@ def test_fetch_logs_waits_for_log_files(self, mock_popen, mock_glob, mock_sleep) # Mock glob to return incomplete files first, then all files mock_glob.side_effect = [ [], # No files yet - ["/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-0.log"], # 1 of 2 + ["/workspace/nemo_run/experiments/exp1/task1/log_worker-0.out"], # 1 of 2 [ # All 2 files - "/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-0.log", - "/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-1.log", + "/workspace/nemo_run/experiments/exp1/task1/log_worker-0.out", + "/workspace/nemo_run/experiments/exp1/task1/log_worker-1.out", ], ]