Skip to content

Commit 434c013

Browse files
d4l3kfacebook-github-bot
authored andcommitted
schedulers,cli: persist newline breaks in log_iter (#425)
Summary: This resolves #424 This makes it so the torchx scheduler `log_iter` method keeps the line breaks so downstream log streams can handle them gracefully. The current solution strips all `\n` characters and always adds them so it makes it impossible to do streaming visualizations of progress bars which use `\r` without a new line break. WARNING: This is a change in the log_iter interface and all schedulers/downstream consumers will need to be updated. If someone is logging from multiple workers this gets dangerous since the progress bar `\r` lines can clobber each other. Pull Request resolved: #425 Test Plan: (torchx-3.10.2) tristanr@tristanr-arch2 ~/D/torchx-proj> torchx run --scheduler local_docker --wait --log utils.python --script test_tqdm.py torchx 2022-03-15 14:26:42 INFO loaded configs from /home/tristanr/Developer/torchx-proj/.torchxconfig torchx 2022-03-15 14:26:42 INFO Building workspace: file:///home/tristanr/Developer/torchx-proj for role[0]: python, image: ghcr.io/pytorch/torchx:0.1.2 dev0 torchx 2022-03-15 14:26:43 INFO Done building workspace torchx 2022-03-15 14:26:43 INFO New image: sha256:9cfaf70f7143b4caef383b46c23635eaf001cbd3d9ff55335aa1ff8c5e236388 built from workspace local_docker://torchx/torchx_utils_python-bprr9rb4k764nd torchx 2022-03-15 14:26:44 INFO Waiting for the app to finish... python/0 100%|██████████| 100/100 [00:03<00:00, 32.95it/s] torchx 2022-03-15 14:26:48 INFO Job finished: SUCCEEDED (torchx-3.10.2) tristanr@tristanr-arch2 ~/D/torchx-proj> torchx run --scheduler local_cwd --wait --log utils.python --script test_tqdm.py torchx 2022-03-15 14:26:52 INFO loaded configs from /home/tristanr/Developer/torchx-proj/.torchxconfig torchx 2022-03-15 14:26:52 INFO Log files located in: /tmp/torchx_0nqvqm1d/torchx/torchx_utils_python-x217jjqhbkkrgd/python/0 local_cwd://torchx/torchx_utils_python-x217jjqhbkkrgd torchx 2022-03-15 14:26:52 INFO Waiting for the app to finish... python/0 100%|██████████| 100/100 [00:03<00:00, 32.95it/s] torchx 2022-03-15 14:26:56 INFO Job finished: SUCCEEDED Reviewed By: kiukchung Differential Revision: D34907682 Pulled By: d4l3k fbshipit-source-id: 34a23e71c909419499c391d28cbc568b54e7e197
1 parent 74560bb commit 434c013

15 files changed

+100
-48
lines changed

torchx/cli/cmd_log.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,17 @@ def validate(job_identifier: str) -> None:
3535
sys.exit(1)
3636

3737

38+
def _prefix_line(prefix: str, line: str) -> str:
39+
"""
40+
_prefix_line ensure the prefix is still present even when dealing with return characters
41+
"""
42+
if "\r" in line:
43+
line = line.replace("\r", f"\r{prefix}")
44+
if not line.startswith("\r"):
45+
line = f"{prefix}{line}"
46+
return line
47+
48+
3849
def print_log_lines(
3950
file: TextIO,
4051
runner: Runner,
@@ -55,7 +66,8 @@ def print_log_lines(
5566
should_tail=should_tail,
5667
streams=streams,
5768
):
58-
print(f"{GREEN}{role_name}/{replica_id}{ENDC} {line}", file=file)
69+
prefix = f"{GREEN}{role_name}/{replica_id}{ENDC} "
70+
print(_prefix_line(prefix, line), file=file, end="")
5971
except Exception as e:
6072
exceptions.put(e)
6173
raise

torchx/cli/test/cmd_log_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ def log_lines(
6565
if regex is None:
6666
regex = ".*"
6767

68-
log_lines = ["INFO foo", "ERROR bar", "WARN baz"]
68+
log_lines = ["INFO foo\n", "ERROR bar\n", "WARN baz\n"]
6969
return iter([line for line in log_lines if re.match(regex, line)])
7070

7171

torchx/runner/api.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -469,28 +469,36 @@ def log_lines(
469469
if the scheduler has already totally or partially purged log records
470470
for the application.
471471
472+
Return lines will include whitespace characters such as ``\\n`` or
473+
``\\r``. When outputting the lines you should make sure to avoid adding
474+
extra newline characters.
475+
472476
Usage:
473477
474-
::
478+
.. code:: python
475479
476-
app_handle = session.run(app, scheduler="local", cfg=Dict[str, ConfigValue]())
480+
app_handle = session.run(app, scheduler="local", cfg=Dict[str, ConfigValue]())
477481
478-
print("== trainer node 0 logs ==")
479-
for line in session.log_lines(app_handle, "trainer", k=0):
480-
print(line)
482+
print("== trainer node 0 logs ==")
483+
for line in session.log_lines(app_handle, "trainer", k=0):
484+
# for prints newlines will already be present in the line
485+
print(line, end="")
486+
487+
# when writing to a file nothing extra is necessary
488+
f.write(line)
481489
482490
Discouraged anti-pattern:
483491
484-
::
492+
.. code:: python
485493
486-
# DO NOT DO THIS!
487-
# parses accuracy metric from log and reports it for this experiment run
488-
accuracy = -1
489-
for line in session.log_lines(app_handle, "trainer", k=0):
490-
if matches_regex(line, "final model_accuracy:[0-9]*"):
491-
accuracy = parse_accuracy(line)
492-
break
493-
report(experiment_name, accuracy)
494+
# DO NOT DO THIS!
495+
# parses accuracy metric from log and reports it for this experiment run
496+
accuracy = -1
497+
for line in session.log_lines(app_handle, "trainer", k=0):
498+
if matches_regex(line, "final model_accuracy:[0-9]*"):
499+
accuracy = parse_accuracy(line)
500+
break
501+
report(experiment_name, accuracy)
494502
495503
Args:
496504
app_handle: application handle

torchx/schedulers/api.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,11 @@ def log_iter(
263263
7. Some schedulers may support line cursors by supporting ``__getitem__``
264264
(e.g. ``iter[50]`` seeks to the 50th log line).
265265
266+
8. Whitespace is preserved, each new line should include ``\\n``. To
267+
support interactive progress bars the returned lines don't need to
268+
include ``\\n`` but should then be printed without a newline to
269+
correctly handle ``\\r`` carriage returns.
270+
266271
Args:
267272
streams: The IO output streams to select.
268273
One of: combined, stdout, stderr.
@@ -302,3 +307,19 @@ def filter_regex(regex: str, data: Iterable[str]) -> Iterable[str]:
302307

303308
r = re.compile(regex)
304309
return filter(lambda datum: r.search(datum), data)
310+
311+
312+
def split_lines(text: str) -> List[str]:
313+
"""
314+
split_lines splits the string by new lines and keeps the new line characters.
315+
"""
316+
lines = []
317+
while len(text) > 0:
318+
idx = text.find("\n")
319+
if idx >= 0:
320+
lines.append(text[: idx + 1])
321+
text = text[idx + 1 :]
322+
else:
323+
lines.append(text)
324+
break
325+
return lines

torchx/schedulers/aws_batch_scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ def _stream_events(
517517
next_token = response["nextForwardToken"]
518518

519519
for event in response["events"]:
520-
yield event["message"]
520+
yield event["message"] + "\n"
521521

522522

523523
def create_scheduler(session_name: str, **kwargs: object) -> AWSBatchScheduler:

torchx/schedulers/docker_scheduler.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
Scheduler,
2121
Stream,
2222
filter_regex,
23+
split_lines,
2324
)
2425
from torchx.schedulers.ids import make_unique
2526
from torchx.specs.api import (
@@ -425,7 +426,7 @@ def log_iter(
425426
if len(logs) == 0:
426427
logs = []
427428
else:
428-
logs = logs.split("\n")
429+
logs = split_lines(logs)
429430

430431
logs = map(_to_str, logs)
431432

@@ -438,8 +439,6 @@ def log_iter(
438439
def _to_str(a: Union[str, bytes]) -> str:
439440
if isinstance(a, bytes):
440441
a = a.decode("utf-8")
441-
if a.endswith("\n"):
442-
a = a[:-1]
443442
return a
444443

445444

torchx/schedulers/kubernetes_scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
Scheduler,
5050
Stream,
5151
filter_regex,
52+
split_lines,
5253
)
5354
from torchx.schedulers.ids import make_unique
5455
from torchx.specs.api import (
@@ -640,7 +641,7 @@ def log_iter(
640641
iterator = w.stream(core_api.read_namespaced_pod_log, **args)
641642
else:
642643
resp = core_api.read_namespaced_pod_log(**args)
643-
iterator = resp.strip().split("\n")
644+
iterator = split_lines(resp)
644645

645646
if regex:
646647
return filter_regex(regex, iterator)

torchx/schedulers/local_scheduler.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1021,7 +1021,7 @@ def __iter__(self) -> "LogIterator":
10211021
self._check_finished() # check to see if app has finished running
10221022

10231023
if os.path.isfile(self._log_file):
1024-
self._log_fp = open(self._log_file, "r") # noqa: P201
1024+
self._log_fp = open(self._log_file, "rt", newline="\n") # noqa: P201
10251025
break
10261026

10271027
if self._app_finished:
@@ -1049,7 +1049,6 @@ def __next__(self) -> str:
10491049
time.sleep(0.1)
10501050
self._check_finished()
10511051
else:
1052-
line = line.rstrip("\n") # strip the trailing newline
10531052
if re.match(self._regex, line):
10541053
return line
10551054

torchx/schedulers/ray_scheduler.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
DescribeAppResponse,
2222
Scheduler,
2323
Stream,
24+
split_lines,
2425
)
2526
from torchx.schedulers.ids import make_unique
2627
from torchx.schedulers.ray.ray_common import RayActor
@@ -350,7 +351,7 @@ def log_iter(
350351
addr, app_id = app_id.split("-")
351352
client: JobSubmissionClient = JobSubmissionClient(f"http://{addr}")
352353
logs: str = client.get_job_logs(app_id)
353-
return logs.split("\n")
354+
return split_lines(logs)
354355

355356
def create_scheduler(session_name: str, **kwargs: Any) -> RayScheduler:
356357
if not has_ray(): # pragma: no cover

torchx/schedulers/test/api_test.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@
1111
from typing import Iterable, Mapping, Optional, Union
1212
from unittest.mock import MagicMock, patch
1313

14-
from torchx.schedulers.api import DescribeAppResponse, Scheduler, Stream
14+
from torchx.schedulers.api import (
15+
DescribeAppResponse,
16+
Scheduler,
17+
Stream,
18+
split_lines,
19+
)
1520
from torchx.specs.api import (
1621
NULL_RESOURCE,
1722
AppDef,
@@ -152,3 +157,9 @@ def test_close_twice(self) -> None:
152157
scheduler_mock.close()
153158
scheduler_mock.close()
154159
# nothing to validate explicitly, just that no errors are raised
160+
161+
def test_split_lines(self) -> None:
162+
self.assertEqual(split_lines(""), [])
163+
self.assertEqual(split_lines("\n"), ["\n"])
164+
self.assertEqual(split_lines("foo\nbar"), ["foo\n", "bar"])
165+
self.assertEqual(split_lines("foo\nbar\n"), ["foo\n", "bar\n"])

torchx/schedulers/test/aws_batch_scheduler_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -405,8 +405,8 @@ def test_log_iter(self) -> None:
405405
self.assertEqual(
406406
list(logs),
407407
[
408-
"foo",
409-
"foobar",
408+
"foo\n",
409+
"foobar\n",
410410
],
411411
)
412412

torchx/schedulers/test/docker_scheduler_test.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,8 @@ def test_docker_logs(self) -> None:
219219
self.assertEqual(
220220
logs,
221221
[
222-
"foo",
223-
"bar",
222+
"foo\n",
223+
"bar\n",
224224
],
225225
)
226226
logs = list(
@@ -234,7 +234,7 @@ def test_docker_logs(self) -> None:
234234
self.assertEqual(
235235
logs,
236236
[
237-
"bar",
237+
"bar\n",
238238
],
239239
)
240240

@@ -267,8 +267,8 @@ def test_docker_logs(self) -> None:
267267
self.assertEqual(
268268
logs,
269269
[
270-
"foo",
271-
"bar",
270+
"foo\n",
271+
"bar\n",
272272
],
273273
)
274274

@@ -286,8 +286,8 @@ def test_docker_logs_streams(self) -> None:
286286
self.assertEqual(
287287
logs,
288288
{
289-
"stdout",
290-
"stderr",
289+
"stdout\n",
290+
"stderr\n",
291291
},
292292
)
293293

@@ -299,8 +299,8 @@ def test_docker_logs_streams(self) -> None:
299299
self.assertEqual(
300300
logs,
301301
{
302-
"stdout",
303-
"stderr",
302+
"stdout\n",
303+
"stderr\n",
304304
},
305305
)
306306

@@ -312,7 +312,7 @@ def test_docker_logs_streams(self) -> None:
312312
self.assertEqual(
313313
logs,
314314
[
315-
"stderr",
315+
"stderr\n",
316316
],
317317
)
318318

@@ -324,7 +324,7 @@ def test_docker_logs_streams(self) -> None:
324324
self.assertEqual(
325325
logs,
326326
[
327-
"stdout",
327+
"stdout\n",
328328
],
329329
)
330330

torchx/schedulers/test/kubernetes_scheduler_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -576,8 +576,8 @@ def test_log_iter(self, read_namespaced_pod_log: MagicMock) -> None:
576576
self.assertEqual(
577577
list(lines),
578578
[
579-
"foo reg",
580-
"bar reg",
579+
"foo reg\n",
580+
"bar reg\n",
581581
],
582582
)
583583
call = read_namespaced_pod_log.call_args

torchx/schedulers/test/local_scheduler_test.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ def test_submit_inherit_parent_envs(self) -> None:
325325
app = AppDef(name="check_foo_env_var", roles=[role])
326326
app_id = self.scheduler.submit(app, {"log_dir": self.test_dir})
327327
for line in self.scheduler.log_iter(app_id, "echo_foo"):
328-
self.assertEqual("bar", line)
328+
self.assertEqual("bar\n", line)
329329

330330
desc = self.wait(app_id, self.scheduler)
331331
assert desc is not None
@@ -431,7 +431,7 @@ def test_submit_override_parent_env(self) -> None:
431431
app = AppDef(name="check_foo_env_var", roles=[role])
432432
app_id = self.scheduler.submit(app, {"log_dir": self.test_dir})
433433
for line in self.scheduler.log_iter(app_id, "echo_foo"):
434-
self.assertEqual("new_bar", line)
434+
self.assertEqual("new_bar\n", line)
435435

436436
desc = self.wait(app_id, self.scheduler)
437437
assert desc is not None
@@ -600,20 +600,20 @@ def test_log_iterator(self) -> None:
600600
app_id = self.scheduler.submit(app, cfg)
601601

602602
for i, line in enumerate(self.scheduler.log_iter(app_id, "role1", k=0)):
603-
self.assertEqual(str(i), line)
603+
self.assertEqual(str(i), line.strip())
604604

605605
# since and until ignored
606606
for i, line in enumerate(
607607
self.scheduler.log_iter(
608608
app_id, "role1", k=0, since=datetime.now(), until=datetime.now()
609609
)
610610
):
611-
self.assertEqual(str(i), line)
611+
self.assertEqual(str(i), line.strip())
612612

613613
for i, line in enumerate(
614614
self.scheduler.log_iter(app_id, "role1", k=0, regex=r"[02468]")
615615
):
616-
self.assertEqual(str(i * 2), line)
616+
self.assertEqual(str(i * 2), line.strip())
617617

618618
def test_log_iterator_no_log_dir(self) -> None:
619619
role = Role(

torchx/schedulers/test/slurm_scheduler_test.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ def test_log_iter(self, run: MagicMock) -> None:
373373
since=datetime.datetime.now(),
374374
)
375375
)
376-
self.assertEqual(logs, ["hello", "world"])
376+
self.assertEqual(logs, ["hello\n", "world\n"])
377377

378378
with open(os.path.join(job_dir, "slurm-54-echo-1.err"), "wt") as f:
379379
f.write("foo\nbar\n")
@@ -387,7 +387,7 @@ def test_log_iter(self, run: MagicMock) -> None:
387387
)
388388
)
389389

390-
self.assertEqual(logs, ["foo", "bar"])
390+
self.assertEqual(logs, ["foo\n", "bar\n"])
391391

392392
# no stream specified should default to STDERR
393393
logs = list(
@@ -397,7 +397,7 @@ def test_log_iter(self, run: MagicMock) -> None:
397397
1,
398398
)
399399
)
400-
self.assertEqual(logs, ["foo", "bar"])
400+
self.assertEqual(logs, ["foo\n", "bar\n"])
401401

402402
with self.assertRaises(ValueError):
403403
scheduler.log_iter("54", "echo", 1, streams=Stream.COMBINED)

0 commit comments

Comments
 (0)