Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions compose/app/cron.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,4 @@

set -xe

exec /usr/local/bin/celery \
--app=hawc.main.celery \
beat \
--loglevel=INFO
# Django derives the command name from the module file name
# (hawc/apps/common/management/commands/crontasks.py), so it is "crontasks".
exec python manage.py crontasks
2 changes: 1 addition & 1 deletion compose/app/sync.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ manage migrate --noinput
# drop/rebuild materialized views
manage recreate_views

# succcessful exit for healthchecks
# successful exit for healthcheck
exit 0
5 changes: 1 addition & 4 deletions compose/app/workers.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,4 @@

set -xe

exec /usr/local/bin/celery \
--app=hawc.main.celery \
worker \
--loglevel=INFO
exec python manage.py db_worker
4 changes: 2 additions & 2 deletions compose/dc-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ services:
- ./data/private:/app/private
- ./data/public:/app/public
healthcheck:
test: celery --app=hawc.main.celery inspect ping -d celery@$$HOSTNAME --json
test: pgrep -f "manage.py db_worker"
interval: 60s
timeout: 10s
retries: 3
Expand All @@ -106,7 +106,7 @@ services:
- ./data/private:/app/private
- ./data/public:/app/public
healthcheck:
test: celery --app=hawc.main.celery inspect ping --json
# cron.sh launches the scheduler via the "crontasks" management command, so
# that is the process name the healthcheck has to look for.
test: pgrep -f "manage.py crontasks"
interval: 60s
timeout: 10s
retries: 3
Expand Down
5 changes: 1 addition & 4 deletions hawc/apps/assessment/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -937,10 +937,7 @@ def add_time_spent_job(
):
cache_name = cls.get_cache_name(url or request.path, request.session.session_key)
content_type_id = ContentType.objects.get_for_model(obj).id
# wait 10 seconds to make sure database is populated
add_time_spent.s(cache_name, obj.id, assessment_id, content_type_id).apply_async(
countdown=10
)
add_time_spent.enqueue(cache_name, obj.id, assessment_id, content_type_id)

@classmethod
def add_time_spent(cls, cache_name, object_id, assessment_id, content_type_id):
Expand Down
11 changes: 6 additions & 5 deletions hawc/apps/assessment/tasks.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from celery import shared_task
from celery.utils.log import get_task_logger
import logging

from django.apps import apps
from django_tasks import task

logger = get_task_logger(__name__)
logger = logging.getLogger(__name__)


@shared_task
@task
def delete_orphan_relations(delete: bool = False):
# remove orphan relations in cases where the db cannot do so directly
Log = apps.get_model("assessment", "Log")
Expand All @@ -15,7 +16,7 @@ def delete_orphan_relations(delete: bool = False):
Log.objects.create(message=message)


@shared_task
@task
def add_time_spent(cache_name: str, object_id: int, assessment_id: int, content_type_id: int):
apps.get_model("assessment", "TimeSpentEditing").add_time_spent(
cache_name, object_id, assessment_id, content_type_id
Expand Down
2 changes: 1 addition & 1 deletion hawc/apps/bmd/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def save_and_execute(self):
self.instance.save()

# trigger BMD model execution
tasks.execute.delay(self.instance.id)
tasks.execute.enqueue(self.instance.id)

@transaction.atomic
def select(self):
Expand Down
9 changes: 5 additions & 4 deletions hawc/apps/bmd/tasks.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from celery import shared_task
from celery.utils.log import get_task_logger
import logging

from django.apps import apps
from django_tasks import task

logger = get_task_logger(__name__)
logger = logging.getLogger(__name__)


@shared_task
@task
def execute(session_id: int):
logger.info(f"BMD execution -> {session_id}")
session = apps.get_model("bmd", "Session").objects.get(id=session_id)
Expand Down
8 changes: 4 additions & 4 deletions hawc/apps/common/diagnostics.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ def diagnostic_500(modeladmin, request, queryset):
raise IntentionalException(message)


def diagnostic_celery_task(modeladmin, request, queryset):
response = tasks.diagnostic_celery_task.delay(request.user.id).get()
message = f"Celery task executed successfully: {response}"
def diagnostic_task_test(modeladmin, request, queryset):
result = tasks.diagnostic_task.enqueue(request.user.id)
message = f"Task enqueued successfully with ID: {result.id}"
modeladmin.message_user(request, message)


Expand Down Expand Up @@ -105,6 +105,6 @@ def series(self) -> pd.Series:


diagnostic_500.short_description = "Diagnostic server error (500)"
diagnostic_celery_task.short_description = "Diagnostic celery task test"
diagnostic_task_test.short_description = "Diagnostic task test"
diagnostic_cache.short_description = "Diagnostic cache test"
diagnostic_email.short_description = "Diagnostic email test"
14 changes: 14 additions & 0 deletions hawc/apps/common/management/commands/crontasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from django.core.management.base import BaseCommand

from hawc.main.crontasks import setup_scheduler


class Command(BaseCommand):
    """Management command that starts the scheduler built by ``setup_scheduler``."""

    help = "Run the task scheduler for periodic tasks"

    def handle(self, *args, **options):
        """Create the scheduler and start it; shut it down on interrupt/exit."""
        periodic = setup_scheduler()
        try:
            periodic.start()
        except (KeyboardInterrupt, SystemExit):
            # Ctrl-C or an exit request: ask the scheduler to stop instead of
            # letting the exception propagate out of the command.
            periodic.shutdown()
11 changes: 0 additions & 11 deletions hawc/apps/common/management/commands/run_celery.py

This file was deleted.

18 changes: 9 additions & 9 deletions hawc/apps/common/tasks.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,39 @@
import logging
from datetime import timedelta

from celery import shared_task
from celery.utils.log import get_task_logger
from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.management import call_command
from django.utils.timezone import now
from django_tasks import task
from rest_framework.authtoken.models import Token

logger = get_task_logger(__name__)
logger = logging.getLogger(__name__)


@shared_task
def diagnostic_celery_task(id_: str):
@task
def diagnostic_task(id_: str):
user = get_user_model().objects.get(id=id_)
logger.info(f"Diagnostic celery task triggered by: {user}")
logger.info(f"Diagnostic task triggered by: {user}")
return dict(success=True, when=str(now()), user=user.email)


@shared_task
@task
def worker_healthcheck():
from .diagnostics import worker_healthcheck

worker_healthcheck.push()


@shared_task
@task
def destroy_old_api_tokens():
deletion_date = now() - timedelta(seconds=settings.SESSION_COOKIE_AGE)
qs = Token.objects.filter(created__lt=deletion_date)
logger.info(f"Destroying {qs.count()} old tokens")
qs.delete()


@shared_task
@task
def create_initial_revisions():
"""
Most apis/views should create initial revisions; however if we're importing data from
Expand Down
17 changes: 5 additions & 12 deletions hawc/apps/lit/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
from typing import Self
from urllib import parse

from celery import chain
from celery.result import ResultBase
from django.apps import apps
from django.conf import settings
from django.contrib.postgres.fields import ArrayField
Expand Down Expand Up @@ -680,7 +678,7 @@ def get_content(self) -> dict:

@staticmethod
def update_pubmed_content(idents):
tasks.update_pubmed_content.delay([d.unique_id for d in idents])
tasks.update_pubmed_content.enqueue([d.unique_id for d in idents])

@classmethod
def existing_doi_map(cls, dois: list[str]) -> dict[str, int]:
Expand Down Expand Up @@ -1071,7 +1069,7 @@ def delete_cache(cls, assessment_id: int, delete_study_cache: bool = True):
)

@classmethod
def update_hero_metadata(cls, assessment_id: int) -> ResultBase:
def update_hero_metadata(cls, assessment_id: int):
"""Update reference metadata for all references in an assessment.

Async worker task; updates data from HERO and then applies new data to references.
Expand All @@ -1084,14 +1082,9 @@ def update_hero_metadata(cls, assessment_id: int) -> ResultBase:
hero_ids = identifiers.values_list("unique_id", flat=True)
hero_ids = list(hero_ids) # queryset to list for JSON serializability

# update content of hero identifiers
t1 = tasks.update_hero_content.si(hero_ids)

# update fields from content
t2 = tasks.update_hero_fields.si(reference_ids)

# run chained tasks
return chain(t1, t2)()
# call functions directly; not tasks on the task queue
tasks.update_hero_content.func(hero_ids)
tasks.update_hero_fields.func(reference_ids)

@property
def ref_full_citation(self):
Expand Down
22 changes: 6 additions & 16 deletions hawc/apps/lit/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
from io import StringIO

import pandas as pd
from celery import chain
from celery.result import ResultBase
from django.core.exceptions import ValidationError
from django.db import transaction
from django.db.models import Q
Expand Down Expand Up @@ -383,21 +381,13 @@ def validate_replace(self, replace: list) -> list:

return replace

def execute(self) -> ResultBase:
# import missing identifiers
def execute(self):
# Import missing identifiers
models.Identifiers.objects.bulk_create_hero_ids(self.fetched_content)

# set hero ref
t1 = tasks.replace_hero_ids.si(self.validated_data["replace"])

# update content
t2 = tasks.update_hero_content.si(self.hero_ids)

# update fields with content
t3 = tasks.update_hero_fields.si(self.ref_ids)

# run chained tasks
return chain(t1, t2, t3)()
# call these 3 tasks directly; do not enqueue in the task queue
tasks.replace_hero_ids.func(self.validated_data["replace"])
tasks.update_hero_content.func(self.hero_ids)
tasks.update_hero_fields.func(self.ref_ids)


class FilterReferences(PydanticDrfSerializer):
Expand Down
16 changes: 8 additions & 8 deletions hawc/apps/lit/tasks.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import json
import logging

from celery import shared_task
from celery.utils.log import get_task_logger
from django.apps import apps
from django.db import transaction
from django.db.models import Model
from django_tasks import task

from ...services.epa import hero
from ...services.nih import pubmed
from . import constants

logger = get_task_logger(__name__)
logger = logging.getLogger(__name__)


@shared_task
@task
def update_hero_content(ids: list[int]):
"""Fetch the latest data from HERO and update identifier object."""

Expand All @@ -33,7 +33,7 @@ def update_hero_content(ids: list[int]):
).update(content='{"status": "failed"}')


@shared_task
@task
def update_hero_fields(ref_ids: list[int]):
"""
Updates the reference fields with most recent content from HERO
Expand Down Expand Up @@ -73,7 +73,7 @@ def update_hero_fields(ref_ids: list[int]):
)


@shared_task
@task
def replace_hero_ids(replace: list[list[int]]):
"""
Replace the identifier on each reference with the given HERO ID
Expand Down Expand Up @@ -116,7 +116,7 @@ def replace_hero_ids(replace: list[list[int]]):
reference.identifiers.set(identifier_ids)


@shared_task
@task
def update_pubmed_content(ids: list[int]):
"""Fetch the latest data from Pubmed and update identifier object."""
Identifiers = apps.get_model("lit", "identifiers")
Expand All @@ -133,7 +133,7 @@ def update_pubmed_content(ids: list[int]):
).update(content='{"status": "failed"}')


@shared_task
@task
def fix_pubmed_without_content():
# Try getting pubmed data without content
Identifiers = apps.get_model("lit", "identifiers")
Expand Down
4 changes: 2 additions & 2 deletions hawc/apps/materialized/tasks.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from celery import shared_task
from django_tasks import task

from . import models


@shared_task
@task
def refresh_all_mvs(force: bool = False):
models.refresh_all_mvs(force)
4 changes: 2 additions & 2 deletions hawc/apps/myuser/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
clear_cache,
diagnostic_500,
diagnostic_cache,
diagnostic_celery_task,
diagnostic_email,
diagnostic_task_test,
)
from . import forms, models

Expand Down Expand Up @@ -83,7 +83,7 @@ def set_password(modeladmin, request, queryset):
set_password,
clear_cache,
diagnostic_500,
diagnostic_celery_task,
diagnostic_task_test,
diagnostic_cache,
diagnostic_email,
)
22 changes: 22 additions & 0 deletions hawc/apps/vocab/migrations/0009_alter_entity_terms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Generated by Django 5.2.10 on 2026-01-11 20:35

from django.db import migrations, models


class Migration(migrations.Migration):
    """Alter ``Entity.terms`` to the many-to-many definition given below."""

    # Must be applied after the previous vocab migration.
    dependencies = [("vocab", "0008_alter_term_namespace")]

    operations = [
        migrations.AlterField(
            model_name="entity",
            name="terms",
            # Relation goes through the explicit EntityTermRelation model.
            field=models.ManyToManyField(
                to="vocab.term",
                through="vocab.EntityTermRelation",
                through_fields=("entity", "term"),
                related_name="entities",
            ),
        ),
    ]
Loading
Loading