@@ -268,9 +268,13 @@ def _wait_for_workers(self):
268
268
"""
269
269
Wait for all worker nodes to finish processing this observation
270
270
"""
271
- self .logger .info (f"Waiting for workers to finish processing { self .obs_config ['datetimesource' ]} " )
271
+ obs = self .obs_config ['datetimesource' ]
272
+ self .logger .info (f"Waiting for workers to finish processing { obs } " )
272
273
twait = 0
274
+
273
275
for beam in self .obs_config ['beams' ]:
276
+ # Log which beam we are waiting for
277
+ self .logger .info (f"{ obs } waiting for results from CB{ beam :02d} " )
274
278
result_file = os .path .join (self .central_result_dir , f'CB{ beam :02d} _summary.yaml' )
275
279
# wait until the result file is present
276
280
while not os .path .isfile (result_file ):
@@ -279,9 +283,8 @@ def _wait_for_workers(self):
279
283
twait += self .check_interval
280
284
# if we waited a long time, check if a warning should be sent if the node is offline
281
285
node = WORKERS [beam ]
282
- if (twait > self .max_wait_time ) and (node not in self .warnings_sent ) and \
283
- (not self ._check_node_online (node )):
284
- # node is not in warnings and offline, send a warning
286
+ if (twait > self .max_wait_time ) and (not self ._check_node_online (node )) and \
287
+ (node not in self .warnings_sent ):
285
288
self ._send_warning (node )
286
289
# store that we sent a warning
287
290
self .warnings_sent .append (node )
@@ -337,8 +340,64 @@ def _check_node_online(self, node):
337
340
def _send_warning(self, node):
    """
    Send a warning email about a node

    The email lists the task ID and name of the current observation, and tells the operator
    how to check the node and how to let the master continue without its results.
    If the observation info cannot be read from the observation config, an error is logged and
    no email is sent.

    :param str node: Node to send warning about. The last two characters are parsed as the
        node number; the compound beam index is that number minus one
    """
    # local import: only needed as fallback when os.getlogin is unavailable
    import getpass

    # get observation info from obs config
    try:
        date = self.obs_config['date']
        datetimesource = self.obs_config['datetimesource']
        taskid = self.obs_config['parset']['task.taskID']
    except (KeyError, TypeError):
        # KeyError if parset or task.taskID are missing, TypeError if obs_config is None
        self.logger.error(f"Failed to get parameters of current master observation, not sending warning email for "
                          f"{node}")
        return

    # generate email
    beam = int(node[-2:]) - 1
    # The file to create by hand must have the same name the master waits for
    # (CB<beam>_summary.yaml), otherwise creating it does not unblock the master
    content = dedent(f"""
                     <html>
                     <title>DARC Warning</title>
                     <body>
                     <p>
                     <h3>Warning: DARC may be offline on {node}</h3><br />
                     DARC on {node} is either offline or no longer processing this observation:<br />
                     Task ID = {taskid}<br />
                     Name = {datetimesource}<br />
                     </p>
                     <p>
                     Please check:
                     <ul>
                       <li>Is DARC still online on {node}? See http://arts041.apertif/darc/status
                       <li>Is DARC still processing on {node}?
                         <ul>
                           <li>Check the log file: <code>tail -n 50 /home/arts/darc/log/processor.{node}.log</code>
                           <li>Check if there are files in <code>/data2/output/{date}/{datetimesource}/triggers</code>
                         </ul>
                     </ul>
                     </p>
                     <p>
                     If DARC is offline, do the following:
                     <ul>
                       <li>Restart DARC on {node}: <code>ssh arts@{node} . darc/venv/bin/activate; darc_start_all_services</code>
                       <li>Create an empty output file for this observation: <code>touch /home/arts/darc/results/{date}/{datetimesource}/CB{beam:02d}_summary.yaml</code>
                     </ul>
                     </p>
                     </body>
                     </html>
                     """)

    # set email subject with node name
    subject = f"DARC Warning: {node}"
    # get FQDN in way that actually adds the domain
    # simply socket.getfqdn does not actually do that on ARTS
    fqdn = socket.getaddrinfo(socket.gethostname(), None, 0, socket.SOCK_DGRAM, 0, socket.AI_CANONNAME)[0][3]
    # os.getlogin raises OSError when the process has no controlling terminal
    # (e.g. when started as a background service), so fall back to getpass in that case
    try:
        user = os.getlogin()
    except OSError:
        user = getpass.getuser()
    frm = f"DARC Warning System <{user}@{fqdn}>"
    to = self.email_settings['to']
    body = {'type': 'html', 'content': content}
    # send
    self.logger.info(f"Sending {node} warning email")
    util.send_email(frm, to, subject, body)
342
401
343
402
def _process_results (self , info , coordinates ):
344
403
"""
@@ -361,6 +420,17 @@ def _process_results(self, info, coordinates):
361
420
# load the summary file
362
421
with open (os .path .join (self .central_result_dir , f'CB{ beam :02d} _summary.yaml' )) as f :
363
422
info_beam = yaml .load (f , Loader = yaml .SafeLoader )
423
+
424
+ if info_beam is None :
425
+ self .logger .warning (f"Empty result file for CB{ beam :02d} " )
426
+ # add to email with question marks
427
+ beaminfo += "<tr><td>{beam:02d}</td>" \
428
+ "<td>?</td>" \
429
+ "<td>?</td>" \
430
+ "<td>?</td>" \
431
+ "<td>?</td></tr>" .format (beam = beam )
432
+ continue
433
+
364
434
beaminfo += "<tr><td>{beam:02d}</td>" \
365
435
"<td>{ncand_raw}</td>" \
366
436
"<td>{ncand_post_clustering}</td>" \
0 commit comments