Store jobs on creation #475

Merged
29 commits, Apr 16, 2025
Commits
6c4d3d8
Remove leftover pylint comments
keiranjprice101 Mar 18, 2025
e5f71af
Rename job maker methods
keiranjprice101 Mar 18, 2025
5242eeb
Add job request exception
keiranjprice101 Mar 24, 2025
a89aa0b
Add job request exception handler
keiranjprice101 Mar 24, 2025
fe062ca
Update job maker
keiranjprice101 Mar 24, 2025
ffc4d0e
Formatting and linting commit
invalid-email-address Mar 24, 2025
1201afb
Add integration test
keiranjprice101 Mar 24, 2025
bb932d3
Update simple job test
keiranjprice101 Mar 24, 2025
c1b6cfe
Add via id
keiranjprice101 Mar 24, 2025
9fb6935
Use instrument value
keiranjprice101 Mar 24, 2025
ced91af
Use path stem
keiranjprice101 Mar 25, 2025
8c9ba7e
Remove left over logging line
keiranjprice101 Mar 26, 2025
1f195a4
use instrument id
keiranjprice101 Mar 31, 2025
d42282c
Configure the root path
keiranjprice101 Mar 31, 2025
15cff3a
temporarily push direct to staging
keiranjprice101 Mar 31, 2025
a84d328
Provide job_type on message
keiranjprice101 Mar 31, 2025
b2eb4ed
get or create script
keiranjprice101 Mar 31, 2025
0559cbd
return created job ids
keiranjprice101 Mar 31, 2025
15ce34a
refactor job maker
keiranjprice101 Mar 31, 2025
bcce9eb
Update job maker tests
keiranjprice101 Apr 3, 2025
3240780
mock repo method
keiranjprice101 Apr 3, 2025
e2667f6
Add type ignore
keiranjprice101 Apr 14, 2025
9222e9f
Add job types
keiranjprice101 Apr 14, 2025
4eaa6e6
Add missing fields on message
keiranjprice101 Apr 14, 2025
914c4b9
Revert build push change
keiranjprice101 Apr 15, 2025
4529d7f
Merge branch 'main' into store_jobs_on_creation
keiranjprice101 Apr 15, 2025
d2ed5f9
Revert attribute name change
keiranjprice101 Apr 15, 2025
e1b4fa2
Temporarily build on push
keiranjprice101 Apr 15, 2025
f2bffda
Revert: Temporarily build on push
keiranjprice101 Apr 15, 2025
74 changes: 37 additions & 37 deletions .github/workflows/build-and-push.yml
@@ -2,8 +2,8 @@ name: Build and Push Docker Images

on:
push:
branches:
- main

env:
REGISTRY: ghcr.io
@@ -17,38 +17,38 @@ jobs:
packages: write

steps:
- name: Check out code
uses: actions/checkout@v4

- name: Log in to the Container registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.WRITE_PACKAGE_TOKEN }}

- name: Build and push fia-api Docker image
id: docker_build
uses: docker/build-push-action@v6
with:
file: ./container/fia_api.D
push: true
tags: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}/fia-api

- name: Checkout the Gitops repository
uses: actions/checkout@v4
with:
repository: fiaisis/gitops
token: ${{ secrets.GITOPS_STAGING_EDIT_TOKEN }}

- name: Edit the YAML fia-api file for staging
uses: mikefarah/[email protected]
with:
cmd: yq e -i '.spec.template.spec.containers[] |= select(.name == "fia-api").image = "ghcr.io/fiaisis/fia-api@${{ steps.docker_build.outputs.digest }}"' './components/fia-api/envs/staging/fia-api.yml'

- name: Commit and push changes
run: |
git config --local user.email "[email protected]"
git config --local user.name "GitHub Action"
git commit -am "[CD] Update fia-api with this commit ${{ github.event.head_commit.url}}"
git push
2 changes: 1 addition & 1 deletion fia_api/core/auth/tokens.py
@@ -84,7 +84,7 @@ def _is_jwt_access_token_valid(access_token: str) -> bool:
logger.info("JWT was valid")
return True
return False
except RuntimeError: # pylint: disable=broad-exception-caught)
except RuntimeError:
logger.exception("Error decoding JWT access token")
return False

4 changes: 4 additions & 0 deletions fia_api/core/exceptions.py
@@ -23,3 +23,7 @@ class MissingScriptError(Exception):

class UnsafePathError(Exception):
"""A path was given that is potentially unsafe and could lead to directory traversal"""


class JobRequestError(ValueError):
"""The job request was malformed"""
157 changes: 136 additions & 21 deletions fia_api/core/job_maker.py
@@ -1,14 +1,59 @@
from __future__ import annotations

import functools
import hashlib
import json
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any

from db.data_models import Job, JobOwner, JobType, Script, State
from pika.adapters.blocking_connection import BlockingConnection # type: ignore[import-untyped]
from pika.connection import ConnectionParameters # type: ignore[import-untyped]
from pika.credentials import PlainCredentials # type: ignore[import-untyped]

from fia_api.core.exceptions import JobRequestError
from fia_api.core.repositories import Repo
from fia_api.core.specifications.job import JobSpecification
from fia_api.core.specifications.job_owner import JobOwnerSpecification
from fia_api.core.specifications.script import ScriptSpecification

logger = logging.getLogger(__name__)


def hash_script(script: str) -> str:
"""
Given a script, return the sha512 hash of the script
:param script: the script to hash
:return: The script hash
"""
return hashlib.sha512(script.encode()).hexdigest()


def require_owner(func: Callable[..., Any]) -> Callable[..., Any]:
"""
Decorator to ensure that either a user_number or experiment_number is provided to the function. If not, raise a
JobRequestError.
:param func: The function to wrap
:return: The wrapped function
"""

@functools.wraps(func)
def wrapper(self: JobMaker, *args: tuple[Any], **kwargs: dict[str, Any]) -> Any:
if kwargs.get("user_number") is None and kwargs.get("experiment_number") is None:
raise JobRequestError("Something needs to own the job, either experiment_number or user_number.")
return func(self, *args, **kwargs)

return wrapper


class JobMaker:
def __init__(self, queue_host: str, username: str, password: str, queue_name: str):
credentials = PlainCredentials(username=username, password=password)
self._job_repo: Repo[Job] = Repo()
self._owner_repo: Repo[JobOwner] = Repo()
self._script_repo: Repo[Script] = Repo()
self.connection_parameters = ConnectionParameters(queue_host, 5672, credentials=credentials)
self.queue_name = queue_name
self.connection = None
@@ -39,14 +84,15 @@ def _send_message(self, message: str) -> None:
# Assuming channel is set in _connect_to_broker()
self.channel.basic_publish(exchange=self.queue_name, routing_key="", body=message) # type: ignore

def rerun_job(
@require_owner
def create_rerun_job(
self,
job_id: int,
runner_image: str,
script: str,
experiment_number: int | None = None,
user_number: int | None = None,
) -> None:
) -> int:
"""
Submit a rerun job to the scheduled job queue in the message broker. Default to using experiment_number over
user_number.
@@ -55,34 +101,103 @@ def rerun_job(
:param script: The script to be used in the runner
:param experiment_number: the experiment number of the owner
:param user_number: the user number of the owner
:return: None
:return: created job id
"""
json_dict: dict[str, Any] = {"job_id": job_id, "runner_image": runner_image, "script": script}
if experiment_number is not None:
json_dict["experiment_number"] = experiment_number
elif user_number is not None:
json_dict["user_number"] = user_number
else:
raise ValueError("Something needs to own the job, either experiment_number or user_number.")
original_job = self._job_repo.find_one(JobSpecification().by_id(job_id))
if original_job is None:
raise JobRequestError("Cannot rerun job that does not exist.")

job_owner = self._get_or_create_job_owner(experiment_number, user_number)

script_object = self._get_or_create_script(script)

rerun_job = Job(
owner_id=job_owner.id,
job_type=JobType.RERUN,
runner_image=runner_image,
script=script_object,
state=State.NOT_STARTED,
instrument_id=original_job.instrument_id,
inputs={},
run=original_job.run,
)

rerun_job = self._job_repo.add_one(rerun_job)

instrument = None
rb_number = 0
filename = None
if rerun_job.run:
filename = rerun_job.run.filename
instrument = rerun_job.run.instrument.instrument_name
if rerun_job.run.owner and rerun_job.run.owner.experiment_number:
rb_number = rerun_job.run.owner.experiment_number

if instrument is None or filename is None:
raise JobRequestError("Cannot create rerun job with missing run information.")

json_dict: dict[str, Any] = {
"filename": Path(filename).stem,
"job_type": "rerun",
"instrument": instrument,
"rb_number": rb_number,
"job_id": rerun_job.id,
"runner_image": runner_image,
"script": script,
}
self._send_message(json.dumps(json_dict))
return rerun_job.id

def simple_job(
def _get_or_create_job_owner(self, experiment_number: int | None, user_number: int | None) -> JobOwner:
job_owner = self._owner_repo.find_one(
JobOwnerSpecification().by_values(experiment_number=experiment_number, user_number=user_number)
)
if job_owner is None:
job_owner = JobOwner(experiment_number=experiment_number, user_number=user_number)
return job_owner

@require_owner
def create_simple_job(
self, runner_image: str, script: str, experiment_number: int | None = None, user_number: int | None = None
) -> None:
) -> int:
"""
Submit a job to the scheduled job queue in the message broker. Default to using experiment_number over
user_number.
:param runner_image: The image used as a runner on the cluster
:param script: The script to be used in the runner
:param experiment_number: the experiment number of the owner
:param user_number: the user number of the owner
:return: None
:return: created job id
"""
json_dict: dict[str, Any] = {"runner_image": runner_image, "script": script}
if experiment_number is not None:
json_dict["experiment_number"] = experiment_number
elif user_number is not None:
json_dict["user_number"] = user_number
else:
raise ValueError("Something needs to own the job, either experiment_number or user_number.")
self._send_message(json.dumps(json_dict))

job_owner = self._get_or_create_job_owner(experiment_number, user_number)

script_object = self._get_or_create_script(script)

job = Job(
owner=job_owner,
job_type=JobType.SIMPLE,
runner_image=runner_image,
script_id=script_object.id,
state=State.NOT_STARTED,
inputs={},
)
job = self._job_repo.add_one(job)

message_dict: dict[str, Any] = {
"runner_image": runner_image,
"script": script,
"job_type": "simple",
"experiment_number": experiment_number,
"user_number": user_number,
"job_id": job.id,
}
self._send_message(json.dumps(message_dict))
return job.id

def _get_or_create_script(self, script: str) -> Script:
script_hash = hash_script(script)
script_object = self._script_repo.find_one(ScriptSpecification().by_script_hash(script_hash))
if script_object is None:
script_object = Script(script=script, script_hash=hash_script(script))
return script_object
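The get-or-create pattern keyed on the sha512 hash can be sketched without a database. `InMemoryScriptStore` below is hypothetical, standing in for `Repo[Script]` plus `ScriptSpecification`, but `hash_script` matches the PR's implementation:

```python
import hashlib


def hash_script(script: str) -> str:
    """Return the sha512 hex digest of the script, as job_maker does."""
    return hashlib.sha512(script.encode()).hexdigest()


class InMemoryScriptStore:  # hypothetical stand-in for Repo[Script] + ScriptSpecification
    def __init__(self) -> None:
        self._by_hash: dict[str, dict[str, str]] = {}

    def get_or_create(self, script: str) -> dict[str, str]:
        # Look up by hash first so identical scripts share one stored row
        script_hash = hash_script(script)
        if script_hash not in self._by_hash:
            self._by_hash[script_hash] = {"script": script, "script_hash": script_hash}
        return self._by_hash[script_hash]
```

Keying on the content hash means two jobs submitted with byte-identical scripts reference a single `Script` row rather than duplicating the script text per job.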
14 changes: 11 additions & 3 deletions fia_api/core/repositories.py
@@ -56,7 +56,6 @@ def find(self, spec: Specification[T]) -> Sequence[T]:
"""
with self._session() as session:
query = spec.value
logger.info(spec.value)
return session.execute(query).unique().scalars().all()

def find_one(self, spec: Specification[T]) -> T | None:
@@ -88,10 +87,8 @@ def count(self, spec: Specification[T]) -> int:
:return: The count of entities of type T that match the specification.
"""
with self._session() as session:
# pylint: disable = not-callable
# mypy does not like these, but they are valid.
result = session.execute(select(func.count()).select_from(spec.value)) # type: ignore
# pylint: enable = not-callable
return result.scalar() if result else 0 # type: ignore

def update_one(self, entity: T) -> T:
@@ -100,6 +97,17 @@ def update_one(self, entity: T) -> T:
:param entity: The entity to be updated
:return: The updated Entity
"""
return self._store_entity(entity)

def add_one(self, entity: T) -> T:
"""
Given an entity, persist the entity into the database.
:param entity: The entity to be added
:return: The stored entity
"""
return self._store_entity(entity)

def _store_entity(self, entity: T) -> T:
with self._session() as session:
session.add(entity)
session.commit()
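`update_one` and the new `add_one` both delegate to a shared `_store_entity`. A minimal sketch of that add-and-commit pattern; `FakeSession` is hypothetical, mimicking just the context-manager surface of a SQLAlchemy session:

```python
from typing import Any, Generic, TypeVar

T = TypeVar("T")


class FakeSession:  # hypothetical, mimics the add/commit surface of a SQLAlchemy session
    def __init__(self, store: list[Any]) -> None:
        self._store = store

    def __enter__(self) -> "FakeSession":
        return self

    def __exit__(self, *exc: Any) -> None:
        return None

    def add(self, entity: Any) -> None:
        self._store.append(entity)

    def commit(self) -> None:
        pass  # a real session would flush to the database here


class Repo(Generic[T]):
    def __init__(self) -> None:
        self._entities: list[T] = []

    def _session(self) -> FakeSession:
        return FakeSession(self._entities)

    def add_one(self, entity: T) -> T:
        return self._store_entity(entity)

    def update_one(self, entity: T) -> T:
        return self._store_entity(entity)

    def _store_entity(self, entity: T) -> T:
        with self._session() as session:
            session.add(entity)
            session.commit()
        return entity
```

Keeping both public methods as thin wrappers over one private helper gives add and update a single commit path while preserving distinct, intention-revealing names at call sites.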
2 changes: 1 addition & 1 deletion fia_api/core/services/job.py
@@ -195,7 +195,7 @@ def update_job_by_id(id_: int, job: PartialJobUpdateRequest) -> Job:
raise MissingRecordError(f"No job found with id {id_}")
# We only update the fields that should change, not those that should never e.g. script, inputs.
# The start is included because it is recorded based from the pod start, end time post job run
for attr in ["state", "end", "start", "status_message", "output_files", "stacktrace"]:
for attr in ["state", "end", "start", "status_message", "outputs", "stacktrace"]:
value = getattr(job, attr)
if value is not None:
setattr(original_job, attr, value)
24 changes: 24 additions & 0 deletions fia_api/core/specifications/job_owner.py
@@ -0,0 +1,24 @@
"""Job Owner Specification"""

from __future__ import annotations

from db.data_models import JobOwner
from sqlalchemy import select

from fia_api.core.specifications.base import Specification


class JobOwnerSpecification(Specification[JobOwner]):
"""Job Owner Specification class"""

@property
def model(self) -> type[JobOwner]:
return JobOwner

def by_values(self, experiment_number: int | None, user_number: int | None) -> JobOwnerSpecification:
self.value = (
select(JobOwner)
.where(JobOwner.experiment_number == experiment_number)
.where(JobOwner.user_number == user_number)
)
return self
24 changes: 24 additions & 0 deletions fia_api/core/specifications/script.py
@@ -0,0 +1,24 @@
"""Script specification"""

from __future__ import annotations

from db.data_models import Script

from fia_api.core.specifications.base import Specification


class ScriptSpecification(Specification[Script]):
"""Script specification class"""

@property
def model(self) -> type[Script]:
return Script

def by_script_hash(self, script_hash: str) -> ScriptSpecification:
"""
Filter scripts by the given hash
:param script_hash: The hash to filter by
:return: The query specification
"""
self.value = self.value.where(Script.script_hash == script_hash)
return self