Skip to content

Commit fd41e21

Browse files
committed
add main script and config file
1 parent c82aeac commit fd41e21

File tree

2 files changed

+366
-0
lines changed

2 files changed

+366
-0
lines changed

pjm.cfg

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
[Default]
2+
nodes=1
3+
ppn=1
4+
queue=cu
5+
stringency=high
6+
[PBS]
7+
QueueList=cu,fat,batch,assemble

pjm.py

Lines changed: 359 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,359 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
4+
# Author: zengxiaofei
5+
6+
# Created Time: 2017-06-03 17:34
7+
8+
"""A Simple Job Manager for PBS
9+
10+
Author: Xiaofei Zeng
11+
12+
Version: V0.9
13+
"""
14+
15+
from __future__ import print_function
16+
import sys
17+
import os
18+
import re
19+
import ConfigParser
20+
import argparse
21+
import time
22+
import collections
23+
import socket
24+
25+
26+
class Job(object):
    """A single PBS job: its resources, dependencies, state and shell commands.

    Relies on module-level globals set in the ``__main__`` block:
    default_nodes / default_ppn / default_queue / default_stringency
    (from pjm.cfg), workdir and log_file.
    """

    def __init__(self, job_name):
        self.name = job_name
        # default settings (overridden later by job-file arguments)
        self.depend = set()             # names of jobs that must finish first
        self.nodes = default_nodes
        self.ppn = default_ppn
        self.queue = default_queue
        self.cmd = ''                   # shell commands accumulated by job_parser
        self.dir = workdir + job_name   # per-job working directory
        self.status = 'waiting'         # waiting | running | done | error
        self.jobid = ''                 # id returned by qsub, e.g. '123.server'
        self.stringency = default_stringency
        self.realtime = 0               # walltime used so far, in seconds
        self.cputime = 0                # CPU time used so far, in seconds

    def generate_pbs(self, cfg_header):
        """Create the job directory and write ``<name>.pbs`` inside it.

        cfg_header is written verbatim between the bookkeeping echoes and
        the job's own commands.
        """
        # BUGFIX: was os.system('mkdir ...'), which printed an error (but
        # carried on) when the directory already existed; create it properly
        # and tolerate a pre-existing directory.
        if not os.path.isdir(self.dir):
            os.makedirs(self.dir)
        with open('{0}/{1}.pbs'.format(self.dir, self.name), 'w') as fpbs:
            fpbs.write('#PBS -N {0}\n'.format(self.name))
            fpbs.write('#PBS -l nodes={0}:ppn={1}\n'.format(self.nodes, self.ppn))
            fpbs.write('#PBS -q {0}\n'.format(self.queue))
            fpbs.write('#PBS -S /bin/bash\n')
            if self.stringency == 'high':
                # abort the script on the first failing command, so the final
                # 'Job completed' echo only appears on full success
                fpbs.write('set -o errexit\n')
            fpbs.write('cd {0}\n'.format(self.dir))
            fpbs.write('echo Job started at `date` @`hostname`\n')
            fpbs.write(cfg_header)
            fpbs.write(self.cmd)
            fpbs.write('echo Job completed at `date`\n')

    def qsub(self):
        """Submit the .pbs script and append a line to the event log."""
        jobid = os.popen('cd {0} && qsub {1}.pbs'.format(
            self.dir, self.name)).read().rstrip()
        self.jobid = jobid
        self.status = 'running'
        with open(log_file, 'a') as fqsub:
            fqsub.write('{0}: job {1} submitted ({2})\n'.format(
                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())),
                self.name, self.jobid))

    def status_monitor(self):
        """Refresh self.status from qstat and the job's stdout file."""
        # running?
        if self.jobid:
            # BUGFIX: the awk program was double-quoted, so the shell expanded
            # $1 to an empty string and awk printed whole lines; single-quote
            # it and anchor the grep on the numeric id (qstat may truncate the
            # full 'id.server' form in its first column).
            is_running = os.popen(
                "qstat | awk 'NR>2 {print $1}' | grep '^%s\\.'" %
                self.jobid.split('.')[0]).read().rstrip()
        else:
            is_running = False

        if is_running:
            self.status = 'running'
        # done with no errors: the PBS script echoes this line last, so its
        # presence at the end of the stdout file means errexit never fired
        elif self.status == 'running':
            done_no_errors = os.popen(
                'tail -1 {0}/{1}.o{2} 2>/dev/null | grep -P "^Job completed at "'.format(
                    self.dir, self.name, self.jobid.split('.')[0])).read()
            if done_no_errors:
                self.status = 'done'
            # aborted with errors
            else:
                self.status = 'error'
90+
91+
92+
def analyse_dependency():
    """Placeholder for static analysis of the job dependency graph.

    Currently a no-op; job_manager() resolves dependencies dynamically at
    run time instead.
    """
    #TODO analyse dependency for PBS, Python
    pass
95+
96+
97+
def get_arg(arg_name, args_string):
    """Extract the value of ``arg_name`` from a job-argument string.

    args_string looks like 'depend=a,b;nodes=2;ppn=4'. Returns the value of
    the LAST occurrence of the argument (the greedy prefix keeps the
    original last-match behaviour), or None when the argument is absent.
    Values may contain word characters, commas and dots.
    """
    # BUGFIX: arg_name used to be interpolated unescaped and without a word
    # boundary, so e.g. 'ppn' also matched inside 'xppn'; \b forbids that
    # and re.escape guards against regex metacharacters in arg_name.
    match_arg = re.match(
        r'[\w\W]*\b' + re.escape(arg_name) + r' *= *([\w,.]+)',
        args_string)
    if match_arg:
        return match_arg.group(1)
    return None
103+
104+
105+
def job_parser(job_file):
    """Parse a job file into ``(job_list, cfg_header)``.

    job_list holds one Job per ``name(arg=val;...){ ... }END`` section;
    cfg_header is the text delimited by paired '##' marker lines (it is
    later written verbatim into every generated .pbs script).
    """
    job_list = []
    config_start = False   # True while inside a '##'-delimited header block
    cmd = ''               # NOTE(review): assigned but never used — confirm
    is_end = True          # True while outside any job body
    cfg_header = ''
    with open(job_file, 'r') as f:
        for line in f:
            # config parser: a '##' line toggles header mode; the closing
            # delimiter is kept via the nested if, the opening delimiter and
            # the non-blank interior lines via the following if
            if line.lstrip().startswith('##'):
                config_start = not config_start
                if not config_start:
                    cfg_header += line
            if config_start and line.strip():
                cfg_header += line
            # job parser: a job starts with 'name(arg=val;...){'
            match_line = re.match(r' *([\w.]+) *\(([\w\W]*)\){', line)
            # job start
            if match_line:
                is_end = False
                job_name = match_line.group(1)
                job = Job(job_name)
                job_list.append(job)
                args_string = match_line.group(2)
                depend = get_arg('depend', args_string)
                nodes = get_arg('nodes', args_string)
                ppn = get_arg('ppn', args_string)
                queue = get_arg('queue', args_string)
                dir = get_arg('dir', args_string)   # NOTE: shadows builtin dir()
                status = get_arg('status', args_string)
                stringency = get_arg('stringency', args_string)
                # TODO: need a robust func to validate args
                if depend:
                    job.depend = set(depend.split(','))
                if nodes:
                    job.nodes = nodes
                if ppn:
                    job.ppn = ppn
                if queue:
                    job.queue = queue
                if dir:
                    job.dir = workdir + dir
                if status:
                    # a job recorded as error/running in a resumed .status
                    # file is rerun from scratch: wipe its directory
                    if status in ('error', 'running'):
                        os.system('rm -r {0}'.format(job.dir))
                        job.status = 'waiting'
                    else:
                        job.status = status
                if stringency:
                    job.stringency = stringency
            # job end
            elif re.match(r' *} *END', line) and not config_start:
                is_end = True
            # job commands: every other line inside a job body is appended
            # verbatim to the current job's command string
            elif not (is_end or config_start):
                job.cmd += line
    return (job_list, cfg_header)
162+
163+
164+
def wait_finish(job_list, cfg_header, sleeptime):
    """if one job error, wait for other jobs to finish"""
    while True:
        # refresh every job's state before checking for stragglers
        for pending in job_list:
            pending.status_monitor()
        if not any(pending.status == 'running' for pending in job_list):
            break
        log(job_list, cfg_header)
        time.sleep(sleeptime)
176+
177+
178+
179+
def log(job_list, cfg_header):
    """Generate the .status, .log and .rsc files from current job states."""

    def calculate_time(time_str):
        # 'HH:MM:SS' -> total seconds
        ts = time_str.split(':')
        return int(ts[0])*60*60 + int(ts[1])*60 + int(ts[2])

    def status_log(job_list, cfg_header):
        # rewrite the .status file: the config header first, then one
        # section per job in the same syntax job_parser() reads back,
        # so an interrupted run can be resumed from this file
        with open(status_file, 'w') as fstatus:
            fstatus.write(cfg_header)
            for job in job_list:
                depend_arg = 'depend={0};'.format(','.join(job.depend)) if job.depend else ''
                fstatus.write('%s(%s%s){\n%s}END\n\n' % (
                    job.name,
                    depend_arg,
                    ';'.join(('nodes='+job.nodes,
                              'ppn='+job.ppn,
                              'queue='+job.queue,
                              'dir='+os.path.basename(job.dir),
                              'stringency='+job.stringency,
                              'status='+job.status)),
                    job.cmd))

    def event_log():
        # build a one-line summary like 'a,b running; c done'
        logstr = ''
        for status in ('running', 'waiting', 'done', 'error'):
            if not status_dict[status]:
                pass
            elif logstr:
                logstr += '; {0} {1}'.format(','.join(status_dict[status]), status)
            else:
                logstr += '{0} {1}'.format(','.join(status_dict[status]), status)

        # read the last line of the event log so the summary is only
        # appended when it differs from the previous one
        last_line = ''
        if os.path.exists(log_file):
            with open(log_file) as f:
                for line in f:
                    last_line = line
        else:
            # record some info at first
            with open(log_file, 'w') as fevent:
                fevent.write('PJM running @{0}\n'.format(socket.gethostname()))

        with open(log_file, 'a') as fevent:
            # last_line looks like 'YYYY-MM-DD HH:MM:SS: <summary>';
            # split(' ', 2)[2] drops the timestamp before comparing
            if last_line and logstr != last_line.split(' ', 2)[2].strip():
                fevent.write('{0}: {1}\n'.format(
                    time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), logstr))
        status_log(job_list, cfg_header)

    def resource_log():
        # TODO statistics of realtime and cputime, not compatible with SGE
        with open(resource_file, 'a') as frsc:
            frsc.write('################# {0} #################\n'.format(
                time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))))
            frsc.write('Job_Name\tCPU_Time(s)\tReal_Time(s)\tAvg_CPU\n')
            for job in job_list:
                if job.status == 'running':
                    with os.popen('qstat -f {0}'.format(job.jobid)) as f:
                        for line in f:
                            match_cpu = re.match(r'.*resources_used.cput = ([\d:]+)\n', line)
                            match_real = re.match(r'.*resources_used.walltime = ([\d:]+)\n', line)
                            if match_cpu:
                                cputime = match_cpu.group(1)
                                job.cputime = calculate_time(cputime)
                            if match_real:
                                realtime = match_real.group(1)
                                job.realtime = calculate_time(realtime)
                # BUGFIX: the string 'NA' was passed through a '{3:.3f}'
                # placeholder, which raises ValueError on every job whose
                # walltime is still zero; pre-format the ratio instead.
                if job.realtime == 0:
                    avg_cpu = 'NA'
                else:
                    avg_cpu = '{0:.3f}'.format(float(job.cputime) / job.realtime)
                frsc.write('{0}\t{1}\t{2}\t{3}\n'.format(
                    job.name, job.cputime, job.realtime, avg_cpu))

    # bucket job names by current status for the event-log summary
    status_dict = collections.defaultdict(list)
    for job in job_list:
        job.status_monitor()
        status_dict[job.status].append(job.name)

    event_log()
    resource_log()
256+
257+
258+
def job_manager(job_list, cfg_header, sleeptime):
    """Main scheduling loop.

    Submits every waiting job whose dependency set is empty, removes a job
    from the others' dependency sets once it finishes, and returns when all
    jobs are done. On the first job error, stops submitting and waits for
    the already-running jobs before writing a final log.
    """
    done_set = set()
    try:
        while True:
            log(job_list, cfg_header)
            for job in job_list:
                job.status_monitor()
                # if no pre-jobs: ready to submit
                if job.status == 'waiting' and not job.depend:
                    job.generate_pbs(cfg_header)
                    job.qsub()
                    job.status_monitor()
                # if job is done: release it from the others' dependencies
                elif job.status == 'done':
                    for otherjob in job_list:
                        if job.name in otherjob.depend:
                            otherjob.depend.remove(job.name)
                    done_set.add(job)
                elif job.status == 'error':
                    raise RuntimeError('job {0} failed'.format(job.name))
            if done_set == set(job_list):
                log(job_list, cfg_header)
                break
            time.sleep(sleeptime)
    # if a job is aborted with errors (the RuntimeError raised above).
    # BUGFIX: was a bare `except:`, which also swallowed SystemExit and
    # KeyboardInterrupt; Exception is the narrowest catch that still
    # covers both the error signal and unexpected runtime failures.
    except Exception:
        # wait for the rest of the jobs to finish, then write a final log
        wait_finish(job_list, cfg_header, sleeptime)
        log(job_list, cfg_header)
287+
288+
289+
def daemonize(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
    """Detach the current process as a daemon (classic double fork).

    stdin/stdout/stderr are file paths that replace the standard streams.
    BUGFIX: the stderr default was 'dev/null' (relative path, missing the
    leading slash).
    """
    # Perform first fork: the original parent returns to the shell
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)  # first parent out
    # `except OSError, e` was Python-2-only syntax; `as` works on 2.6+ and 3
    except OSError as e:
        sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror))
        sys.exit(1)

    os.chdir("/")
    os.umask(0o22)  # was 022, a Python-2-only octal literal
    os.setsid()

    # Second fork: ensures the daemon can never re-acquire a controlling tty
    try:
        pid = os.fork()
        if pid > 0:
            sys.exit(0)  # second parent out
    except OSError as e:
        # BUGFIX: the message ended in ']n' instead of a newline
        sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror))
        sys.exit(1)

    # Redirect the standard streams onto the requested files
    for f in sys.stdout, sys.stderr:
        f.flush()
    si = open(stdin, 'r')
    so = open(stdout, 'a+')
    se = open(stderr, 'a+', 0)  # unbuffered (Python 2; py3 forbids 0 in text mode)
    os.dup2(si.fileno(), sys.stdin.fileno())
    os.dup2(so.fileno(), sys.stdout.fileno())
    os.dup2(se.fileno(), sys.stderr.fileno())
318+
319+
320+
def main():
    """Parse the job file, then run the scheduling loop to completion."""
    parsed_jobs, header = job_parser(job_file)
    job_manager(parsed_jobs, header, sleeptime)
324+
325+
326+
if __name__ == '__main__':
    ## configparse: resource defaults live in pjm.cfg next to this script
    config = ConfigParser.ConfigParser()
    config.read(os.path.split(os.path.realpath(__file__))[0] + '/pjm.cfg')
    default_nodes = config.get('Default', 'nodes')
    default_ppn = config.get('Default', 'ppn')
    default_queue = config.get('Default', 'queue')
    default_stringency = config.get('Default', 'stringency')
    # NOTE(review): queue_list is parsed but never used; presumably meant to
    # validate per-job `queue=` arguments — confirm before removing
    queue_list = config.get('PBS', 'QueueList').split(',')

    ## argparse
    # BUGFIX: the description triple-quoted string was never closed before
    # parser.add_argument, a syntax error; terminate it here.
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="""** A Simple Job Manager for PBS (PJM) Version 0.9 **
. Author: Xiaofei Zeng
""")
    parser.add_argument('job_file', help='input job file')
    parser.add_argument('--sleep', default=60, type=int,
                        help='monitor interval time (seconds) [default: %(default)i]')

    analyse_dependency()

    args = parser.parse_args()
    sleeptime = args.sleep
    job_file = os.path.abspath(args.job_file)
    job_filename = os.path.basename(args.job_file)
    # all bookkeeping files are created next to where pjm was launched
    workdir = os.getcwd() + '/'
    log_file = workdir + job_filename + '.log'
    status_file = workdir + job_filename + '.status'
    resource_file = workdir + job_filename + '.rsc'
    daemon_out = workdir + job_filename + '.out'

    # detach, sending the daemon's stdout/stderr to the .out file
    daemonize('/dev/null', daemon_out, daemon_out)
    main()

0 commit comments

Comments
 (0)