Skip to content

Commit ee16955

Browse files
committed
Implement online check for nodes
1 parent 1190b4c commit ee16955

File tree

1 file changed

+48
-4
lines changed

1 file changed

+48
-4
lines changed

darc/processor_master.py

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#!/usr/bin/env python3
22

33
import os
4+
import logging
45
import socket
56
import threading
67
from textwrap import dedent
@@ -14,6 +15,7 @@
1415

1516
from darc import DARCBase
1617
from darc import util
18+
from darc.control import send_command
1719
from darc.definitions import WORKERS, TSAMP
1820

1921

@@ -33,6 +35,9 @@ def __init__(self, *args, **kwargs):
3335

3436
self.scavenger = None
3537

38+
# reduce logging from status check commands
39+
logging.getLogger('darc.control').setLevel(logging.ERROR)
40+
3641
def run(self):
3742
"""
3843
Main loop. Create thread scavenger, then run parent class run method
@@ -166,6 +171,7 @@ def __init__(self, *args, **kwargs):
166171
self.warnings_sent = []
167172
self.status = None
168173
self.process = None
174+
self.central_result_dir = None
169175

170176
def start_observation(self, obs_config, reload=True):
171177
"""
@@ -282,19 +288,57 @@ def _wait_for_workers(self):
282288

283289
def _check_node_online(self, node):
    """
    Check if the processor on a node is still online and processing the current observation

    The node is first asked for its processor status. If that status is 'running', the
    node's observation list is requested and searched for the task ID of the current
    master observation.

    :param str node: Hostname of node to check
    :return: status (bool): False if the node does not reply, its processor is not running,
        its observation list cannot be retrieved or parsed, or the current task ID is not
        in that list. True if the task ID is in the list, or if the task ID of the current
        master observation cannot be determined (the node is then assumed to be online)
    """
    # local import: a module-level import of ast is not visible in this part of the file
    import ast

    # check if the processor on the node is online
    try:
        reply = send_command(self.node_timeout, 'processor', 'status', host=node)
        if reply is None:
            self.logger.debug(f"No reply received from {node}, assuming it is offline")
            return False
        status = reply['message']['processor']
    except Exception as e:
        # any failure to obtain or unpack the reply is treated as "not running" below
        self.logger.error(f"Failed to get {node} status: {type(e)}: {e}")
        status = ''
    if status != 'running':
        # processor is not running
        self.logger.debug(f"{node} processor is not running")
        return False

    # get list of running observations from node
    self.logger.debug(f"{node} is online, checking for observations")
    try:
        # address the request to the node being checked, as the status request above does;
        # without host=node the observation list is not requested from this node
        reply = send_command(self.node_timeout, 'processor', 'get_attr observations', host=node)
        # a missing reply (None) raises TypeError here and is handled below
        output = reply['message']['processor']
        # parse the observation list
        # the list contains reference to processes, which should be put in quotes first
        output = ast.literal_eval(output.replace('<', '\'<').replace('>', '>\''))
        taskids = output['ProcessorManager.observations'].keys()
    except Exception as e:
        self.logger.error(f"Failed to get observation list from {node}: {type(e)}: {e}")
        return False
    self.logger.debug(f"{node} taskids: {taskids}")

    # check if the node is still processing the current taskid
    try:
        taskid = self.obs_config['parset']['task.taskID']
    except (KeyError, TypeError):
        # KeyError if parset or task.taskID are missing, TypeError if obs_config is None
        self.logger.error(f"Failed to get task ID of current master observation, assuming {node} is online")
        return True

    return taskid in taskids
292336

293337
def _send_warning(self, node):
294338
"""
295339
Send a warning email about a node
296340
"""
297-
self.logger.warning("Warning email not yet implemented")
341+
self.logger.warning(f"Received request to warn about {node}. Warning email not yet implemented")
298342

299343
def _process_results(self, info, coordinates):
300344
"""

0 commit comments

Comments
 (0)