Skip to content

Commit 6515d77

Browse files
author
Peter Amstutz
committed
Support workflow_descriptor in arvados_wes and wes-client.
1 parent 58179f6 commit 6515d77

File tree

2 files changed

+39
-10
lines changed

2 files changed

+39
-10
lines changed

wes_client/__init__.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,28 +99,40 @@ def fixpaths(d):
9999
else:
100100
d["location"] = d["path"]
101101
del d["path"]
102+
loc = d.get("location", "")
102103
if d.get("class") == "Directory":
103-
loc = d.get("location", "")
104104
if loc.startswith("http:") or loc.startswith("https:"):
105105
logging.error("Directory inputs not supported with http references")
106106
exit(33)
107+
if not (loc.startswith("http:") or loc.startswith("https:")
108+
or args.job_order.startswith("http:") or args.job_order.startswith("https:")):
109+
logging.error("Upload local files not supported, must use http: or https: references.")
110+
exit(33)
107111

108112
visit(input, fixpaths)
109113

110114
workflow_url = args.workflow_url
111115
if not workflow_url.startswith("/") and ":" not in workflow_url:
112-
workflow_url = os.path.abspath(workflow_url)
116+
workflow_url = "file://" + os.path.abspath(workflow_url)
113117

114118
if args.quiet:
115119
logging.basicConfig(level=logging.WARNING)
116120
else:
117121
logging.basicConfig(level=logging.INFO)
118122

119-
r = client.WorkflowExecutionService.RunWorkflow(body={
120-
"workflow_url": workflow_url,
123+
body = {
121124
"workflow_params": input,
122125
"workflow_type": "CWL",
123-
"workflow_type_version": "v1.0"}).result()
126+
"workflow_type_version": "v1.0"
127+
}
128+
129+
if workflow_url.startswith("file://"):
130+
with open(workflow_url[7:], "r") as f:
131+
body["workflow_descriptor"] = f.read()
132+
else:
133+
body["workflow_url"] = workflow_url
134+
135+
r = client.WorkflowExecutionService.RunWorkflow(body=body).result()
124136

125137
if args.wait:
126138
logging.info("Workflow id is %s", r["workflow_id"])

wes_service/arvados_wes.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ def ListWorkflows(self):
8787
"next_page_token": ""
8888
}
8989

90-
def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params, env):
90+
def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params, env, workflow_descriptor_file):
9191
api = arvados.api_from_config(version="v1", apiconfig={
9292
"ARVADOS_API_HOST": env["ARVADOS_API_HOST"],
9393
"ARVADOS_API_TOKEN": env['ARVADOS_API_TOKEN'],
@@ -98,10 +98,12 @@ def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params, env):
9898
with tempfile.NamedTemporaryFile() as inputtemp:
9999
json.dump(workflow_params, inputtemp)
100100
inputtemp.flush()
101+
# TODO: run submission process in a container to prevent
102+
# a-c-r submission processes from seeing each other.
101103
proc = subprocess.Popen(["arvados-cwl-runner", "--submit-request-uuid="+cr_uuid, # NOQA
102-
"--submit", "--no-wait", "--api=containers", # NOQA
103-
workflow_url, inputtemp.name], env=env,
104-
stdout=subprocess.PIPE, stderr=subprocess.PIPE) # NOQA
104+
"--submit", "--no-wait", "--api=containers", # NOQA
105+
workflow_url, inputtemp.name], env=env,
106+
stdout=subprocess.PIPE, stderr=subprocess.PIPE) # NOQA
105107
(stdoutdata, stderrdata) = proc.communicate()
106108
if proc.returncode != 0:
107109
api.container_requests().update(uuid=cr_uuid, body={"priority": 0,
@@ -111,6 +113,9 @@ def invoke_cwl_runner(self, cr_uuid, workflow_url, workflow_params, env):
111113
except subprocess.CalledProcessError as e:
112114
api.container_requests().update(uuid=cr_uuid, body={"priority": 0,
113115
"properties": {"arvados-cwl-runner-log": str(e)}}).execute()
116+
finally:
117+
if workflow_descriptor_file is not None:
118+
workflow_descriptor_file.close()
114119

115120
@catch_exceptions
116121
def RunWorkflow(self, body):
@@ -136,7 +141,19 @@ def RunWorkflow(self, body):
136141
"output_path": "n/a",
137142
"priority": 500}}).execute()
138143

139-
threading.Thread(target=self.invoke_cwl_runner, args=(cr["uuid"], body.get("workflow_url"), body["workflow_params"], env)).start()
144+
workflow_url = body.get("workflow_url")
145+
workflow_descriptor_file = None
146+
if body.get("workflow_descriptor"):
147+
workflow_descriptor_file = tempfile.NamedTemporaryFile()
148+
workflow_descriptor_file.write(body.get('workflow_descriptor'))
149+
workflow_descriptor_file.flush()
150+
workflow_url = workflow_descriptor_file.name
151+
152+
threading.Thread(target=self.invoke_cwl_runner, args=(cr["uuid"],
153+
workflow_url,
154+
body["workflow_params"],
155+
env,
156+
workflow_descriptor_file)).start()
140157

141158
return {"workflow_id": cr["uuid"]}
142159

0 commit comments

Comments
 (0)