Skip to content

Commit 4750614

Browse files
authored
local_scheduler: ignore bad utf-8 data instead of crashing (#718)
1 parent 1577ee6 commit 4750614

File tree

2 files changed

+26
-4
lines changed

2 files changed

+26
-4
lines changed

torchx/schedulers/local_scheduler.py

Lines changed: 9 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -438,7 +438,7 @@ def _fmt_io_filename(std_io: Optional[BinaryIO]) -> str:
438438

439439
def __repr__(self) -> str:
440440
role_to_pid = {}
441-
for (role_name, replicas) in self.role_replicas.items():
441+
for role_name, replicas in self.role_replicas.items():
442442
pids = role_to_pid.setdefault(role_name, [])
443443
for r in replicas:
444444
pids.append(r.proc.pid)
@@ -612,7 +612,7 @@ def _evict_lru(self) -> bool:
612612
"""
613613
lru_time = sys.maxsize
614614
lru_app_id = None
615-
for (app_id, app) in self._apps.items():
615+
for app_id, app in self._apps.items():
616616
if is_terminal(app.state):
617617
if app.last_updated <= lru_time:
618618
lru_app_id = app_id
@@ -988,7 +988,7 @@ def _cancel_existing(self, app_id: str) -> None:
988988

989989
def close(self) -> None:
990990
# terminate all apps
991-
for (app_id, app) in self._apps.items():
991+
for app_id, app in self._apps.items():
992992
log.debug(f"Terminating app: {app_id}")
993993
app.kill()
994994
# delete logdir if torchx created a log dir
@@ -1037,7 +1037,12 @@ def __iter__(self) -> "LogIterator":
10371037
self._check_finished() # check to see if app has finished running
10381038

10391039
if os.path.isfile(self._log_file):
1040-
self._log_fp = open(self._log_file, "rt", newline="\n") # noqa: P201
1040+
self._log_fp = open(
1041+
self._log_file,
1042+
mode="rt",
1043+
newline="\n",
1044+
errors="replace", # replace bad utf-8 with \uFFFD
1045+
) # noqa: P201
10411046
break
10421047

10431048
if self._app_finished:

torchx/schedulers/test/local_scheduler_test.py

Lines changed: 17 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -223,6 +223,9 @@ def setUp(self) -> None:
223223
session_name="test_session",
224224
image_provider_class=LocalDirectoryImageProvider,
225225
)
226+
write_shell_script(
227+
self.test_dir, "print_invalid_utf8.sh", ["printf '\\x02\\xc5\\xd8'"]
228+
)
226229

227230
def tearDown(self) -> None:
228231
self.scheduler.close()
@@ -672,6 +675,20 @@ def test_log_iterator_no_log_dir(self) -> None:
672675
logs = list(self.scheduler.log_iter(app_id, "role1", k=0))
673676
self.assertEqual(len(logs), 11)
674677

678+
def test_log_iterator_invalid_utf8(self) -> None:
679+
role = Role(
680+
"role1",
681+
image=self.test_dir,
682+
entrypoint="print_invalid_utf8.sh",
683+
num_replicas=1,
684+
)
685+
686+
app = AppDef(name="test_app", roles=[role])
687+
688+
app_id = self.scheduler.submit(app, cfg={})
689+
logs = list(self.scheduler.log_iter(app_id, "role1", k=0))
690+
self.assertEqual(logs, ["\x02\ufffd\ufffd"])
691+
675692
def test_submit_multiple_roles(self) -> None:
676693
test_file1 = join(self.test_dir, "test_file_1")
677694
test_file2 = join(self.test_dir, "test_file_2")

0 commit comments

Comments
 (0)