From b25385c90dd1d85e2dba903db418eb4a03fdeb54 Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Thu, 19 Sep 2024 15:04:10 -0400 Subject: [PATCH] [wip] job results representation negotiation --- docs/source/processes.rst | 4 + tests/functional/test_wps_package.py | 36 ++++-- tests/functional/utils.py | 12 +- tests/test_execute.py | 92 ++++++++++++++++ tests/test_utils.py | 89 --------------- weaver/execute.py | 128 +++++++++++++++++++++- weaver/processes/execution.py | 3 +- weaver/typedefs.py | 10 ++ weaver/utils.py | 107 ------------------ weaver/wps_restapi/jobs/utils.py | 76 ++++++++++--- weaver/wps_restapi/quotation/quotes.py | 4 +- weaver/wps_restapi/swagger_definitions.py | 21 +++- 12 files changed, 351 insertions(+), 231 deletions(-) create mode 100644 tests/test_execute.py diff --git a/docs/source/processes.rst b/docs/source/processes.rst index 7e668bf02..9602330c8 100644 --- a/docs/source/processes.rst +++ b/docs/source/processes.rst @@ -2007,6 +2007,10 @@ format is employed according to the chosen location. .. seealso:: For the :term:`WPS` endpoint, refer to :ref:`conf_settings`. +.. _proc_op_job_status: +.. fixme: add example +.. fixme: describe minimum fields and extra fields + .. 
_proc_op_result: Obtaining Job Outputs, Results, Logs or Errors diff --git a/tests/functional/test_wps_package.py b/tests/functional/test_wps_package.py index 7b2fdb9e5..c5f4b52e3 100644 --- a/tests/functional/test_wps_package.py +++ b/tests/functional/test_wps_package.py @@ -14,7 +14,6 @@ import json import logging import os -import re import shutil import tempfile from inspect import cleandoc @@ -124,10 +123,6 @@ def setUpClass(cls) -> None: def setUp(self) -> None: self.process_store.clear_processes() - @classmethod - def request(cls, method, url, *args, **kwargs): - raise NotImplementedError # not used - def test_deploy_cwl_label_as_process_title(self): title = "This process title comes from the CWL label" cwl = { @@ -3521,6 +3516,31 @@ def test_execute_cwl_enum_schema_combined_type_single_array_from_wps(self, mock_ assert results + +@pytest.mark.functional +class WpsPackageAppTestResultResponses(WpsConfigBase, ResourcesUtil): + """ + Tests to evaluate the various combinations of results response representations. + + .. 
seealso:: + - :ref:`proc_exec_results` + - :ref:`proc_op_job_results` + """ + @classmethod + def setUpClass(cls) -> None: + cls.settings = { + "weaver.wps": True, + "weaver.wps_path": "/ows/wps", + "weaver.wps_restapi_path": "/", + "weaver.wps_output_path": "/wpsoutputs", + "weaver.wps_output_url": "http://localhost/wpsoutputs", + "weaver.wps_output_dir": "/tmp/weaver-test/wps-outputs", # nosec: B108 # don't care hardcoded for test + } + super(WpsPackageAppTestResultResponses, cls).setUpClass() + + def setUp(self) -> None: + self.process_store.clear_processes() + def test_execute_single_output_prefer_header_return_representation_literal(self): proc = "EchoResultsTester" p_id = self.fully_qualified_test_process_name(proc) @@ -3781,7 +3801,7 @@ def test_execute_single_output_response_raw_value_complex(self): assert outputs.json["outputs"] == { "output_json": { "value": output_json, - "type": ContentType.APP_JSON, + "mediaType": ContentType.APP_JSON, }, } @@ -4362,7 +4382,7 @@ def test_execute_multi_output_prefer_header_return_minimal_override_transmission }, "output_json": { "value": output_json, - "type": ContentType.APP_JSON, + "mediaType": ContentType.APP_JSON, }, "output_text": { "href": f"{out_url}/{job_id}/output_text/output.txt", @@ -4492,7 +4512,7 @@ def test_execute_multi_output_response_document_mixed(self): }, "output_json": { "value": output_json, - "type": ContentType.APP_JSON, + "mediaType": ContentType.APP_JSON, }, "output_text": { "href": f"{out_url}/{job_id}/output_text/output.txt", diff --git a/tests/functional/utils.py b/tests/functional/utils.py index 6f14739df..dbdfe31bb 100644 --- a/tests/functional/utils.py +++ b/tests/functional/utils.py @@ -48,6 +48,7 @@ AnyUUID, CWL, ExecutionResults, + JobStatusResponse, JSON, ProcessDeployment, ProcessDescription, @@ -433,7 +434,7 @@ def deploy_process(cls, return info # type: ignore def _try_get_logs(self, status_url): - _resp = self.app.get(f"{status_url}/logs", headers=self.json_headers) + _resp = 
self.app.get(f"{status_url}/logs", headers=dict(self.json_headers)) if _resp.status_code == 200: _text = "\n".join(_resp.json) return f"Error logs:\n{_text}" @@ -445,6 +446,11 @@ def fully_qualified_test_process_name(self, name=""): test_name = f"{class_name}.{self._testMethodName}{extra_name}".replace(".", "-") return test_name + @overload + def monitor_job(self, status_url, return_status=False, **__): + # type: (str, Literal[True], **Any) -> JobStatusResponse + ... + def monitor_job(self, status_url, # type: str timeout=None, # type: Optional[int] @@ -452,7 +458,7 @@ def monitor_job(self, return_status=False, # type: bool wait_for_status=None, # type: Optional[str] expect_failed=False, # type: bool - ): # type: (...) -> ExecutionResults + ): # type: (...) -> Union[ExecutionResults, JobStatusResponse] """ Job polling of status URL until completion or timeout. @@ -505,7 +511,7 @@ def check_job_status(_resp, running=False): return resp.json def get_outputs(self, status_url): - resp = self.app.get(f"{status_url}/outputs", headers=self.json_headers) + resp = self.app.get(f"{status_url}/outputs", headers=dict(self.json_headers)) body = resp.json pretty = json.dumps(body, indent=2, ensure_ascii=False) assert resp.status_code == 200, f"Get outputs failed:\n{pretty}\n{self._try_get_logs(status_url)}" diff --git a/tests/test_execute.py b/tests/test_execute.py new file mode 100644 index 000000000..4cace41a0 --- /dev/null +++ b/tests/test_execute.py @@ -0,0 +1,92 @@ +import itertools + +import pytest +from pyramid.httpexceptions import HTTPBadRequest + +from weaver.execute import ExecuteControlOption, ExecuteMode, ExecuteReturnPreference, parse_prefer_header_execute_mode + + +@pytest.mark.parametrize( + ["headers", "support", "expected", "extra_prefer"], + [ + # both modes supported (sync attempted upto max/specified wait time, unless async requested explicitly) + ({}, [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC], (ExecuteMode.SYNC, 10, {}), ""), + # only 
supported async (enforced) - original behaviour + ({}, [ExecuteControlOption.ASYNC], (ExecuteMode.ASYNC, None, {}), ""), + ] + + [ + (_headers, _support, _expected, _extra) + for (_headers, _support, _expected), _extra + in itertools.product( + [ + # both modes supported (sync attempted upto max/specified wait time, unless async requested explicitly) + ({"Prefer": ""}, [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC], + (ExecuteMode.SYNC, 10, {})), + ({"Prefer": "respond-async"}, [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC], + (ExecuteMode.ASYNC, None, {"Preference-Applied": "respond-async"})), + ({"Prefer": "respond-async, wait=4"}, [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC], + (ExecuteMode.ASYNC, None, {"Preference-Applied": "respond-async"})), + ({"Prefer": "wait=4"}, [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC], + (ExecuteMode.SYNC, 4, {"Preference-Applied": "wait=4"})), + ({"Prefer": "wait=20"}, [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC], + (ExecuteMode.ASYNC, None, {})), # larger than max time + # only supported async (enforced) - original behaviour + ({"Prefer": ""}, [ExecuteControlOption.ASYNC], + (ExecuteMode.ASYNC, None, {})), + ({"Prefer": "respond-async"}, [ExecuteControlOption.ASYNC], + (ExecuteMode.ASYNC, None, {"Preference-Applied": "respond-async"})), + ({"Prefer": "respond-async, wait=4"}, [ExecuteControlOption.ASYNC], + (ExecuteMode.ASYNC, None, {"Preference-Applied": "respond-async"})), + ({"Prefer": "wait=4"}, [ExecuteControlOption.ASYNC], + (ExecuteMode.ASYNC, None, {})), + + ], + [ + "", + f"return={ExecuteReturnPreference.MINIMAL}", + f"return={ExecuteReturnPreference.REPRESENTATION}" + # FIXME: + # Support with added ``Prefer: handling=strict`` or ``Prefer: handling=lenient`` + # https://github.com/crim-ca/weaver/issues/701 + ] + ) + ] +) +def test_prefer_header_execute_mode(headers, support, expected, extra_prefer): + if extra_prefer and "Prefer" in headers: + headers["Prefer"] += f", 
{extra_prefer}" if headers["Prefer"] else extra_prefer + result = parse_prefer_header_execute_mode(headers, support) + assert result == expected + + +@pytest.mark.parametrize( + ["headers", "expected"], + [ + # 1st variant is considered as 1 Prefer header with all values supplied simultaneously + # 2nd variant is considered as 2 Prefer headers, each with their respective value + # (this is because urllib, under the hood, concatenates the list of header-values using ';' separator) + ({"Prefer": "respond-async, wait=4"}, (ExecuteMode.ASYNC, None, {"Preference-Applied": "respond-async"})), + ({"Prefer": "respond-async; wait=4"}, (ExecuteMode.ASYNC, None, {"Preference-Applied": "respond-async"})), + ] +) +def test_parse_prefer_header_execute_mode_flexible(headers, expected): + """ + Ensure that the ``Prefer`` header supplied multiple times (allowed by :rfc:`7240`) is handled correctly. + """ + result = parse_prefer_header_execute_mode(headers, [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC]) + assert result == expected + + +@pytest.mark.parametrize("prefer_header", [ + "wait=10s", + "wait=3.1416", + "wait=yes", + "wait=1,2,3", # technically, gets parsed as 'wait=1' (valid) and other '2', '3' parameters on their own + "wait=1;2;3", + "wait=1, wait=2", + "wait=1; wait=2", +]) +def test_parse_prefer_header_execute_mode_invalid(prefer_header): + headers = {"Prefer": prefer_header} + with pytest.raises(HTTPBadRequest): + parse_prefer_header_execute_mode(headers, [ExecuteControlOption.ASYNC]) diff --git a/tests/test_utils.py b/tests/test_utils.py index 6d7f1cfd9..d046f9481 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -23,7 +23,6 @@ from beaker.cache import cache_region from mypy_boto3_s3.literals import RegionName from pyramid.httpexceptions import ( - HTTPBadRequest, HTTPConflict, HTTPCreated, HTTPError as PyramidHTTPError, @@ -48,7 +47,6 @@ setup_test_file_hierarchy ) from weaver import xml_util -from weaver.execute import ExecuteControlOption, 
ExecuteMode, ExecuteReturnPreference from weaver.formats import ContentEncoding, ContentType, repr_json from weaver.status import JOB_STATUS_CATEGORIES, STATUS_PYWPS_IDS, STATUS_PYWPS_MAP, Status, StatusCompliant, map_status from weaver.utils import ( @@ -83,7 +81,6 @@ null, parse_kvp, parse_number_with_unit, - parse_prefer_header_execute_mode, pass_http_error, request_extra, resolve_s3_from_http, @@ -2122,92 +2119,6 @@ def test_parse_kvp(query, params, expected): assert result == expected -@pytest.mark.parametrize( - ["headers", "support", "expected", "extra_prefer"], - [ - # both modes supported (sync attempted upto max/specified wait time, unless async requested explicitly) - ({}, [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC], (ExecuteMode.SYNC, 10, {}), ""), - # only supported async (enforced) - original behaviour - ({}, [ExecuteControlOption.ASYNC], (ExecuteMode.ASYNC, None, {}), ""), - ] + - [ - (_headers, _support, _expected, _extra) - for (_headers, _support, _expected), _extra - in itertools.product( - [ - # both modes supported (sync attempted upto max/specified wait time, unless async requested explicitly) - ({"Prefer": ""}, [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC], - (ExecuteMode.SYNC, 10, {})), - ({"Prefer": "respond-async"}, [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC], - (ExecuteMode.ASYNC, None, {"Preference-Applied": "respond-async"})), - ({"Prefer": "respond-async, wait=4"}, [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC], - (ExecuteMode.ASYNC, None, {"Preference-Applied": "respond-async"})), - ({"Prefer": "wait=4"}, [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC], - (ExecuteMode.SYNC, 4, {"Preference-Applied": "wait=4"})), - ({"Prefer": "wait=20"}, [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC], - (ExecuteMode.ASYNC, None, {})), # larger than max time - # only supported async (enforced) - original behaviour - ({"Prefer": ""}, [ExecuteControlOption.ASYNC], - (ExecuteMode.ASYNC, None, 
{})), - ({"Prefer": "respond-async"}, [ExecuteControlOption.ASYNC], - (ExecuteMode.ASYNC, None, {"Preference-Applied": "respond-async"})), - ({"Prefer": "respond-async, wait=4"}, [ExecuteControlOption.ASYNC], - (ExecuteMode.ASYNC, None, {"Preference-Applied": "respond-async"})), - ({"Prefer": "wait=4"}, [ExecuteControlOption.ASYNC], - (ExecuteMode.ASYNC, None, {})), - - ], - [ - "", - f"return={ExecuteReturnPreference.MINIMAL}", - f"return={ExecuteReturnPreference.REPRESENTATION}" - # FIXME: - # Support with added ``Prefer: handling=strict`` or ``Prefer: handling=lenient`` - # https://github.com/crim-ca/weaver/issues/701 - ] - ) - ] -) -def test_prefer_header_execute_mode(headers, support, expected, extra_prefer): - if extra_prefer and "Prefer" in headers: - headers["Prefer"] += f", {extra_prefer}" if headers["Prefer"] else extra_prefer - result = parse_prefer_header_execute_mode(headers, support) - assert result == expected - - -@pytest.mark.parametrize( - ["headers", "expected"], - [ - # 1st variant is considered as 1 Prefer header with all values supplied simultaneously - # 2nd variant is considered as 2 Prefer headers, each with their respective value - # (this is because urllib, under the hood, concatenates the list of header-values using ';' separator) - ({"Prefer": "respond-async, wait=4"}, (ExecuteMode.ASYNC, None, {"Preference-Applied": "respond-async"})), - ({"Prefer": "respond-async; wait=4"}, (ExecuteMode.ASYNC, None, {"Preference-Applied": "respond-async"})), - ] -) -def test_parse_prefer_header_execute_mode_flexible(headers, expected): - """ - Ensure that the ``Prefer`` header supplied multiple times (allowed by :rfc:`7240`) is handled correctly. 
- """ - result = parse_prefer_header_execute_mode(headers, [ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC]) - assert result == expected - - -@pytest.mark.parametrize("prefer_header", [ - "wait=10s", - "wait=3.1416", - "wait=yes", - "wait=1,2,3", # technically, gets parsed as 'wait=1' (valid) and other '2', '3' parameters on their own - "wait=1;2;3", - "wait=1, wait=2", - "wait=1; wait=2", -]) -def test_parse_prefer_header_execute_mode_invalid(prefer_header): - headers = {"Prefer": prefer_header} - with pytest.raises(HTTPBadRequest): - parse_prefer_header_execute_mode(headers, [ExecuteControlOption.ASYNC]) - - @pytest.mark.parametrize(["number", "binary", "unit", "expect"], [ (1.234, False, "B", "1.234 B"), (10_000_000, False, "B", "10.000 MB"), diff --git a/weaver/execute.py b/weaver/execute.py index dfb7df14b..4a1e427be 100644 --- a/weaver/execute.py +++ b/weaver/execute.py @@ -1,9 +1,17 @@ +import logging from typing import TYPE_CHECKING +from pyramid.httpexceptions import HTTPBadRequest + from weaver.base import Constants +from weaver.utils import get_header, parse_kvp if TYPE_CHECKING: - from typing import List + from typing import List, Optional, Tuple + + from weaver.typedefs import AnyHeadersContainer, HeadersType + +LOGGER = logging.getLogger(__name__) class ExecuteMode(Constants): @@ -74,3 +82,121 @@ class ExecuteCollectionFormat(Constants): ExecuteCollectionFormat.OGC_MAP, ExecuteCollectionFormat.GEOJSON, ] + + +def parse_prefer_header_return(headers): + # type: (AnyHeadersContainer) -> Optional[ExecuteReturnPreference] + """ + Get the return preference if specified. 
+ """ + prefer_header = get_header("prefer", headers) + prefer_params = parse_kvp(prefer_header) + prefer_return = prefer_params.get("return") + if prefer_return: + return ExecuteReturnPreference.get(prefer_return[0]) + + +def parse_prefer_header_execute_mode( + header_container, # type: AnyHeadersContainer + supported_modes=None, # type: Optional[List[AnyExecuteControlOption]] + wait_max=10, # type: int +): # type: (...) -> Tuple[AnyExecuteMode, Optional[int], HeadersType] + """ + Obtain execution preference if provided in request headers. + + .. seealso:: + - :term:`OGC API - Processes`: Core, Execution mode < + https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_execution_mode>`_. + This defines all conditions how to handle ``Prefer`` against applicable :term:`Process` description. + - :rfc:`7240#section-4.1` HTTP Prefer header ``respond-async`` + + .. seealso:: + If ``Prefer`` format is valid, but server decides it cannot be respected, it can be transparently ignored + (:rfc:`7240#section-2`). The server must respond with ``Preference-Applied`` indicating preserved preferences + it decided to respect. + + :param header_container: Request headers to retrieve preference, if any available. + :param supported_modes: + Execute modes that are permitted for the operation that received the ``Prefer`` header. + Resolved mode will respect this constraint following specification requirements of :term:`OGC API - Processes`. + :param wait_max: + Maximum wait time enforced by the server. If requested wait time is greater, ``wait`` preference will not be + applied and will fall back to asynchronous response. + :return: + Tuple of resolved execution mode, wait time if specified, and header of applied preferences if possible. + Maximum wait time indicates duration until synchronous response should fall back to asynchronous response. + :raises HTTPBadRequest: If contents of ``Prefer`` are not valid. 
+ """ + + prefer = get_header("prefer", header_container) + relevant_modes = {ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC} + supported_modes = list(set(supported_modes or []).intersection(relevant_modes)) + + if not prefer: + # /req/core/process-execute-default-execution-mode (A & B) + if not supported_modes: + return ExecuteMode.ASYNC, None, {} # Weaver's default + if len(supported_modes) == 1: + mode = ExecuteMode.ASYNC if supported_modes[0] == ExecuteControlOption.ASYNC else ExecuteMode.SYNC + wait = None if mode == ExecuteMode.ASYNC else wait_max + return mode, wait, {} + # /req/core/process-execute-default-execution-mode (C) + return ExecuteMode.SYNC, wait_max, {} + + params = parse_kvp(prefer, pair_sep=",", multi_value_sep=None) + wait = wait_max + if "wait" in params: + try: + if any(param.isnumeric() for param in params): + # 'wait=x,y,z' parsed as 'wait=x' and 'y' / 'z' parameters on their own + # since 'wait' is the only preference that uses integers, it is guaranteed to be a misuse + raise ValueError("Invalid 'wait' with comma-separated values.") + if not len(params["wait"]) == 1: + raise ValueError("Too many values.") + wait = params["wait"][0] + if not str.isnumeric(wait) or "." 
in wait or wait.startswith("-"): + raise ValueError("Invalid integer for 'wait' in seconds.") + wait = int(wait) + except (TypeError, ValueError) as exc: + raise HTTPBadRequest(json={ + "code": "InvalidParameterValue", + "description": "HTTP Prefer header contains invalid 'wait' definition.", + "error": type(exc).__name__, + "cause": str(exc), + "value": str(params["wait"]), + }) + + if wait > wait_max: + LOGGER.info("Requested Prefer wait header too large (%ss > %ss), revert to async execution.", wait, wait_max) + return ExecuteMode.ASYNC, None, {} + + auto = ExecuteMode.ASYNC if "respond-async" in params else ExecuteMode.SYNC + applied_preferences = [] + # /req/core/process-execute-auto-execution-mode (A & B) + if len(supported_modes) == 1: + # supported mode is enforced, only indicate if it matches preferences to honour them + # otherwise, server is allowed to discard preference since it cannot be honoured + mode = ExecuteMode.ASYNC if supported_modes[0] == ExecuteControlOption.ASYNC else ExecuteMode.SYNC + wait = None if mode == ExecuteMode.ASYNC else wait_max + if auto == mode: + if auto == ExecuteMode.ASYNC: + applied_preferences.append("respond-async") + if wait and "wait" in params: + applied_preferences.append(f"wait={wait}") + # /rec/core/process-execute-honor-prefer (A: async & B: wait) + # https://datatracker.ietf.org/doc/html/rfc7240#section-3 + applied = {} + if applied_preferences: + applied = {"Preference-Applied": ", ".join(applied_preferences)} + return mode, wait, applied + + # Weaver's default, at server's discretion when both mode are supported + # /req/core/process-execute-auto-execution-mode (C) + if len(supported_modes) == 2: + if auto == ExecuteMode.ASYNC: + return ExecuteMode.ASYNC, None, {"Preference-Applied": "respond-async"} + if wait and "wait" in params: + return ExecuteMode.SYNC, wait, {"Preference-Applied": f"wait={wait}"} + if wait: # default used, not a supplied preference + return ExecuteMode.SYNC, wait, {} + return 
ExecuteMode.ASYNC, None, {} diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index e63f9bd20..ade7181d1 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -15,7 +15,7 @@ from weaver.database import get_db from weaver.datatype import Process, Service -from weaver.execute import ExecuteControlOption, ExecuteMode +from weaver.execute import ExecuteControlOption, ExecuteMode, parse_prefer_header_execute_mode from weaver.formats import AcceptLanguage, ContentType, clean_media_type_format, map_cwl_media_type, repr_json from weaver.notify import map_job_subscribers, notify_job_subscribers from weaver.owsexceptions import OWSInvalidParameterValue, OWSNoApplicableCode @@ -43,7 +43,6 @@ now, parse_kvp, parse_number_with_unit, - parse_prefer_header_execute_mode, raise_on_xml_exception, wait_secs ) diff --git a/weaver/typedefs.py b/weaver/typedefs.py index ae77096d5..4ad1469b6 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -523,6 +523,16 @@ class CWL_SchemaName(Protocol): "successEmail": NotRequired[str], "inProgressEmail": NotRequired[str], }, total=True) + JobStatusResponse = TypedDict("JobStatusResponse", { + "status": Required[AnyStatusType], + "type": Required[Literal["process", "provider"]], + "id": NotRequired[str], # TBD alternative to 'jobID' considered by SWG + "jobID": Required[str], + "processID": Required[str], + "providerID": NotRequired[Optional[str]], + "links": NotRequired[List[Link]], + # many other fields... 
only listing accessed ones in code + }, total=False) # when schema='weaver.processes.constants.ProcessSchema.OGC' ExecutionInputsMap = Dict[str, Union[AnyValueType, JobValueObject, List[JobValueObject]]] diff --git a/weaver/utils.py b/weaver/utils.py index 246d69a75..d6d39086d 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -65,7 +65,6 @@ from weaver.base import Constants, ExtendedEnum from weaver.compat import Version from weaver.exceptions import WeaverException -from weaver.execute import ExecuteControlOption, ExecuteMode from weaver.formats import ContentType, get_content_type, get_extension, get_format, repr_json from weaver.status import map_status from weaver.warning import TimeZoneInfoAlreadySetWarning, UndefinedContainerWarning @@ -800,112 +799,6 @@ def parse_kvp(query, # type: str return kvp -def parse_prefer_header_execute_mode( - header_container, # type: AnyHeadersContainer - supported_modes=None, # type: Optional[List[AnyExecuteControlOption]] - wait_max=10, # type: int -): # type: (...) -> Tuple[AnyExecuteMode, Optional[int], HeadersType] - """ - Obtain execution preference if provided in request headers. - - .. seealso:: - - :term:`OGC API - Processes`: Core, Execution mode < - https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_execution_mode>`_. - This defines all conditions how to handle ``Prefer`` against applicable :term:`Process` description. - - :rfc:`7240#section-4.1` HTTP Prefer header ``respond-async`` - - .. seealso:: - If ``Prefer`` format is valid, but server decides it cannot be respected, it can be transparently ignored - (:rfc:`7240#section-2`). The server must respond with ``Preference-Applied`` indicating preserved preferences - it decided to respect. - - :param header_container: Request headers to retrieve preference, if any available. - :param supported_modes: - Execute modes that are permitted for the operation that received the ``Prefer`` header. 
- Resolved mode will respect this constraint following specification requirements of :term:`OGC API - Processes`. - :param wait_max: - Maximum wait time enforced by the server. If requested wait time is greater, ``wait`` preference will not be - applied and will fall back to asynchronous response. - :return: - Tuple of resolved execution mode, wait time if specified, and header of applied preferences if possible. - Maximum wait time indicates duration until synchronous response should fall back to asynchronous response. - :raises HTTPBadRequest: If contents of ``Prefer`` are not valid. - """ - - prefer = get_header("prefer", header_container) - relevant_modes = {ExecuteControlOption.ASYNC, ExecuteControlOption.SYNC} - supported_modes = list(set(supported_modes or []).intersection(relevant_modes)) - - if not prefer: - # /req/core/process-execute-default-execution-mode (A & B) - if not supported_modes: - return ExecuteMode.ASYNC, None, {} # Weaver's default - if len(supported_modes) == 1: - mode = ExecuteMode.ASYNC if supported_modes[0] == ExecuteControlOption.ASYNC else ExecuteMode.SYNC - wait = None if mode == ExecuteMode.ASYNC else wait_max - return mode, wait, {} - # /req/core/process-execute-default-execution-mode (C) - return ExecuteMode.SYNC, wait_max, {} - - params = parse_kvp(prefer, pair_sep=",", multi_value_sep=None) - wait = wait_max - if "wait" in params: - try: - if any(param.isnumeric() for param in params): - # 'wait=x,y,z' parsed as 'wait=x' and 'y' / 'z' parameters on their own - # since 'wait' is the only referenced that users integers, it is guaranteed to be a misuse - raise ValueError("Invalid 'wait' with comma-separated values.") - if not len(params["wait"]) == 1: - raise ValueError("Too many values.") - wait = params["wait"][0] - if not str.isnumeric(wait) or "." 
in wait or wait.startswith("-"): - raise ValueError("Invalid integer for 'wait' in seconds.") - wait = int(wait) - except (TypeError, ValueError) as exc: - raise HTTPBadRequest(json={ - "code": "InvalidParameterValue", - "description": "HTTP Prefer header contains invalid 'wait' definition.", - "error": type(exc).__name__, - "cause": str(exc), - "value": str(params["wait"]), - }) - - if wait > wait_max: - LOGGER.info("Requested Prefer wait header too large (%ss > %ss), revert to async execution.", wait, wait_max) - return ExecuteMode.ASYNC, None, {} - - auto = ExecuteMode.ASYNC if "respond-async" in params else ExecuteMode.SYNC - applied_preferences = [] - # /req/core/process-execute-auto-execution-mode (A & B) - if len(supported_modes) == 1: - # supported mode is enforced, only indicate if it matches preferences to honour them - # otherwise, server is allowed to discard preference since it cannot be honoured - mode = ExecuteMode.ASYNC if supported_modes[0] == ExecuteControlOption.ASYNC else ExecuteMode.SYNC - wait = None if mode == ExecuteMode.ASYNC else wait_max - if auto == mode: - if auto == ExecuteMode.ASYNC: - applied_preferences.append("respond-async") - if wait and "wait" in params: - applied_preferences.append(f"wait={wait}") - # /rec/core/process-execute-honor-prefer (A: async & B: wait) - # https://datatracker.ietf.org/doc/html/rfc7240#section-3 - applied = {} - if applied_preferences: - applied = {"Preference-Applied": ", ".join(applied_preferences)} - return mode, wait, applied - - # Weaver's default, at server's discretion when both mode are supported - # /req/core/process-execute-auto-execution-mode (C) - if len(supported_modes) == 2: - if auto == ExecuteMode.ASYNC: - return ExecuteMode.ASYNC, None, {"Preference-Applied": "respond-async"} - if wait and "wait" in params: - return ExecuteMode.SYNC, wait, {"Preference-Applied": f"wait={wait}"} - if wait: # default used, not a supplied preference - return ExecuteMode.SYNC, wait, {} - return 
ExecuteMode.ASYNC, None, {} - - def get_url_without_query(url): # type: (Union[str, ParseResult]) -> str """ diff --git a/weaver/wps_restapi/jobs/utils.py b/weaver/wps_restapi/jobs/utils.py index 2f9c4e321..e9a9f4f33 100644 --- a/weaver/wps_restapi/jobs/utils.py +++ b/weaver/wps_restapi/jobs/utils.py @@ -31,7 +31,7 @@ ServiceNotAccessible, ServiceNotFound ) -from weaver.execute import ExecuteResponse, ExecuteTransmissionMode +from weaver.execute import ExecuteResponse, ExecuteTransmissionMode, parse_prefer_header_return, ExecuteReturnPreference from weaver.formats import ContentType, get_format, repr_json from weaver.owsexceptions import OWSNoApplicableCode, OWSNotFound from weaver.processes.constants import JobInputsOutputsSchema @@ -57,7 +57,7 @@ from weaver.wps_restapi.providers.utils import forbid_local_only if TYPE_CHECKING: - from typing import Dict, List, Optional, Tuple, Union + from typing import Any, Dict, List, Optional, Tuple, Union from weaver.processes.constants import JobInputsOutputsSchemaType from weaver.typedefs import ( @@ -441,24 +441,69 @@ def get_results( # pylint: disable=R1260 return outputs, headers -def get_job_results_response(job, container, headers=None): - # type: (Job, AnySettingsContainer, Optional[AnyHeadersContainer]) -> AnyResponseType +def get_job_return(job, body=None, headers=None): + # type: (Job, Optional[JSON], Optional[AnyHeadersContainer]) -> ExecuteResponse + """ + Obtain the :term:`Job` result representation based on the resolution order of preferences and request parameters. 
+ """ + body = body or {} + resp = ExecuteResponse.get(body.get("response")) + if resp: + return resp + + pref = parse_prefer_header_return(headers) + if pref == ExecuteReturnPreference.MINIMAL: + return ExecuteResponse.DOCUMENT + if pref == ExecuteReturnPreference.REPRESENTATION: + return ExecuteResponse.RAW + + return job.execution_response + + +def get_job_results_response( + job, # type: Job + container, # type: AnySettingsContainer + *, # type: Any + headers=None, # type: Optional[AnyHeadersContainer] + results_headers=None, # type: Optional[AnyHeadersContainer] + results_contents=None, # type: Optional[JSON] +): # type: (...) -> AnyResponseType """ Generates the :term:`OGC` compliant :term:`Job` results response according to submitted execution parameters. Parameters that impact the format of the response are: - - Amount of outputs to be returned. - - Parameter ``response: raw|document`` - - Parameter ``transmissionMode: value|reference`` per output if ``response: raw``. + - Body parameter ``outputs`` with the amount of *requested outputs* to be returned. + - Body parameter ``response: raw|document`` for content representation. + - Body parameter ``transmissionMode: value|reference`` per output. + - Header parameter ``Prefer: return=representation|minimal`` for content representation. + - Overrides, for any of the previous parameters, allowing request of an alternate representation. + + Resolution order/priority: + + 1. :paramref:`results_contents` + 2. :paramref:`results_headers` + 3. :paramref:`job` definitions + + The logic of the resolution order is that any body parameters resolving to an equivalent information provided + by header parameters will be more important, since ``Prefer`` are *soft* requirements, whereas body parameters + are *hard* requirements. The parameters stored in the :paramref:`job` are defined during :term:`Job` submission, + which become the "default" results representation if requested as is. 
If further parameters are provided to + override during the results request, they modify the "default" results representation. In this case, a header + provided in the results request overrides the body parameters from the original :term:`Job`, since their results + request context is "closer" than the ones at the time of the :term:`Job` submission. .. seealso:: More details available for each combination: - https://docs.ogc.org/is/18-062r2/18-062r2.html#sc_execute_response - https://docs.ogc.org/is/18-062r2/18-062r2.html#_response_7 + - :ref:`proc_op_job_results` + - :ref:`proc_exec_results` - :param job: Job for which to generate the results response. + :param job: Job for which to generate the results response, which contains the originally submitted parameters. :param container: Application settings. :param headers: Additional headers to provide in the response. + :param results_headers: Headers that override originally submitted job parameters when requesting results. + :param results_contents: Body contents that override originally submitted job parameters when requesting results. 
""" raise_job_dismissed(job, container) raise_job_bad_status(job, container) @@ -467,7 +512,7 @@ def get_job_results_response(job, container, headers=None): # See: # - https://docs.ogc.org/is/18-062r2/18-062r2.html#_response_7 (/req/core/job-results-async-document) # - https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-document - is_raw = job.execution_response == ExecuteResponse.RAW + is_raw = get_job_return(job, results_contents, results_headers) == ExecuteResponse.RAW results, refs = get_results(job, container, value_key="value", schema=JobInputsOutputsSchema.OGC, # not strict to provide more format details link_references=is_raw) @@ -477,7 +522,7 @@ def get_job_results_response(job, container, headers=None): if not is_raw: try: - results_schema = sd.Result() + results_schema = sd.ResultsDocument() results_json = results_schema.deserialize(results) if len(results_json) != len(results): # pragma: no cover # ensure no outputs silently dismissed raise colander.Invalid( @@ -547,18 +592,21 @@ def get_job_results_response(job, container, headers=None): def get_job_submission_response(body, headers, error=False): - # type: (JSON, AnyHeadersContainer, bool) -> Union[HTTPOk, HTTPCreated] + # type: (JSON, AnyHeadersContainer, bool) -> Union[HTTPOk, HTTPCreated, HTTPBadRequest] """ - Generates the successful response from contents returned by :term:`Job` submission process. + Generates the response contents returned by :term:`Job` submission process. If :term:`Job` already finished processing within requested ``Prefer: wait=X`` seconds delay (and if allowed by the :term:`Process` ``jobControlOptions``), return the successful status immediately instead of created status. + If the status is not successful, return the failed :term:`Job` status response. + Otherwise, return the status monitoring location of the created :term:`Job` to be monitored asynchronously. .. 
seealso:: - :func:`weaver.processes.execution.submit_job` - :func:`weaver.processes.execution.submit_job_handler` + - :func:`weaver.processes.execution.submit_job` + - :func:`weaver.processes.execution.submit_job_handler` + - :ref:`proc_op_job_status` """ # convert headers to pass as list to avoid any duplicate Content-related headers # otherwise auto-added by JSON handling when provided by dict-like structure diff --git a/weaver/wps_restapi/quotation/quotes.py b/weaver/wps_restapi/quotation/quotes.py index 49fd4c828..8cc0c3165 100644 --- a/weaver/wps_restapi/quotation/quotes.py +++ b/weaver/wps_restapi/quotation/quotes.py @@ -10,7 +10,7 @@ from weaver.database import get_db from weaver.datatype import Bill, Quote from weaver.exceptions import log_unhandled_exceptions -from weaver.execute import ExecuteMode +from weaver.execute import ExecuteMode, parse_prefer_header_execute_mode from weaver.formats import ContentType, OutputFormat from weaver.owsexceptions import OWSInvalidParameterValue from weaver.processes.execution import validate_process_io @@ -24,7 +24,7 @@ ) from weaver.sort import Sort from weaver.store.base import StoreBills, StoreProcesses, StoreQuotes -from weaver.utils import as_int, get_header, get_settings, parse_prefer_header_execute_mode +from weaver.utils import as_int, get_header, get_settings from weaver.wps_restapi import swagger_definitions as sd from weaver.wps_restapi.processes.processes import submit_local_job from weaver.wps_restapi.quotation.utils import get_quote diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index 11b46305d..29816937e 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -5924,10 +5924,8 @@ class ResultData(OneOfKeywordSchema): ] -class Result(ExtendedMappingSchema): - """ - Result outputs obtained from a successful process job execution. 
- """ +class ResultsDocument(ExtendedMappingSchema): + description = "Results representation as JSON document." _schema = f"{OGC_API_PROC_PART1_SCHEMAS}/results.yaml" output_id = ResultData( variable="{output-id}", title="ResultData", @@ -5938,6 +5936,19 @@ class Result(ExtendedMappingSchema): ) +class ResultsContent(ExtendedSchemaNode): + description = "Results representation as literal contents." + schema_type = String() + + +class ResultsBody(OneOfKeywordSchema): + description = "Results obtained from a successful process job execution." + _one_of = [ + ResultsDocument(), + ResultsContent(), + ] + + class JobInputsBody(ExecuteInputOutputs): links = LinkList(missing=drop) @@ -7240,7 +7251,7 @@ class RedirectResultResponse(ExtendedMappingSchema): class OkGetJobResultsResponse(ExtendedMappingSchema): _schema = f"{OGC_API_PROC_PART1_RESPONSES}/Results.yaml" header = ResponseHeaders() - body = Result() + body = ResultsBody() class NoContentJobResultsHeaders(NoContent):