Skip to content

Commit 41d9759

Browse files
committed
small adjustments to logging messages
1 parent cd260f5 commit 41d9759

File tree

3 files changed: 24 additions and 28 deletions

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,7 @@ ignore = [
 "S607", # bandit: subprocess
 "COM812",
 "ISC001", # conflict with formatter
+"G004" # f-string in logging
 ]
 [tool.ruff.lint.per-file-ignores]
 "tests/**/*.py" = [

src/torchrunx/agent.py

Lines changed: 8 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,8 @@ def main(
 logger_port: Port for the logging server.
 hostname: Hostname of this agent.
 """
-# Stream logs to logging server
+# Setup logging & stream logs to server
+
 logger = logging.getLogger(f"{__package__}.{hostname}")
 
 log_records_to_socket(
@@ -56,10 +57,6 @@ def main(
 
 redirect_stdio_to_logger(logger)
 
-logger.debug("Agent logging setup.")
-
-# Set up launcher-agent group
-
 logger.debug("Initializing launcher-agent group.")
 
 launcher_agent_group = LauncherAgentGroup(
@@ -71,9 +68,7 @@ def main(
 
 agent_rank = launcher_agent_group.rank - 1
 
-# Communicate initial payloads between launcher/agents
-
-logger.debug("Sending agent details to launcher.")
+logger.debug("Synchronizing launcher and agents.")
 
 payload = AgentPayload(
 hostname=socket.getfqdn(),
@@ -88,9 +83,7 @@ def main(
 worker_global_ranks = launcher_payload.worker_global_ranks[agent_rank]
 num_workers = len(worker_global_ranks)
 
-# Spawn worker processes
-
-logger.debug("Launching worker processes.")
+logger.info(f"Starting {num_workers} worker processes.")
 
 ctx = dist_mp.start_processes(
 name=f"{hostname}_",
@@ -128,6 +121,8 @@ def main(
 # Monitor and communicate agent statuses
 # Terminate gracefully upon failure
 
+logger.debug("Entering worker monitoring and agent communication loop.")
+
 try:
 status = None
 while True:
@@ -141,12 +136,12 @@ def main(
 all_done = all(s.state == "done" for s in agent_statuses)
 any_failed = any(s.state == "failed" for s in agent_statuses)
 if all_done or any_failed:
-logger.debug("Workers exiting %s.", "cleanly" if not any_failed else "with errors")
+logger.info(f"Workers exited {'with' if any_failed else 'without'} errors.")
 break
 finally:
 ctx.close()
 sys.stdout.flush()
 sys.stderr.flush()
 launcher_agent_group.shutdown()
 
-logger.debug("Agent exiting.")
+logger.debug("Terminating agent process.")

src/torchrunx/launcher.py

Lines changed: 15 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -108,8 +108,9 @@ def run( # noqa: C901, PLR0912, PLR0915
 msg = "The torch.distributed package is not available."
 raise RuntimeError(msg)
 
+logger.debug("Preparing launch environment.")
+
 ###
-logger.debug("Resolving environment.")
 
 hostnames, workers_per_host = resolve_environment(
 self.hostnames, self.workers_per_host, ssh_config_file=self.ssh_config_file
@@ -183,11 +184,11 @@ def handler_factory() -> list[logging.Handler]:
 
 log_process.start()
 
-logger.debug("Launching agents.")
-
 # Start agents on each node
 
 for i, hostname in enumerate(hostnames):
+logger.info(f'Launching "{func.__name__}" on {hostname}.')
+
 execute_command(
 command=build_launch_command(
 launcher_hostname=launcher_hostname,
@@ -215,16 +216,15 @@ def handler_factory() -> list[logging.Handler]:
 rank=0,
 )
 
-logger.debug("Receiving agent details.")
-
 # Sync initial payloads between launcher and agents
 
+logger.debug("Synchronizing launcher and agents.")
 launcher_payload, agent_payloads = launcher_agent_group.sync_payloads(payload=payload)
 
-logger.debug("Entering agent monitoring loop.")
-
 # Monitor agent statuses (until failed or done)
 
+logger.debug("Entering agent monitoring loop.")
+
 while True:
 # could raise AgentFailedError
 agent_statuses = launcher_agent_group.sync_agent_statuses(status=None)
@@ -238,17 +238,10 @@ def handler_factory() -> list[logging.Handler]:
 raise v
 
 if all(s.state == "done" for s in agent_statuses):
-logger.debug("All workers exited cleanly.")
+logger.info("All workers completed successfully.")
 return_values: list[list[FunctionR]] = [s.return_values for s in agent_statuses] # pyright: ignore [reportAssignmentType]
 return LaunchResult.from_returns(hostnames, return_values)
 finally:
-logger.debug("Stopping logging server.")
-
-if stop_logging_event is not None:
-stop_logging_event.set()
-if log_process is not None:
-log_process.kill()
-
 # cleanup: SIGTERM all agents
 if agent_payloads is not None:
 for agent_payload, agent_hostname in zip(agent_payloads, hostnames):
@@ -264,6 +257,13 @@ def handler_factory() -> list[logging.Handler]:
 logger.debug("Killing launcher-agent group.")
 launcher_agent_group.shutdown()
 
+logger.debug("Stopping logging server.")
+
+if stop_logging_event is not None:
+stop_logging_event.set()
+if log_process is not None:
+log_process.kill()
+
 
 @dataclass
 class LaunchResult(Generic[FunctionR]):

0 commit comments

Comments
 (0)