Skip to content

Commit 0acbe94

Browse files
authored
Merge pull request #19 from common-workflow-language/arv-support
Arv support
2 parents 1382004 + 6515d77 commit 0acbe94

File tree

3 files changed

+124
-22
lines changed

3 files changed

+124
-22
lines changed

Dockerfile

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,14 @@ RUN mkdir -p /etc/apt/sources.list.d && \
2323
apt-get -yq --no-install-recommends install docker-engine=17.05.0~ce-0~debian-stretch && \
2424
apt-get clean
2525

26-
ARG version
2726
ARG arvversion
28-
29-
COPY dist/wes-service-${version}.tar.gz /root
3027
COPY dist/arvados-cwl-runner-${arvversion}.tar.gz /root
31-
3228
RUN cd /root && tar xzf arvados-cwl-runner-${arvversion}.tar.gz && \
3329
cd arvados-cwl-runner-${arvversion} && \
3430
pip install .
3531

32+
ARG version
33+
COPY dist/wes-service-${version}.tar.gz /root
3634
RUN cd /root && tar xzf wes-service-${version}.tar.gz && \
3735
cd wes-service-${version} && \
3836
pip install .[arvados]

wes_client/__init__.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,28 +99,40 @@ def fixpaths(d):
9999
else:
100100
d["location"] = d["path"]
101101
del d["path"]
102+
loc = d.get("location", "")
102103
if d.get("class") == "Directory":
103-
loc = d.get("location", "")
104104
if loc.startswith("http:") or loc.startswith("https:"):
105105
logging.error("Directory inputs not supported with http references")
106106
exit(33)
107+
if not (loc.startswith("http:") or loc.startswith("https:")
108+
or args.job_order.startswith("http:") or args.job_order.startswith("https:")):
109+
logging.error("Upload local files not supported, must use http: or https: references.")
110+
exit(33)
107111

108112
visit(input, fixpaths)
109113

110114
workflow_url = args.workflow_url
111115
if not workflow_url.startswith("/") and ":" not in workflow_url:
112-
workflow_url = os.path.abspath(workflow_url)
116+
workflow_url = "file://" + os.path.abspath(workflow_url)
113117

114118
if args.quiet:
115119
logging.basicConfig(level=logging.WARNING)
116120
else:
117121
logging.basicConfig(level=logging.INFO)
118122

119-
r = client.WorkflowExecutionService.RunWorkflow(body={
120-
"workflow_url": workflow_url,
123+
body = {
121124
"workflow_params": input,
122125
"workflow_type": "CWL",
123-
"workflow_type_version": "v1.0"}).result()
126+
"workflow_type_version": "v1.0"
127+
}
128+
129+
if workflow_url.startswith("file://"):
130+
with open(workflow_url[7:], "r") as f:
131+
body["workflow_descriptor"] = f.read()
132+
else:
133+
body["workflow_url"] = workflow_url
134+
135+
r = client.WorkflowExecutionService.RunWorkflow(body=body).result()
124136

125137
if args.wait:
126138
logging.info("Workflow id is %s", r["workflow_id"])
@@ -139,7 +151,7 @@ def fixpaths(d):
139151

140152
s = client.WorkflowExecutionService.GetWorkflowLog(
141153
workflow_id=r["workflow_id"]).result()
142-
logging.info(s["workflow_log"]["stderr"])
154+
logging.info("Workflow log:\n"+s["workflow_log"]["stderr"])
143155

144156
if "fields" in s["outputs"] and s["outputs"]["fields"] is None:
145157
del s["outputs"]["fields"]

wes_service/arvados_wes.py

Lines changed: 104 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,24 @@
11
import arvados
22
import arvados.util
33
import arvados.collection
4+
import arvados.errors
45
import os
56
import connexion
67
import json
78
import subprocess
89
import tempfile
10+
import functools
11+
import threading
12+
import logging
13+
914
from wes_service.util import visit, WESBackend
1015

16+
class MissingAuthorization(Exception):
17+
pass
1118

1219
def get_api():
20+
if not connexion.request.headers.get('Authorization'):
21+
raise MissingAuthorization()
1322
return arvados.api_from_config(version="v1", apiconfig={
1423
"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
1524
"ARVADOS_API_TOKEN": connexion.request.headers['Authorization'],
@@ -26,6 +35,23 @@ def get_api():
2635
}
2736

2837

38+
def catch_exceptions(orig_func):
39+
"""Catch uncaught exceptions and turn them into http errors"""
40+
41+
@functools.wraps(orig_func)
42+
def catch_exceptions_wrapper(self, *args, **kwargs):
43+
try:
44+
return orig_func(self, *args, **kwargs)
45+
except arvados.errors.ApiError as e:
46+
logging.exception("Failure")
47+
return {"msg": e._get_reason(), "status_code": e.resp.status}, int(e.resp.status)
48+
except subprocess.CalledProcessError as e:
49+
return {"msg": str(e), "status_code": 500}, 500
50+
except MissingAuthorization:
51+
return {"msg": "'Authorization' header is missing or empty, expecting Arvados API token", "status_code": 401}, 401
52+
53+
return catch_exceptions_wrapper
54+
2955
class ArvadosBackend(WESBackend):
3056
def GetServiceInfo(self):
3157
return {
@@ -39,6 +65,7 @@ def GetServiceInfo(self):
3965
"key_values": {}
4066
}
4167

68+
@catch_exceptions
4269
def ListWorkflows(self):
4370
api = get_api()
4471

@@ -60,28 +87,87 @@ def ListWorkflows(self):
6087
"next_page_token": ""
6188
}
6289

90+
def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params, env, workflow_descriptor_file):
91+
api = arvados.api_from_config(version="v1", apiconfig={
92+
"ARVADOS_API_HOST": env["ARVADOS_API_HOST"],
93+
"ARVADOS_API_TOKEN": env['ARVADOS_API_TOKEN'],
94+
"ARVADOS_API_HOST_INSECURE": env["ARVADOS_API_HOST_INSECURE"] # NOQA
95+
})
96+
97+
try:
98+
with tempfile.NamedTemporaryFile() as inputtemp:
99+
json.dump(workflow_params, inputtemp)
100+
inputtemp.flush()
101+
# TODO: run submission process in a container to prevent
102+
# a-c-r submission processes from seeing each other.
103+
proc = subprocess.Popen(["arvados-cwl-runner", "--submit-request-uuid="+cr_uuid, # NOQA
104+
"--submit", "--no-wait", "--api=containers", # NOQA
105+
workflow_url, inputtemp.name], env=env,
106+
stdout=subprocess.PIPE, stderr=subprocess.PIPE) # NOQA
107+
(stdoutdata, stderrdata) = proc.communicate()
108+
if proc.returncode != 0:
109+
api.container_requests().update(uuid=cr_uuid, body={"priority": 0,
110+
"properties": {"arvados-cwl-runner-log": stderrdata}}).execute()
111+
else:
112+
api.container_requests().update(uuid=cr_uuid, body={"properties": {"arvados-cwl-runner-log": stderrdata}}).execute()
113+
except subprocess.CalledProcessError as e:
114+
api.container_requests().update(uuid=cr_uuid, body={"priority": 0,
115+
"properties": {"arvados-cwl-runner-log": str(e)}}).execute()
116+
finally:
117+
if workflow_descriptor_file is not None:
118+
workflow_descriptor_file.close()
119+
120+
@catch_exceptions
63121
def RunWorkflow(self, body):
64122
if body["workflow_type"] != "CWL" or body["workflow_type_version"] != "v1.0": # NOQA
65123
return
66124

125+
if not connexion.request.headers.get('Authorization'):
126+
raise MissingAuthorization()
127+
67128
env = {
68129
"PATH": os.environ["PATH"],
69130
"ARVADOS_API_HOST": os.environ["ARVADOS_API_HOST"],
70131
"ARVADOS_API_TOKEN": connexion.request.headers['Authorization'],
71132
"ARVADOS_API_HOST_INSECURE": os.environ.get("ARVADOS_API_HOST_INSECURE", "false") # NOQA
72133
}
73-
with tempfile.NamedTemporaryFile() as inputtemp:
74-
json.dump(body["workflow_params"], inputtemp)
75-
inputtemp.flush()
76-
workflow_id = subprocess.check_output(["arvados-cwl-runner", "--submit", "--no-wait", "--api=containers", # NOQA
77-
body.get("workflow_url"), inputtemp.name], env=env).strip() # NOQA
78-
return {"workflow_id": workflow_id}
79134

135+
api = get_api()
136+
137+
cr = api.container_requests().create(body={"container_request":
138+
{"command": [""],
139+
"container_image": "n/a",
140+
"state": "Uncommitted",
141+
"output_path": "n/a",
142+
"priority": 500}}).execute()
143+
144+
workflow_url = body.get("workflow_url")
145+
workflow_descriptor_file = None
146+
if body.get("workflow_descriptor"):
147+
workflow_descriptor_file = tempfile.NamedTemporaryFile()
148+
workflow_descriptor_file.write(body.get('workflow_descriptor'))
149+
workflow_descriptor_file.flush()
150+
workflow_url = workflow_descriptor_file.name
151+
152+
threading.Thread(target=self.invoke_cwl_runner, args=(cr["uuid"],
153+
workflow_url,
154+
body["workflow_params"],
155+
env,
156+
workflow_descriptor_file)).start()
157+
158+
return {"workflow_id": cr["uuid"]}
159+
160+
@catch_exceptions
80161
def GetWorkflowLog(self, workflow_id):
81162
api = get_api()
82163

83164
request = api.container_requests().get(uuid=workflow_id).execute()
84-
container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA
165+
if request["container_uuid"]:
166+
container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA
167+
else:
168+
container = {"state": "Queued", "exit_code": None}
169+
170+
stderr = request["properties"].get("arvados-cwl-runner-log", "")
85171

86172
outputobj = {}
87173
if request["output_uuid"]:
@@ -91,16 +177,15 @@ def GetWorkflowLog(self, workflow_id):
91177

92178
def keepref(d):
93179
if isinstance(d, dict) and "location" in d:
94-
d["location"] = "keep:%s/%s" % (c.portable_data_hash(), d["location"]) # NOQA
180+
d["location"] = "%sc=%s/_/%s" % (api._resourceDesc["keepWebServiceUrl"], c.portable_data_hash(), d["location"]) # NOQA
95181

96182
visit(outputobj, keepref)
97183

98-
stderr = ""
99184
if request["log_uuid"]:
100185
c = arvados.collection.CollectionReader(request["log_uuid"], api_client=api)
101186
if "stderr.txt" in c:
102187
with c.open("stderr.txt") as f:
103-
stderr = f.read()
188+
stderr += f.read()
104189

105190
r = {
106191
"workflow_id": request["uuid"],
@@ -120,15 +205,22 @@ def keepref(d):
120205
r["workflow_log"]["exit_code"] = container["exit_code"]
121206
return r
122207

208+
@catch_exceptions
123209
def CancelJob(self, workflow_id): # NOQA
124210
api = get_api()
125-
request = api.container_requests().update(body={"priority": 0}).execute() # NOQA
211+
request = api.container_requests().update(uuid=workflow_id, body={"priority": 0}).execute() # NOQA
126212
return {"workflow_id": request["uuid"]}
127213

214+
@catch_exceptions
128215
def GetWorkflowStatus(self, workflow_id):
129216
api = get_api()
130217
request = api.container_requests().get(uuid=workflow_id).execute()
131-
container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA
218+
if request["container_uuid"]:
219+
container = api.containers().get(uuid=request["container_uuid"]).execute() # NOQA
220+
elif request["priority"] == 0:
221+
container = {"state": "Cancelled"}
222+
else:
223+
container = {"state": "Queued"}
132224
return {"workflow_id": request["uuid"],
133225
"state": statemap[container["state"]]}
134226

0 commit comments

Comments
 (0)