Skip to content

Commit 962983a

Browse files
authored
Update to connexion 3 (#136)
1 parent 28a48a5 commit 962983a

10 files changed

+73
-62
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ test: $(PYSOURCES) FORCE
150150

151151
## testcov : run the wes-service test suite and collect coverage
152152
testcov: $(PYSOURCES)
153-
python -m pytest -rsx --cov ${PYTEST_EXTRA}
153+
python -m pytest ${PYTEST_EXTRA} -rsx --cov
154154

155155
sloccount.sc: $(PYSOURCES) Makefile
156156
sloccount --duplicates --wide --details $^ > $@

mypy-requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ types-PyYAML
33
types-requests
44
types-setuptools
55
arvados-cwl-runner
6+
flask

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"
66

77
[project]
88
name = "wes-service"
9-
version = "4.0"
9+
version = "5.0"
1010
authors = [{name = "GA4GH Containers and Workflows task team", email = "[email protected]"}]
1111
description = "GA4GH Workflow Execution Service reference implementation"
1212
classifiers = [
@@ -23,7 +23,7 @@ classifiers = [
2323
]
2424
requires-python = ">=3.9"
2525
dependencies = [
26-
"connexion[swagger-ui] >= 2.0.2, < 3",
26+
"connexion[swagger-ui,flask,uvicorn] >= 3, < 4",
2727
"ruamel.yaml >= 0.15.78",
2828
"schema-salad",
2929
]

test/test_integration.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,6 @@ def setUp(self) -> None:
246246
"--opt",
247247
"runner=cwltool",
248248
"--port=8080",
249-
"--debug",
250249
]
251250
)
252251
time.sleep(5)
@@ -304,7 +303,6 @@ def setUp(self) -> None:
304303
os.path.abspath("wes_service/wes_service_main.py"),
305304
"--backend=wes_service.arvados_wes",
306305
"--port=8080",
307-
"--debug",
308306
]
309307
)
310308
self.client.auth = {

wes_client/util.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ def fixpaths(d: Any) -> None:
128128

129129
def build_wes_request(
130130
workflow_file: str, json_path: str, attachments: Optional[list[str]] = None
131-
) -> list[tuple[str, Any]]:
131+
) -> tuple[list[tuple[str, Any]], list[tuple[str, Any]]]:
132132
"""
133133
:param workflow_file: Path to cwl/wdl file. Can be http/https/file.
134134
:param json_path: Path to accompanying json file.
@@ -157,10 +157,12 @@ def build_wes_request(
157157
("workflow_type_version", wf_version),
158158
]
159159

160+
workflow_attachments = []
161+
160162
if workflow_file.startswith("file://"):
161163
if wfbase is None:
162164
wfbase = os.path.dirname(workflow_file[7:])
163-
parts.append(
165+
workflow_attachments.append(
164166
(
165167
"workflow_attachment",
166168
(os.path.basename(workflow_file[7:]), open(workflow_file[7:], "rb")),
@@ -182,9 +184,9 @@ def build_wes_request(
182184
attach_f = urlopen(attachment) # nosec B310
183185
relpath = os.path.basename(attach_f)
184186

185-
parts.append(("workflow_attachment", (relpath, attach_f)))
187+
workflow_attachments.append(("workflow_attachment", (relpath, attach_f)))
186188

187-
return parts
189+
return parts, workflow_attachments
188190

189191

190192
def expand_globs(attachments: Optional[Union[list[str], str]]) -> set[str]:
@@ -275,11 +277,12 @@ def run(
275277
:return: The body of the post result as a dictionary.
276278
"""
277279
attachments = list(expand_globs(attachments))
278-
parts = build_wes_request(wf, jsonyaml, attachments)
280+
parts, files = build_wes_request(wf, jsonyaml, attachments)
279281
postresult = requests.post( # nosec B113
280282
f"{self.proto}://{self.host}/ga4gh/wes/v1/runs",
281-
files=parts,
282-
headers=self.auth,
283+
data=parts,
284+
files=files,
285+
# headers=self.auth,
283286
)
284287
return wes_response(postresult)
285288

wes_service/arvados_wes.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ def RunWorkflow(
298298
)
299299

300300
try:
301-
tempdir, body = self.collect_attachments(cr["uuid"])
301+
tempdir, body = self.collect_attachments(args, cr["uuid"])
302302

303303
workflow_engine_parameters = cast(
304304
dict[str, Any], body.get("workflow_engine_parameters", {})

wes_service/cwl_runner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ def ListRuns(
198198

199199
def RunWorkflow(self, **args: str) -> dict[str, str]:
200200
"""Submit the workflow run request."""
201-
tempdir, body = self.collect_attachments()
201+
tempdir, body = self.collect_attachments(args)
202202

203203
run_id = uuid.uuid4().hex
204204
job = Workflow(run_id)

wes_service/toil_wes.py

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
"""Toil backed for the WES service."""
2+
3+
import errno
14
import json
25
import logging
36
import os
@@ -294,18 +297,41 @@ def getstate(self) -> tuple[str, int]:
294297
logging.info("Workflow " + self.run_id + ": EXECUTOR_ERROR")
295298
open(self.staterrorfile, "a").close()
296299
return "EXECUTOR_ERROR", 255
300+
301+
# get the jobstore
302+
with open(self.jobstorefile, "r") as f:
303+
jobstore = f.read().rstrip()
297304
if (
298305
subprocess.run( # nosec B603
299306
[
300307
shutil.which("toil") or "toil",
301308
"status",
302309
"--failIfNotComplete",
303-
self.jobstorefile,
310+
jobstore,
304311
]
305312
).returncode
306313
== 0
307314
):
308-
completed = True
315+
# Get the PID of the running process
316+
with open(self.pidfile, "r") as f:
317+
pid = int(f.read())
318+
try:
319+
os.kill(pid, 0)
320+
except OSError as e:
321+
if e.errno == errno.ESRCH:
322+
# Process is no longer running, could be completed
323+
completed = True
324+
# Reap zombie child processes in a non-blocking manner
325+
# os.WNOHANG still raises an error if no child processes exist
326+
try:
327+
os.waitpid(pid, os.WNOHANG)
328+
except OSError as e:
329+
if e.errno != errno.ECHILD:
330+
raise
331+
else:
332+
raise
333+
# If no exception, process is still running
334+
# We can't rely on toil status as the process may not have created the jobstore yet
309335
if completed:
310336
logging.info("Workflow " + self.run_id + ": COMPLETE")
311337
open(self.statcompletefile, "a").close()
@@ -354,9 +380,9 @@ def ListRuns(
354380
workflows = [{"run_id": w.run_id, "state": w.getstate()[0]} for w in wf] # NOQA
355381
return {"workflows": workflows, "next_page_token": ""}
356382

357-
def RunWorkflow(self) -> dict[str, str]:
383+
def RunWorkflow(self, **args: str) -> dict[str, str]:
358384
"""Submit the workflow run request."""
359-
tempdir, body = self.collect_attachments()
385+
tempdir, body = self.collect_attachments(args)
360386

361387
run_id = uuid.uuid4().hex
362388
job = ToilWorkflow(run_id)

wes_service/util.py

Lines changed: 26 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import tempfile
55
from typing import Any, Callable, Optional
66

7-
import connexion # type: ignore[import-untyped]
87
from werkzeug.utils import secure_filename
98

109

@@ -49,52 +48,37 @@ def log_for_run(self, run_id: Optional[str], message: str) -> None:
4948
logging.info("Workflow %s: %s", run_id, message)
5049

5150
def collect_attachments(
52-
self, run_id: Optional[str] = None
51+
self, args: dict[str, Any], run_id: Optional[str] = None
5352
) -> tuple[str, dict[str, str]]:
5453
"""Stage all attachments to a temporary directory."""
5554
tempdir = tempfile.mkdtemp()
5655
body: dict[str, str] = {}
5756
has_attachments = False
58-
for k, ls in connexion.request.files.lists():
59-
try:
60-
for v in ls:
61-
if k == "workflow_attachment":
62-
sp = v.filename.split("/")
63-
fn = []
64-
for p in sp:
65-
if p not in ("", ".", ".."):
66-
fn.append(secure_filename(p))
67-
dest = os.path.join(tempdir, *fn)
68-
if not os.path.isdir(os.path.dirname(dest)):
69-
os.makedirs(os.path.dirname(dest))
70-
self.log_for_run(
71-
run_id,
72-
f"Staging attachment {v.filename!r} to {dest!r}",
73-
)
74-
v.save(dest)
75-
has_attachments = True
76-
body[k] = (
77-
"file://%s" % tempdir
78-
) # Reference to temp working dir.
79-
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
80-
content = v.read()
81-
body[k] = json.loads(content.decode("utf-8"))
82-
else:
83-
body[k] = v.read().decode()
84-
except Exception as e:
85-
raise ValueError(f"Error reading parameter {k!r}: {e}") from e
86-
for k, ls in connexion.request.form.lists():
87-
try:
88-
for v in ls:
89-
if not v:
90-
continue
91-
if k in ("workflow_params", "tags", "workflow_engine_parameters"):
92-
body[k] = json.loads(v)
93-
else:
94-
body[k] = v
95-
except Exception as e:
96-
raise ValueError(f"Error reading parameter {k!r}: {e}") from e
97-
57+
for k, v in args.items():
58+
if k == "workflow_attachment":
59+
for file in v or []:
60+
sp = file.filename.split("/")
61+
fn = []
62+
for p in sp:
63+
if p not in ("", ".", ".."):
64+
fn.append(secure_filename(p))
65+
dest = os.path.join(tempdir, *fn)
66+
if not os.path.isdir(os.path.dirname(dest)):
67+
os.makedirs(os.path.dirname(dest))
68+
self.log_for_run(
69+
run_id,
70+
f"Staging attachment {file.filename!r} to {dest!r}",
71+
)
72+
file.save(dest)
73+
has_attachments = True
74+
body["workflow_attachment"] = (
75+
"file://%s" % tempdir
76+
) # Reference to temp working dir.
77+
elif k in ("workflow_params", "tags", "workflow_engine_parameters"):
78+
if v is not None:
79+
body[k] = json.loads(v)
80+
else:
81+
body[k] = v
9882
if "workflow_url" in body:
9983
if ":" not in body["workflow_url"]:
10084
if not has_attachments:

wes_service/wes_service_main.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ def get_parser() -> argparse.ArgumentParser:
6363
help="Example: '--opt runner=cwltoil --opt extra=--logLevel=CRITICAL' "
6464
"or '--opt extra=--workDir=/'. Accepts multiple values.",
6565
)
66-
parser.add_argument("--debug", action="store_true", default=False)
6766
parser.add_argument("--version", action="store_true", default=False)
6867
return parser
6968

@@ -78,7 +77,7 @@ def main(argv: list[str] = sys.argv[1:]) -> None:
7877

7978
app = setup(args)
8079

81-
app.run(port=args.port, debug=args.debug)
80+
app.run(port=args.port)
8281

8382

8483
if __name__ == "__main__":

0 commit comments

Comments
 (0)