diff --git a/pyproject.toml b/pyproject.toml index 60adc52..732e2dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -175,7 +175,7 @@ omit = [ # Ignore questionable code or code in the process of reimplementation # Fastly code not currently used - "submit_ce/fastapi", + "submit_ce/fastapi/**", # "submit_ce/implementations/compile/common.py", "submit_ce/implementations/compile/compile_at_gcp.py", # migrate existing tests or eliminate script diff --git a/submit_ce/api/CompileService.py b/submit_ce/api/CompileService.py index c1c123b..6f121d5 100644 --- a/submit_ce/api/CompileService.py +++ b/submit_ce/api/CompileService.py @@ -57,3 +57,16 @@ def check(self, process_id: str, user: User, client: Client) -> ProcessStatus: ------- ProcessStatus """ + ... + + @abstractmethod + def is_available(self) -> bool: + """ + Check if the service is configured and available. + + Returns + ------- + bool + `True` if service is configured and available. + """ + ... diff --git a/submit_ce/api/submit.py b/submit_ce/api/submit.py index 9918587..677b731 100644 --- a/submit_ce/api/submit.py +++ b/submit_ce/api/submit.py @@ -229,6 +229,7 @@ def save(self, *events: Event, submission_id: Optional[int] = None) \ """ ... + @abstractmethod def upload(self, files: SubmitFile, submission_id: int, user: User, client: Client) -> Workspace: """Uploads a file to an existing submission. @@ -247,6 +248,7 @@ def upload(self, files: SubmitFile, submission_id: int, user: User, client: Clie # TODO Make this just an Event+save() ... + @abstractmethod def licenses(self, active_only=True) -> List[License]: """Gets a list of licenses that submitters can be set on the contents of submissions. @@ -258,6 +260,7 @@ def licenses(self, active_only=True) -> List[License]: """ ... + @abstractmethod def categories_for_user(self, user_id: str) -> list[str]: """Gets list of categories the user may submit to. @@ -268,6 +271,7 @@ def categories_for_user(self, user_id: str) -> list[str]: """ ... 
+ @abstractmethod def next_announcement_time(self, reference: Optional[datetime] = None) -> datetime: """Gets the next announce time. If `reference` is not passed, it does the next announce time from now. @@ -280,7 +284,9 @@ def next_announcement_time(self, reference: Optional[datetime] = None) -> dateti Returns -------- Time of next announce always TZ aware""" + ... + @abstractmethod def next_freeze_time(self, reference: Optional[datetime] = None) -> datetime: """Gets the next freeze time. If `reference` is not passed, it does the next freeze time from now. @@ -293,3 +299,20 @@ def next_freeze_time(self, reference: Optional[datetime] = None) -> datetime: Returns -------- Time of next freeze always TZ aware""" + ... + + @abstractmethod + def healthy(self)-> Tuple[bool, str]: + """Return whether this api and the services it depends on are healthy, + with a message that is safe to show the general public. Implementations + may raise a :class:`RuntimeError` if the api is not configured correctly. + + Returns + ------- + bool + `True` if this and all dependent services are healthy. + str + A message about the api that is safe to show to the general public. + + """ + ... 
diff --git a/submit_ce/domain/event/tests/test_event_util.py b/submit_ce/domain/event/tests/test_event_util.py deleted file mode 100644 index 674ebfe..0000000 --- a/submit_ce/domain/event/tests/test_event_util.py +++ /dev/null @@ -1,74 +0,0 @@ -"""Test util.py under domain/event""" -import pytest -import dataclasses -import submit_ce.domain.event.util as event_util - -# -# 1) dataclass() with NO kwargs: should wrap base dataclass and then install __hash__/__eq__ -# - -def test_event_dataclass_without_kwargs_sets_hash_and_eq(): - @event_util.dataclass() # no kwargs branch inside util.dataclass # [event.util] - class E: - event_id: str - x: int = 0 - - a = E(event_id="A", x=1) - b = E(event_id="A", x=99) # same event_id -> same hash, equal - c = E(event_id="C", x=1) # different event_id -> different hash, not equal - - # __hash__ should be derived from event_id - assert hash(a) == hash(b) - assert hash(a) != hash(c) - - # __eq__ uses event_util.event_eq, which compares hashes - assert a == b - assert a != c - -# -# 2) dataclass() WITH kwargs: should honor kwargs (e.g., frozen=True) and still install __hash__/__eq__ -# - -def test_event_dataclass_with_kwargs_preserves_kwargs_and_sets_hash_eq(): - @event_util.dataclass(frozen=True) # kwargs branch - class E: - event_id: str - y: int = 0 - - e1 = E(event_id="Z", y=1) - # frozen=True should make the instance immutable - with pytest.raises(dataclasses.FrozenInstanceError): - e1.y = 2 # type: ignore[attr-defined] - - # __hash__ / __eq__ still installed - e2 = E(event_id="Z", y=999) - e3 = E(event_id="OTHER") - assert hash(e1) == hash(e2) and e1 == e2 - assert hash(e1) != hash(e3) and e1 != e3 - -# -# 3) event_hash: explicitly uses instance.event_id -# - -def test_event_hash_uses_event_id(): - class Dummy: - def __init__(self, eid): self.event_id = eid - d1, d2 = Dummy("K"), Dummy("K") - assert event_util.event_hash(d1) == event_util.event_hash(d2) # same event_id - -# -# 4) event_eq compares hashes, not types or fields -# 
- -def test_event_eq_compares_hashes_not_types(): - @event_util.dataclass() - class E: - event_id: str - - class Other: - # Make it hash to the same value as E("SAME") - def __hash__(self): return hash("SAME") - - e = E(event_id="SAME") - o = Other() - assert event_util.event_eq(e, o) # equal because hashes match diff --git a/submit_ce/domain/event/util.py b/submit_ce/domain/event/util.py deleted file mode 100644 index 19e8a78..0000000 --- a/submit_ce/domain/event/util.py +++ /dev/null @@ -1,27 +0,0 @@ -"""Helpers for event classes.""" - -from typing import Any, Callable - -from dataclasses import dataclass as base_dataclass - - -def event_hash(instance: Any) -> int: - """Use event ID as object hash.""" - return hash(instance.event_id) # typing: ignore - - -def event_eq(instance: Any, other: Any) -> bool: - """Compare this event to another event.""" - return hash(instance) == hash(other) - - -def dataclass(**kwargs: Any) -> Callable[[Any], Any]: - def inner(cls: type) -> Any: - if kwargs: - new_cls = base_dataclass(**kwargs)(cls) - else: - new_cls = base_dataclass(cls) - setattr(new_cls, '__hash__', event_hash) - setattr(new_cls, '__eq__', event_eq) - return new_cls - return inner diff --git a/submit_ce/domain/types.py b/submit_ce/domain/types.py index cee3383..32d1776 100644 --- a/submit_ce/domain/types.py +++ b/submit_ce/domain/types.py @@ -12,7 +12,7 @@ class SubmitFile(Protocol): """Name of the file as provided by the client.""" content_type: str """The MIME type of the file as provided by the client.""" - stream: BytesIO + stream: Any # should be BytesIO but often SpooledTemporaryFile Not sure how to handle this """File contents as provided by the client.""" @classmethod diff --git a/submit_ce/domain/uploads.py b/submit_ce/domain/uploads.py index c87a16b..1fb8987 100644 --- a/submit_ce/domain/uploads.py +++ b/submit_ce/domain/uploads.py @@ -57,6 +57,9 @@ class FileStatus(BaseModel): ancillary: bool = False errors: List[FileError] = [] + @property + def 
size(self): + return self.bytes class UploadStatus(Enum): # type: ignore """The status of the upload workspace with respect to submission.""" diff --git a/submit_ce/implementations/compile/compile_at_gcp_service.py b/submit_ce/implementations/compile/compile_at_gcp_service.py index fc94982..c6f882f 100644 --- a/submit_ce/implementations/compile/compile_at_gcp_service.py +++ b/submit_ce/implementations/compile/compile_at_gcp_service.py @@ -1,5 +1,8 @@ from datetime import timezone, datetime from typing import Optional +from venv import logger +import httpx +from typing_extensions import override from zoneinfo import ZoneInfo from arxiv.base.config import ARXIV_BUSINESS_TZ @@ -37,6 +40,14 @@ def __init__(self, self.max_tex_files = max_tex_files self.timeout = timeout self.timezone = ARXIV_BUSINESS_TZ + + def __repr__(self) -> str: + return (f"{self.__class__.__name__}(" + f"tex2pdf_url={self.tex2pdf_url}," + f"base_submissions_dir={self.base_submissions_dir}" + ) + + @override def start_compile(self, submission: Submission, user: User, client: Client, api: SubmitApi, @@ -78,10 +89,16 @@ def start_compile(self, submission: Submission, duration_sec = json_data["total_time"], utc_start_time=utc_start_time, url=f"FAKE_URL_{__file__}" - ) - - + @override def check(self, process_id: str, user: User, client: Client) -> ProcessStatus: pass + + @override + def is_available(self) -> bool: + resp = httpx.get(self.tex2pdf_url) + if resp.status_code == 200: + return True + logger.error(f"Healthcheck failed: service at '{self.tex2pdf_url}' responded with status {resp.status_code}") + return False diff --git a/submit_ce/implementations/file_store/gs_file_store.py b/submit_ce/implementations/file_store/gs_file_store.py index 3db50a7..53cf6e0 100644 --- a/submit_ce/implementations/file_store/gs_file_store.py +++ b/submit_ce/implementations/file_store/gs_file_store.py @@ -23,12 +23,6 @@ logger = logging.getLogger(__file__) -class SecurityError(RuntimeError): - """Something suspicious 
happened.""" - - -class UserFile: - pass class GsFileStore(SubmissionFileStore, FileStoreMixin): @@ -61,14 +55,20 @@ def __init__(self, """Prefix in the {gs_bucket}/{shard}/{id} directory to store the source.""" self.source_prefix = source_prefix """Prefix for source files""" - + if self.gs_prefix.startswith("/"): self.gs_prefix = self.gs_prefix[1:] - + self.storage_client = storage.Client() self.bucket = self.storage_client.bucket(self.gs_bucket) self.obj_store = GsObjectStore(self.bucket) + def __repr__(self) -> str: + return (f"{self.__class__.__name__}(" + f"gs_bucket={self.gs_bucket}," + f"gs_prefix={self.gs_prefix}," + f"source_prefix={self.source_prefix}," + ) def _blob_to_file_status(self, submission_id, blob) -> FileStatus: src_dir = self._source_path(submission_id) @@ -123,7 +123,7 @@ def store_source_file(self, content: SubmitFile, chunk_size: int) -> FileStatus: """Stores a file for a submisison.""" - blob = self.bucket.blob(self._source_path(submission_id) / content.filename) + blob = self.bucket.blob(str(self._source_path(submission_id) / content.filename)) blob.upload_from_file(content.stream, content_type=content.content_type) return self._blob_to_file_status(submission_id, blob) @@ -134,16 +134,16 @@ def store_source_package(self, """Store a source package for a submission.""" files=[] src_dir = self._source_path(submission_id) - + with tarfile.open(fileobj=content.stream, mode="r:*") as tar: for member in tar.getmembers(): if not member.isfile(): - continue + continue with tar.extractfile(member) as file: blob = self.bucket.blob(str(src_dir / member.name)) blob.upload_from_file(file, size=member.size) files.append( {"file":member.name, "bytes": member.size}) - + return files def get_preview(self, submission_id: str) -> FileObj: @@ -173,7 +173,7 @@ def does_source_exist(self, submission_id: str) -> bool: return self.bucket.blob(str(self._source_package_path(submission_id))).exists() def get_preview_checksum(self, submission_id: str) -> str: - 
"""Get the checksum of the preview PDF for a submission.""" + """Get the checksum of the preview PDF for a submission.""" return self._get_checksum(self._preview_path(submission_id)) def does_preview_exist(self, submission_id: str) -> bool: @@ -183,7 +183,7 @@ def does_preview_exist(self, submission_id: str) -> bool: def _get_checksum(self, path: str) -> str: item = self.bucket.blob(path) return item.crc32c - + def _submission_path(self, submission_id: str|str) -> Path: """Gets GS filesystem structure ex /{rootdir}/{first 4 digits of submission id}/{submission id}""" shard_dir = self.gs_prefix / Path(str(submission_id)[:4]) @@ -215,30 +215,14 @@ def delete_workspace(self, submission_id: str): for blob in blobs: blob.delete() - def is_available(self) -> bool: """Determine whether the filesystem is available.""" - return self.bucket.exists() - - - # def _check_safe(self, workspace: Workspace, full_path: str, # TODO implement - # is_ancillary: bool = False, is_removed: bool = False, - # is_persisted: bool = False, is_system: bool = False, - # strict: bool = True) -> None: - # if not strict or is_system: - # wks_full_path = self.get_path_bare(workspace.base_path, - # is_persisted=is_persisted) - # elif is_ancillary: - # wks_full_path = self.get_path_bare(workspace.ancillary_path, - # is_persisted=is_persisted) - # elif is_removed: - # wks_full_path = self.get_path_bare(workspace.removed_path, - # is_persisted=is_persisted) - # else: - # wks_full_path = self.get_path_bare(workspace.source_path, - # is_persisted=is_persisted) - # if wks_full_path not in full_path: - # raise ValueError(f'Not a valid path for workspace: {full_path}') + try: + return self.bucket.exists() + except Exception as ex: + logger.error(f"Could not check if bucket exists: {ex}") + return False + def delete_all_source_files(self, submission_id: str) -> None: pass diff --git a/submit_ce/implementations/file_store/legacy_file_store.py b/submit_ce/implementations/file_store/legacy_file_store.py index 
78a9b21..bf9382e 100644 --- a/submit_ce/implementations/file_store/legacy_file_store.py +++ b/submit_ce/implementations/file_store/legacy_file_store.py @@ -77,6 +77,9 @@ def __init__(self, self.source_prefix = source_prefix """Prefix in the {root}/{shard}/{id} directory to store the source.""" + def __repr__(self) -> str: + return f"{self.__class__.__name__}(root_dir={self.root_dir})" + def get_source_file(self, submission_id: str, path: Path | str) -> FileObj: src_path = self._source_path(submission_id) / path if src_path.exists(): @@ -111,13 +114,16 @@ def get_workspace(self, submission_id: str, upload_id: str) -> Workspace: files: List[FileStatus] = [] for path in Path(self._source_path(submission_id)).rglob("*"): stat = path.stat() - files.append(FileStatus(str(path.relative_to(src_dir)), - path.name, - "unknown", - stat.st_size, - datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc), - anc_dir in path.parent.parents, - [])) + files.append(FileStatus(path=str(path.relative_to(src_dir)), + name=path.name, + content_type="unknown", + bytes=stat.st_size, + modified=datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc), + url="http://example.com/fakeurl", + crc32c="fakecrc32", + ancillary=anc_dir in path.parent.parents, + is_versioned=False + )) return Workspace( identifier=submission_id, diff --git a/submit_ce/implementations/legacy_implementation/__init__.py b/submit_ce/implementations/legacy_implementation/__init__.py index 0c6e7b3..0400b4f 100644 --- a/submit_ce/implementations/legacy_implementation/__init__.py +++ b/submit_ce/implementations/legacy_implementation/__init__.py @@ -1,12 +1,13 @@ import logging from datetime import datetime, UTC from typing import Optional, List, Tuple, Callable +from sqlalchemy.exc import OperationalError, ProgrammingError from typing_extensions import override from arxiv.auth.domain import User as AuthDomainUser from arxiv.auth.legacy.endorsements import get_endorsements from fastapi import HTTPException, status -from 
sqlalchemy import select +from sqlalchemy import select, text from sqlalchemy.orm import Session as SqlalchemySession, Session from submit_ce.api import SubmitApi @@ -65,11 +66,15 @@ def __init__(self, self.get_session = get_session self.serialize_file_operations = serialize_file_operations self.compiler = compiler + self.store = store + + def __repr__(self) -> str: + return (f"{self.__class__.__name__}(" + f"store={self.store.__repr__()}," + f"compiler={self.compiler.__repr__()}," + f"serialize_file_operations={self.serialize_file_operations}" + ")") - if store is None: - self.store = LegacyFileStore(root_dir="data/new") # for testing only - else: - self.store = store @override def get(self, submission_id: str) -> Submission: @@ -175,9 +180,6 @@ def categories_for_user(self, user_id: int) -> Optional[str]: @override def upload(self, file: SubmitFile, submission_id: int, user: User, client: Client) -> Workspace: """Saves file to legacy FS and sets the upload package on the submission.""" - if not isinstance(file, SubmitFile): - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, - detail="SubmitFile Must have file, it must have a filename, content-type and steam") logger.debug(f"Uploaded archive MIME type: {file.content_type}.") if file.content_type not in acceptable_types: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, @@ -216,8 +218,8 @@ def upload(self, file: SubmitFile, submission_id: int, user: User, client: Clien submission, event_list = self._load(session, submission_id, lock_row=self.serialize_file_operations) - self.store.store_source_package(submission.submission_id, file, 4098) - workspace = self.store.get_workspace(submission.submission_id, "fakeuploadid") + self.store.store_source_package(str(submission.submission_id), file, 4098) + workspace = self.store.get_workspace(str(submission.submission_id), "fakeuploadid") command = SetUploadPackage(creator=user, client=client, submission_id=submission.submission_id, @@ -247,3 +249,34 @@ 
def next_announcement_time(self, reference: Optional[datetime] = None) -> dateti @override def next_freeze_time(self, reference: Optional[datetime] = None) -> datetime: return next_freeze_time(reference) + + @override + def healthy(self) -> tuple[bool, str]: + msgs = [] + healthy = True + try: + session = self.get_session() + session.execute(text("SELECT 1")).fetchone() + msgs.append("main api db healthy") + except (OperationalError, ProgrammingError) as e: + logger.error(f"DB connection failed due to {e}") + healthy=False + msgs.append("Main api DB opertional error") + except Exception as e: + logger.error(f"Unexpected error while testing db connection: {e}") + healthy=False + msgs.append("Main api DB failed unexpectedly") + + if not self.get_compiler().is_available(): + healthy = False + msgs.append("Compiler unhealthy") + else: + msgs.append("Compiler healthy") + + if not self.get_file_store().is_available(): + healthy = False + msgs.append("File store unhealthy") + else: + msgs.append("File store healthy") + + return healthy, ", ".join(msgs) diff --git a/submit_ce/implementations/pubsub/__init__.py b/submit_ce/implementations/pubsub/__init__.py index 44eb38b..189b0ce 100644 --- a/submit_ce/implementations/pubsub/__init__.py +++ b/submit_ce/implementations/pubsub/__init__.py @@ -1,5 +1,6 @@ """submit-ce API implementation that sends pubsub events.""" +from datetime import datetime from typing import Optional, Tuple, List import logging @@ -8,7 +9,11 @@ from submit_ce.api import SubmitApi, SubmissionFileStore from submit_ce.domain import Event, Submission from submit_ce.api.CompileService import CompileService +from submit_ce.domain.agent import Client, User from submit_ce.domain.event.base import EventList +from submit_ce.domain.meta import License +from submit_ce.domain.types import SubmitFile +from submit_ce.domain.uploads import Workspace logger = logging.getLogger(__name__) @@ -47,6 +52,24 @@ def get_file_store(self) -> SubmissionFileStore: def 
get_compiler(self) -> CompileService: return self.inner_api.get_compiler() + def categories_for_user(self, user_id: str) -> list[str]: + return self.inner_api.categories_for_user(user_id) + + def licenses(self, active_only=True) -> List[License]: + return self.inner_api.licenses(active_only) + + def next_announcement_time(self, reference: Optional[datetime] = None) -> datetime: + return self.inner_api.next_announcement_time(reference) + + def next_freeze_time(self, reference: Optional[datetime] = None) -> datetime: + return self.inner_api.next_freeze_time(reference) + + def healthy(self) -> tuple[bool,str]: + return self.inner_api.healthy() + + def upload(self, files: SubmitFile, submission_id: int, user: User, client: Client) -> Workspace: + return self.inner_api.upload(files, submission_id, user, client) + @staticmethod def serialize_msg(*events: Event) -> bytes: """Serialize events to `bytes` to send as pubsub message.""" diff --git a/submit_ce/ui/backend.py b/submit_ce/ui/backend.py index 68eaf83..06b4e51 100644 --- a/submit_ce/ui/backend.py +++ b/submit_ce/ui/backend.py @@ -96,3 +96,21 @@ def endorsed_for(user: User, category: str) -> bool: return category in endorsements \ or f"{archive}.*" in endorsements \ or "*.*" in endorsements + + +def backend_startup_health_check(): + """Raises error if backend is not healthy.""" + if not current_app.api: + raise RuntimeError("Flask current_app has no api") + errors=[] + if not current_app.api.get_file_store(): + errors.append("API lacks filestore") + elif not current_app.api.get_file_store().is_available(): + errors.append("Filestore service is misconfigured or not available") + if not current_app.api.get_compiler(): + errors.append("API lacks compiler") + elif not current_app.api.get_compiler().is_available(): + errors.append("Compiler serivce is misconfigured or not available") + + if errors: + raise RuntimeError(", ".join(errors)) diff --git a/submit_ce/ui/controllers/new/upload.py 
b/submit_ce/ui/controllers/new/upload.py index 4e6f6e2..ec246f8 100644 --- a/submit_ce/ui/controllers/new/upload.py +++ b/submit_ce/ui/controllers/new/upload.py @@ -221,7 +221,7 @@ def _get_upload(params: MultiDict, session: Session, submission: Submission, if type(status_data) is dict and status_data['identifier'] == upload_id: workspace = Workspace.from_dict(status_data) else: - workspace = current_app.api.get_file_store().get_workspace(submission_id=submission.submission_id, + workspace = current_app.api.get_file_store().get_workspace(submission_id=str(submission.submission_id), upload_id=submission.source_content.identifier) rdata.update({'status': workspace}) diff --git a/submit_ce/ui/factory.py b/submit_ce/ui/factory.py index 5a87d98..4e52d1b 100644 --- a/submit_ce/ui/factory.py +++ b/submit_ce/ui/factory.py @@ -32,7 +32,7 @@ def create_web_app(config: Optional[dict]=None) -> Flask: app.config.from_object(settings) app.config['TEMPLATES_AUTO_RELOAD']=True - # Put the SubmitApi on the flask app + app.api = backend.config_backend_api(settings) db.init(settings) diff --git a/test.sh b/test.sh index 7bd5c9c..7a2863a 100755 --- a/test.sh +++ b/test.sh @@ -2,6 +2,7 @@ # all coverage settings configured in pyproject.toml uv run pytest \ + --cov=submit_ce \ submit_ce/api \ submit_ce/implementations \ submit_ce/ui