Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions nemo_run/core/execution/dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ def launch(self, name: str, cmd: list[str]) -> tuple[str, str]:
ln -s {self.pvc_job_dir}/ /nemo_run
cd /nemo_run/code
mkdir -p {self.pvc_job_dir}/logs
{" ".join(cmd)} 2>&1 | tee -a {self.pvc_job_dir}/logs/output-$HOSTNAME.log
{" ".join(cmd)} 2>&1 | tee -a {self.pvc_job_dir}/log_$HOSTNAME.out {self.pvc_job_dir}/log-allranks_0.out
"""
with open(os.path.join(self.job_dir, "launch_script.sh"), "w+") as f:
f.write(launch_script)
Expand Down Expand Up @@ -394,8 +394,11 @@ def fetch_logs(

files = []
while len(files) < self.nodes:
files = list(glob.glob(f"{self.pvc_job_dir}/logs/output-*.log"))
logger.info(f"Waiting for {self.nodes - len(files)} log files to be created...")
files = list(glob.glob(f"{self.pvc_job_dir}/log_*.out"))
files = [f for f in files if "log-allranks_0" not in f]
logger.info(
f"Waiting for {self.nodes + 1 - len(files)} log files to be created in {self.pvc_job_dir}..."
)
time.sleep(3)

cmd.extend(files)
Expand All @@ -410,7 +413,7 @@ def fetch_logs(
for line in iter(proc.stdout.readline, ""):
if (
line
and not line.rstrip("\n").endswith(".log <==")
and not line.rstrip("\n").endswith(".out <==")
and line.rstrip("\n") != ""
):
yield f"{line}"
Expand Down
14 changes: 7 additions & 7 deletions test/core/execution/test_dgxcloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ def test_fetch_logs_streaming(self, mock_sleep, mock_popen, mock_glob):

# Mock log files
mock_glob.return_value = [
"/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-0.log",
"/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-1.log",
"/workspace/nemo_run/experiments/exp1/task1/log_worker-0.out",
"/workspace/nemo_run/experiments/exp1/task1/log_worker-1.out",
]

# Mock process that yields log lines
Expand Down Expand Up @@ -162,7 +162,7 @@ def test_fetch_logs_non_streaming(self, mock_sleep, mock_popen, mock_glob):

# Mock log files
mock_glob.return_value = [
"/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-0.log",
"/workspace/nemo_run/experiments/exp1/task1/log_worker-0.out",
]

# Mock process that yields log lines
Expand Down Expand Up @@ -232,7 +232,7 @@ def test_fetch_logs_waits_for_running_status(self, mock_glob, mock_sleep):

with patch.object(executor, "status", side_effect=status_values):
# Mock glob to prevent it from blocking
mock_glob.return_value = ["/workspace/nemo_run/logs/output.log"]
mock_glob.return_value = ["/workspace/nemo_run/logs/outputlog_"]

with patch("subprocess.Popen") as mock_popen:
mock_process = MagicMock()
Expand All @@ -257,10 +257,10 @@ def test_fetch_logs_waits_for_log_files(self, mock_popen, mock_glob, mock_sleep)
# Mock glob to return incomplete files first, then all files
mock_glob.side_effect = [
[], # No files yet
["/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-0.log"], # 1 of 2
["/workspace/nemo_run/experiments/exp1/task1/log_worker-0.out"], # 1 of 2
[ # All 2 files
"/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-0.log",
"/workspace/nemo_run/experiments/exp1/task1/logs/output-worker-1.log",
"/workspace/nemo_run/experiments/exp1/task1/log_worker-0.out",
"/workspace/nemo_run/experiments/exp1/task1/log_worker-1.out",
],
]

Expand Down
Loading