Skip to content

Commit

Permalink
job.started calculation of job.duration + adjust serialize/deserializ…
Browse files Browse the repository at this point in the history
…ee into appropriate calls
  • Loading branch information
fmigneault committed May 11, 2020
1 parent 5975121 commit 0b67d70
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 27 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ Changes:
- Add `fmigneault/cornice.ext.swagger@openapi-3 <https://github.com/fmigneault/cornice.ext.swagger/tree/openapi-3>`_
as ``cornice_swagger`` requirement to allow OpenAPI-3 definitions support of schema generation and deserialization
validation of JSON payloads.
- Provide ``Job.started`` datetime and calculate ``Job.duration`` from it to indicate the duration of the process
execution instead of counting from the time the job was summitted (i.e.: ``Job.created``).

Fixes:
------
Expand Down
58 changes: 46 additions & 12 deletions weaver/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,21 @@ def created(self):
self["created"] = now()
return localize_datetime(self.get("created"))

@property
def started(self):
# type: () -> Optional[datetime]
started = self.get("started", None)
if not started:
return None
return localize_datetime(started)

@started.setter
def started(self, started):
# type: (datetime) -> None
if not isinstance(started, datetime):
raise TypeError("Type 'datetime' is required for '{}.started'".format(type(self)))
self["started"] = started

@property
def finished(self):
# type: () -> Optional[datetime]
Expand All @@ -408,14 +423,19 @@ def mark_finished(self):

@property
def duration(self):
# type: () -> timedelta
# type: () -> Optional[timedelta]
if not self.started:
return None
final_time = self.finished or now()
return localize_datetime(final_time) - localize_datetime(self.created)
return localize_datetime(final_time) - localize_datetime(self.started)

@property
def duration_str(self):
# type: () -> AnyStr
return str(self.duration).split(".")[0]
duration = self.duration
if duration is None:
return "00:00:00"
return str(duration).split(".")[0].zfill(8) # "HH:MM:SS"

@property
def progress(self):
Expand Down Expand Up @@ -587,10 +607,14 @@ def json(self, container=None, self_link=None): # pylint: disable=W0221,argu
"status": self.status,
"message": self.status_message,
"duration": self.duration_str,
"percentCompleted": self.progress
# TODO: available fields not yet employed (https://github.com/crim-ca/weaver/issues/129)
"nextPoll": None,
"expirationDate": None,
"estimatedCompletion": None,
"percentCompleted": self.progress,
}
job_json.update(self.links(settings, self_link=self_link))
return sd.JobStatusInfo().deserialize(job_json)
return sd.JobStatusInfo().serialize(job_json)

def params(self):
# type: () -> Dict[AnyStr, Any]
Expand All @@ -607,6 +631,7 @@ def params(self):
"execute_async": self.execute_async,
"is_workflow": self.is_workflow,
"created": self.created,
"started": self.started,
"finished": self.finished,
"progress": self.progress,
"results": self.results,
Expand Down Expand Up @@ -849,22 +874,31 @@ def params_wps(self):

def json(self):
# type: () -> JSON
return sd.Process().deserialize(self)
"""
Obtains the JSON serializable complete representation of the process.
"""
return sd.Process().serialize(self)

def process_offering(self):
def offering(self):
# type: () -> JSON
"""
Obtains the JSON serializable offering representation of the process.
"""
process_offering = {"process": self}
if self.version:
process_offering.update({"processVersion": self.version})
if self.jobControlOptions:
process_offering.update({"jobControlOptions": self.jobControlOptions})
if self.outputTransmission:
process_offering.update({"outputTransmission": self.outputTransmission})
return sd.ProcessOffering().deserialize(process_offering)
return sd.ProcessOffering().serialize(process_offering)

def process_summary(self):
def summary(self):
# type: () -> JSON
return sd.ProcessSummary().deserialize(self)
"""
Obtains the JSON serializable summary representation of the process.
"""
return sd.ProcessSummary().serialize(self)

@staticmethod
def from_wps(wps_process, **extra_params):
Expand Down Expand Up @@ -1038,7 +1072,7 @@ def params(self):

def json(self):
# type: () -> JSON
return sd.QuoteSchema().deserialize(self)
return sd.QuoteSchema().serialize(self)


class Bill(Base):
Expand Down Expand Up @@ -1139,4 +1173,4 @@ def params(self):

def json(self):
# type: () -> JSON
return sd.BillSchema().deserialize(self)
return sd.BillSchema().serialize(self)
5 changes: 4 additions & 1 deletion weaver/processes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ def convert_process_wps_to_db(service, process, container):
is_request=False)
def _check_deploy(payload):
"""Validate minimum deploy payload field requirements with exception handling."""
# FIXME: handle colander invalid directly in tween (https://github.com/crim-ca/weaver/issues/112)
try:
sd.Deploy().deserialize(payload)
except colander.Invalid as ex:
Expand Down Expand Up @@ -327,11 +328,13 @@ def deploy_process_from_payload(payload, container):
elif isinstance(ows_context, dict):
process_info["owsContext"] = ows_context

# FIXME: handle colander invalid directly in tween (https://github.com/crim-ca/weaver/issues/112)
try:
store = get_db(container).get_store(StoreProcesses)
process = ProcessDB(process_info)
process_summary = process.process_summary() # make if fail before save if invalid
sd.ProcessSummary().deserialize(process) # make if fail before save if invalid
store.save_process(process, overwrite=False)
process_summary = process.summary()
except ProcessRegistrationError as ex:
raise HTTPConflict(detail=str(ex))
except (ValueError, colander.Invalid) as ex:
Expand Down
2 changes: 1 addition & 1 deletion weaver/store/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ def save_job(self,
accept_language=None, # type: Optional[AnyStr]
): # type: (...) -> Job
"""
Stores a job in mongodb.
Creates a new :class:`Job` and stores it in mongodb.
"""
try:
tags = ["dev"]
Expand Down
2 changes: 1 addition & 1 deletion weaver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ def expires_at(hours=1):
def localize_datetime(dt, tz_name="UTC"):
# type: (datetime, Optional[AnyStr]) -> datetime
"""
Provide a timezone-aware object for a given datetime and timezone name
Provide a timezone-aware object for a given datetime and timezone name.
"""
tz_aware_dt = dt
if dt.tzinfo is None:
Expand Down
6 changes: 5 additions & 1 deletion weaver/wps_restapi/colander_extras.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,14 @@ def schema_type():
# pylint: disable=W0222,signature-differs
def deserialize(self, cstruct):
if self.default is colander.null and self.missing is colander.drop:
if cstruct is colander.drop:
return colander.drop
containers = (colander.SequenceSchema.schema_type,
colander.MappingSchema.schema_type,
colander.TupleSchema.schema_type)
if self.schema_type in containers and not cstruct:
return colander.drop
elif cstruct is None:
elif cstruct in (None, colander.null):
return colander.drop
return super(DropableSchemaNode, self).deserialize(cstruct)

Expand Down Expand Up @@ -311,6 +313,8 @@ def _get_sub_variable(self, subnodes):

# pylint: disable=W0222,signature-differs
def deserialize(self, cstruct):
if cstruct in (colander.drop, colander.null):
return cstruct
var_map = getattr(self, self._variable_map, {})
if not isinstance(var_map, dict) or not len(var_map):
return super(VariableSchemaNode, self).deserialize(cstruct)
Expand Down
2 changes: 1 addition & 1 deletion weaver/wps_restapi/jobs/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def _job_list(jobs):
body.update({"groups": items})
else:
body.update({"jobs": _job_list(items), "page": page, "limit": limit})
body = sd.GetQueriedJobsSchema().deserialize(body)
body = sd.GetQueriedJobsSchema().serialize(body)
return HTTPOk(json=body)


Expand Down
25 changes: 21 additions & 4 deletions weaver/wps_restapi/processes/processes.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
get_any_value,
get_cookie_headers,
get_settings,
now,
raise_on_xml_exception,
transform_json,
wait_secs
Expand Down Expand Up @@ -96,6 +97,7 @@ def execute_process(self, job_id, url, headers=None, notification_email=None):
task_logger.debug("Job task setup.")
store = get_db(app).get_store(StoreJobs)
job = store.fetch_by_id(job_id)
job.started = now()
job.task_id = self.request.id
job.progress = JOB_PROGRESS_SETUP
job.save_log(logger=task_logger, message="Job task setup completed.")
Expand Down Expand Up @@ -354,6 +356,7 @@ def submit_job_handler(request, service_url, is_workflow=False, visibility=None)
transform_json(i, rename={"data": "value"}) # backward compatibility
except Exception as ex:
raise HTTPBadRequest("Invalid JSON body cannot be decoded for job submission. [{}]".format(ex))
# FIXME: handle colander invalid directly in tween (https://github.com/crim-ca/weaver/issues/112)
try:
json_body = sd.Execute().deserialize(json_body)
except colander.Invalid as ex:
Expand Down Expand Up @@ -467,8 +470,10 @@ def get_provider_process(request):
"""
try:
process = describe_provider_process(request)
process_offering = process.process_offering()
sd.ProcessOffering().deserialize(process)
process_offering = process.offering()
return HTTPOk(json=process_offering)
# FIXME: handle colander invalid directly in tween (https://github.com/crim-ca/weaver/issues/112)
except colander.Invalid as ex:
raise HTTPBadRequest("Invalid schema: [{!s}]".format(ex))

Expand All @@ -485,7 +490,7 @@ def get_processes_filtered_by_valid_schemas(request):
invalid_processes_ids = list()
for process in processes:
try:
valid_processes.append(process.process_summary())
valid_processes.append(process.summary())
except colander.Invalid:
invalid_processes_ids.append(process.identifier)
return valid_processes, invalid_processes_ids
Expand All @@ -512,6 +517,7 @@ def get_processes(request):
# if 'EMS' and '?providers=True', also fetch each provider's processes
if get_weaver_configuration(get_settings(request)) == WEAVER_CONFIGURATION_EMS:
queries = parse_request_query(request)
# FIXME: many steps below suppose that everything goes well...
if "providers" in queries and asbool(queries["providers"][0]) is True:
providers_response = requests.request("GET", "{host}/providers".format(host=request.host_url),
headers=request.headers, cookies=request.cookies)
Expand All @@ -522,10 +528,12 @@ def get_processes(request):
processes = requests.request("GET", "{host}/providers/{provider_id}/processes"
.format(host=request.host_url, provider_id=provider_id),
headers=request.headers, cookies=request.cookies)
processes = processes.json().get("processes", [])
response_body["providers"][i].update({
"processes": processes if detail else [get_any_id(p) for p in processes]
"processes": processes if detail else [get_any_id(proc) for proc in processes.json()]
})
return HTTPOk(json=response_body)
# FIXME: handle colander invalid directly in tween (https://github.com/crim-ca/weaver/issues/112)
except colander.Invalid as ex:
raise HTTPBadRequest("Invalid schema: [{!s}]".format(ex))

Expand All @@ -542,6 +550,12 @@ def add_local_process(request):

def get_process(request):
# type: (Request) -> ProcessDB
"""
Obtains the *local* process from the database using the request details.
Handles basic operations such as *NotFound* so that when the function completes successfully, an existing process
is guaranteed.
"""
process_id = request.matchdict.get("process_id")
if not isinstance(process_id, six.string_types):
raise HTTPUnprocessableEntity("Invalid parameter 'process_id'.")
Expand All @@ -555,6 +569,7 @@ def get_process(request):
raise HTTPUnauthorized("Process with id '{!s}' is not accessible.".format(process_id))
except ProcessNotFound:
raise HTTPNotFound("Process with id '{!s}' does not exist.".format(process_id))
# FIXME: handle colander invalid directly in tween (https://github.com/crim-ca/weaver/issues/112)
except colander.Invalid as ex:
raise HTTPBadRequest("Invalid schema:\n[{0!r}].".format(ex))

Expand All @@ -569,8 +584,10 @@ def get_local_process(request):
try:
process = get_process(request)
process["inputs"] = opensearch.replace_inputs_describe_process(process.inputs, process.payload)
process_offering = process.process_offering()
sd.ProcessOffering().deserialize(process) # validate
process_offering = process.offering()
return HTTPOk(json=process_offering)
# FIXME: handle colander invalid directly in tween (https://github.com/crim-ca/weaver/issues/112)
except colander.Invalid as ex:
raise HTTPBadRequest("Invalid schema: [{!s}]".format(ex))

Expand Down
12 changes: 6 additions & 6 deletions weaver/wps_restapi/swagger_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
# pylint: disable=C0103,invalid-name

from colander import Boolean, DateTime, Float, Integer
from colander import Boolean, DateTime, Float, Integer, Time
from colander import OneOf, Range
from colander import String, drop
from cornice import Service
Expand Down Expand Up @@ -836,25 +836,25 @@ class ProcessOutputDescriptionSchema(ExtendedMappingSchema):


class JobStatusInfo(ExtendedMappingSchema):
jobId = UUID(example="a9d14bf4-84e0-449a-bac8-16e598efe807", description="ID of the job.")
jobID = UUID(example="a9d14bf4-84e0-449a-bac8-16e598efe807", description="ID of the job.")
status = JobStatusEnum()
message = ExtendedSchemaNode(String(), missing=drop)
expirationDate = ExtendedSchemaNode(DateTime(), missing=drop)
estimatedCompletion = ExtendedSchemaNode(DateTime(), missing=drop)
duration = ExtendedSchemaNode(String(), missing=drop, description="Duration of the process execution.")
duration = ExtendedSchemaNode(Time(), missing=drop, description="Duration of the process execution.")
nextPoll = ExtendedSchemaNode(DateTime(), missing=drop)
percentCompleted = ExtendedSchemaNode(Integer(), example=0, validator=Range(min=0, max=100))
links = LinkList(missing=drop)


class JobEntrySchema(OneOfKeywordSchema):
# note:
# Since JobID is a simple string (not a dict), no additional mapping field can be added here.
# They will be discarded by `OneOfKeywordSchema.deserialize()`.
_one_of = (
JobStatusInfo,
ExtendedSchemaNode(String(), description="Job ID."),
)
# note:
# Since JobId is a simple string (not a dict), no additional mapping field can be added here.
# They will be discarded by `OneOfKeywordSchema.deserialize()`.


class JobCollection(ExtendedSequenceSchema):
Expand Down

0 comments on commit 0b67d70

Please sign in to comment.