Skip to content

Commit 41e39e3

Browse files
committed
we have developed a new way to obtain the list of processes
1 parent d8d70ad commit 41e39e3

File tree

1 file changed

+69
-102
lines changed

1 file changed

+69
-102
lines changed

app/resources/wfs/mysnake.py

+69-102
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@
1919
import multiprocessing as mp
2020
import yaml
2121
import subprocess
22-
import re
2322
import time
2423
import shlex
25-
import pandas as pd
24+
import re
2625

2726

2827
####################
@@ -56,7 +55,7 @@ def remove_outfiles(cfg):
5655
[ os.remove(o) for out in outfiles for o in out if os.path.isfile(o) ]
5756

5857
# define executor function
59-
def executor(proc):
58+
def executor(cmd):
6059

6160
def _extract_files(files):
6261
out = []
@@ -143,53 +142,57 @@ def _exec(cline, lfile, cmd_name, cmd_force, rule_name, rule_pos):
143142
if eval(rule_pos) == 1.0:
144143
print(f"MYSNAKE_LOG_END_CMD_EXEC\t{end_time}\t{cmd_name}\t{cmd_force}\t{rule['name']}\t{rule_pos}\t{state}", flush=True)
145144
return output
145+
146+
# for each rule
147+
outputs = []
148+
for i in range(len(cmd['rules'])):
149+
# input parameter
150+
rule = cmd['rules'][i]
151+
cmd_name, cmd_force, rule_pos = cmd['name'],cmd['force'],f"{i+1}/{len(cmd['rules'])}"
146152

147-
# input parameter
148-
cmd_name, cmd_force, rule_pos, rule = proc['name'],proc['force'],proc['rpos'],proc['rule']
149-
150-
# declare output
151-
output = {
152-
'name': rule['name'],
153-
'state': 'waiting',
154-
'start_time': '-',
155-
'end_time': '-',
156-
'info': '-',
157-
'log_file': 'rule[logfile]' }
158-
# get the list of files: inputs, outputs and logs
159-
ifiles = _extract_files(rule['infiles'])
160-
ofiles = _extract_files(rule['outfiles'])
161-
# create directories recursevely
162-
[os.makedirs(os.path.dirname(f), exist_ok=True) for f in ifiles]
163-
[os.makedirs(os.path.dirname(f), exist_ok=True) for f in ofiles]
164-
165-
# this processes would be able to execute
166-
# wait until the input files are ready to read/write
167-
while not _all_ready(ifiles):
168-
try:
169-
time.sleep(1)
170-
except:
171-
raise Exception("Caught KeyboardInterrupt, terminating workers")
172-
173-
# It is the moment of the execution
174-
if cmd_force == 1 and _all_ready(ifiles):
175-
output = _exec(rule['cline'], rule['logfile'], cmd_name, cmd_force, rule['name'], rule_pos)
176-
elif (cmd_force == 0 or cmd_force == '') and not _all_ready(ofiles):
177-
output = _exec(rule['cline'], rule['logfile'], cmd_name, cmd_force, rule['name'], rule_pos)
178-
elif (cmd_force == 0 or cmd_force == '') and _all_ready(ifiles) and _all_ready(ofiles):
179-
state = 'cached'
180-
end_time = time.asctime()
181-
output['cmd_force'] = cmd_force
182-
output['end_time'] = end_time
183-
print(f"MYSNAKE_LOG_END_CMD_EXEC\t{end_time}\t{cmd_name}\t{cmd_force}\t{rule['name']}\t{rule_pos}\t{state}", flush=True)
184-
else:
185-
state = 'already_exec'
186-
end_time = time.asctime()
187-
output['cmd_force'] = cmd_force
188-
output['end_time'] = end_time
189-
print(f"MYSNAKE_LOG_END_CMD_EXEC\t{end_time}\t{cmd_name}\t{cmd_force}\t{rule['name']}\t{rule_pos}\t{state}", flush=True)
190-
191-
192-
return output
153+
# declare output
154+
output = {
155+
'name': rule['name'],
156+
'state': 'waiting',
157+
'start_time': '-',
158+
'end_time': '-',
159+
'info': '-',
160+
'log_file': 'rule[logfile]' }
161+
# get the list of files: inputs, outputs and logs
162+
ifiles = _extract_files(rule['infiles'])
163+
ofiles = _extract_files(rule['outfiles'])
164+
# create directories recursevely
165+
[os.makedirs(os.path.dirname(f), exist_ok=True) for f in ifiles]
166+
[os.makedirs(os.path.dirname(f), exist_ok=True) for f in ofiles]
167+
168+
# this processes would be able to execute
169+
# wait until the input files are ready to read/write
170+
while not _all_ready(ifiles):
171+
try:
172+
time.sleep(1)
173+
except:
174+
raise Exception("Caught KeyboardInterrupt, terminating workers")
175+
176+
# It is the moment of the execution
177+
if cmd_force == 1 and _all_ready(ifiles):
178+
output = _exec(rule['cline'], rule['logfile'], cmd_name, cmd_force, rule['name'], rule_pos)
179+
elif (cmd_force == 0 or cmd_force == '') and not _all_ready(ofiles):
180+
output = _exec(rule['cline'], rule['logfile'], cmd_name, cmd_force, rule['name'], rule_pos)
181+
elif (cmd_force == 0 or cmd_force == '') and _all_ready(ifiles) and _all_ready(ofiles):
182+
state = 'cached'
183+
end_time = time.asctime()
184+
output['cmd_force'] = cmd_force
185+
output['end_time'] = end_time
186+
print(f"MYSNAKE_LOG_END_CMD_EXEC\t{end_time}\t{cmd_name}\t{cmd_force}\t{rule['name']}\t{rule_pos}\t{state}", flush=True)
187+
else:
188+
state = 'already_exec'
189+
end_time = time.asctime()
190+
output['cmd_force'] = cmd_force
191+
output['end_time'] = end_time
192+
print(f"MYSNAKE_LOG_END_CMD_EXEC\t{end_time}\t{cmd_name}\t{cmd_force}\t{rule['name']}\t{rule_pos}\t{state}", flush=True)
193+
outputs.append(output)
194+
195+
return outputs
193196

194197

195198

@@ -225,76 +228,39 @@ def main(args):
225228

226229
# ------
227230
print(f"MYSNAKE_LOG_PREPARING\t{time.asctime()}", flush=True)
228-
229-
logging.debug("extract the order of processes")
230-
# create a matrix (ist of list then pandas) with:
231-
# rows are the columns/columns their rules
232-
# with the list of tuple: ( commands (names), their rules)
233-
m = [ [ (cmd['name'],cmd['rules'][i]['name']) for i in range(len(cmd['rules'])) ] for cmds in cfg['commands'] for cmd in cmds ]
234-
df = pd.DataFrame(m)
235-
# get the list of list extracting the elements by column
236-
cr = [ df[c].to_list() for c in df.columns ]
237-
cr = [ (re.sub('\_\d*$','',t[0]), t[1]) for c in cr for t in c if t ] # flat the list and remove prefix for the name of comands
238-
# get dict with the commands_names and list of their own rules
239-
dict_1=dict()
240-
for c,r in cr:
241-
dict_1.setdefault(c, []).append(r)
242-
# extract the order of processes
243-
proc_ord = [ dict_1[c] for c in COMMAND_ORDER if c in dict_1 ]
244-
proc_ord = [i for s in proc_ord for i in s]
245231

246-
247-
logging.debug("extract the rules for each command")
232+
logging.debug("extract the commands in order")
248233
try:
249234
cfg_rules = [ {
250235
'name': cmd['name'],
251236
'force': cmd['force'],
252-
'rpos': f"{i+1}/{len(cmd['rules'])}",
253-
'rule': cmd['rules'][i]
254-
} for cmds in cfg['commands'] for cmd in cmds for i in range(len(cmd['rules'])) ]
237+
'rules': cmd['rules']
238+
} for cmds in cfg['commands'] for cmd in cmds ]
255239
if not cfg_rules:
256240
raise Exception('the rule list of confing is empty')
257241
except yaml.YAMLError as exc:
258242
sys.exit("ERROR!! Extracting the rules for each command: {}".format(exc))
243+
# get the total of commands
244+
total_cmds = len(cfg_rules)
245+
# get dict with the commands_names and list of their own rules
246+
dict_1=dict()
247+
for c in cfg_rules:
248+
n = re.sub('\_\d*$','',c['name'])
249+
dict_1.setdefault(n, []).append(c)
250+
# extract the order of processes
251+
cfg_rules = [ dict_1[c] for c in COMMAND_ORDER if c in dict_1 ]
252+
cfg_rules = [i for s in cfg_rules for i in s]
259253

260-
logging.debug("get the command/rules that will be executed and will not")
261-
try:
262-
# create a dict with the rule name as key and with the report as value
263-
rules_exec = {}
264-
rules_notexec = []
265-
for c in cfg_rules:
266-
k = c['rule']['name']
267-
if k in proc_ord:
268-
rules_exec[k] = c
269-
else:
270-
rules_notexec.append(c)
271-
except yaml.YAMLError as exc:
272-
sys.exit("ERROR!! Getting the executed and not executed commands/rules: {}".format(exc))
273-
274-
logging.debug("reorder the config rules based on the list of outputs of workflow managament system")
275-
try:
276-
# create a list of dict with the processes reported by the output of snakemake (the list of processes for the execution)
277-
rules_exec_ord = [ rules_exec[o] for o in proc_ord ]
278-
except yaml.YAMLError as exc:
279-
sys.exit("ERROR!! Reordering the rules: {}".format(exc))
280-
281-
logging.debug("add the cached processes into the list of processes will be executed")
282-
try:
283-
rules_ord = rules_notexec + rules_exec_ord
284-
if rules_ord:
285-
total_cmds = len(list(set( [ c['name'] for c in rules_ord ] )))
286-
except yaml.YAMLError as exc:
287-
sys.exit("ERROR!! Reordering the rules: {}".format(exc))
288254

289255

290256
# ------
291257
print(f"MYSNAKE_LOG_EXECUTING\t{time.asctime()}\t{total_cmds}", flush=True)
292-
258+
293259
logging.debug(f"start the execution of {total_cmds} commands using {NCPUS} processes in parallel")
294260
pool = mp.Pool(processes=NCPUS)
295261
results = []
296-
for rule in rules_ord:
297-
results.append(pool.apply_async(executor, (rule, )))
262+
for cmd in cfg_rules:
263+
results.append(pool.apply_async(executor, (cmd, )))
298264

299265
logging.debug("close the pool")
300266
pool.close()
@@ -303,6 +269,7 @@ def main(args):
303269

304270
# ------
305271
print(f"MYSNAKE_STATS_EXECUTING\t{time.asctime()}", flush=True)
272+
306273
logging.debug("execute the statistic processes")
307274
try:
308275
# command line

0 commit comments

Comments
 (0)