From 017fc6e4dfb7791330acd42bfefa897373c71075 Mon Sep 17 00:00:00 2001 From: Tom Lee Date: Sat, 27 Jan 2024 12:35:34 -0800 Subject: [PATCH] feat: improve celery stability --- app/views/views.py | 26 ++++++++++++++++++-------- app/worker/app_celery.py | 16 ++++++++-------- app/worker/tasks/__init__.py | 7 ++++--- app/worker/tasks/exporter.py | 5 +++-- app/worker/tasks/importers/__init__.py | 6 ++++-- reboot/celeryconfig.py | 4 +++- 6 files changed, 40 insertions(+), 24 deletions(-) diff --git a/app/views/views.py b/app/views/views.py index 60b11027..7d1b284a 100644 --- a/app/views/views.py +++ b/app/views/views.py @@ -2,8 +2,8 @@ import logging from celery.exceptions import TimeoutError -from celery.result import AsyncResult -from celery.states import FAILURE, PENDING, SUCCESS +# from celery.result import AsyncResult +from celery.states import FAILURE, PENDING, STARTED, SUCCESS from django.contrib.auth.decorators import login_required from django.core import serializers from django.http import ( @@ -22,13 +22,14 @@ from app.constants.str import PERMISSION_DENIED from app.models import Item -from app.worker.app_celery import ATTEMPT_LIMIT, PROGRESS +from app.worker.app_celery import ATTEMPT_LIMIT from app.worker.tasks import receiptor from app.worker.tasks.exporter import exporter from app.worker.tasks.importers import historical_data_importer +from reboot.celery import app logger = logging.getLogger(__name__) - +tasks_cache = {} @require_GET @login_required(login_url="/login") @@ -121,14 +122,16 @@ def poll_state(request: HttpRequest): request=request, err_msg="The task_id query parameter of the request was omitted.") - task = AsyncResult(task_id) + task = app.AsyncResult(task_id) res = JsonResponse(_poll_state(PENDING, 0, 200)) + print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}") if task.state == FAILURE or task.failed(): res = JsonResponse(_poll_state(FAILURE, 0, 400)) - elif task.state == PROGRESS: + elif task.state == STARTED: res = JsonResponse(task.result) if isinstance( task.result, dict) else HttpResponse(task.result) elif task.state == SUCCESS or task.successful() or task.ready(): + tasks_cache[task_id] = task res = HttpResponse(SUCCESS) return res @@ -142,12 +145,18 @@ def download_file(request: HttpRequest): task_id = request.GET.get("task_id") task_name = request.GET.get("task_name", "task") attempts = 0 + task = tasks_cache[task_id] if tasks_cache[task_id] else app.AsyncResult(task_id) + # if tasks_cache[task_id]: + # result = tasks_cache[task_id].get(timeout=5) + # del tasks_cache[task_id] + # return result # CloudAMQP free tier is unstable and must be circuit breakered while (attempts < ATTEMPT_LIMIT): try: attempts += 1 - task = AsyncResult(task_id) - result = task.get(timeout=0.5 * attempts) + # task = app.AsyncResult(task_id) + print(f"!!! task id={task_id},state={task.state},successful={task.successful()},ready={task.ready()},failed={task.failed()}") + result = task.get(timeout=1.0 * attempts) print(f"{task} {task_name} success #{attempts}: {result}") break except TimeoutError: @@ -158,6 +167,7 @@ def download_file(request: HttpRequest): err_msg="Download exceeded max attempts") return result except Exception as e: + print(f"!!! error", e) return _error(request=request, err_msg=f"Failed to download file: {e}") diff --git a/app/worker/app_celery.py b/app/worker/app_celery.py index 3204964d..5c593364 100644 --- a/app/worker/app_celery.py +++ b/app/worker/app_celery.py @@ -1,17 +1,17 @@ import traceback -import celery -from celery.states import SUCCESS, FAILURE from http import HTTPStatus -PROGRESS = 'PROGRESS' +from celery.states import FAILURE, STARTED, SUCCESS + +from reboot.celery import app ATTEMPT_LIMIT = 5 def update_state(state, percent, http_status): print('{0!r} state: {1!r}, progress: {2!r}'.format( - celery.current_task.request.id, state, percent)) - celery.current_task.update_state(state=state, meta={ + app.current_task.request.id, state, percent)) + app.current_task.update_state(state=state, meta={ 'state': state, 'process_percent': percent, 'status': http_status, @@ -19,7 +19,7 @@ def update_state(state, percent, http_status): def update_percent(percent): - update_state(PROGRESS, percent, HTTPStatus.ACCEPTED) + update_state(STARTED, percent, HTTPStatus.ACCEPTED) def set_success(): @@ -27,7 +27,7 @@ def set_success(): def set_failure(e): - celery.current_task.update_state( + app.current_task.update_state( state=FAILURE, meta={ 'exc_type': type(e).__name__, @@ -38,7 +38,7 @@ def set_failure(e): }) -class AppTask(celery.Task): +class AppTask(app.Task): max_retries = 0 # default_retry_delay = 10 diff --git a/app/worker/tasks/__init__.py b/app/worker/tasks/__init__.py index fb4770f6..4bdf8a12 100644 --- a/app/worker/tasks/__init__.py +++ b/app/worker/tasks/__init__.py @@ -1,13 +1,14 @@ ''' Module for tasks to be sent on task queue ''' -from celery import task - from app.worker.app_celery import AppTask +# from celery import task +from reboot.celery import app + from .create_receipt import Receiptor -@task(bind=True, base=AppTask) +@app.task(bind=True, base=AppTask) def receiptor(self, queryset, total_count): receiptor = Receiptor(queryset, total_count) return receiptor() diff --git a/app/worker/tasks/exporter.py b/app/worker/tasks/exporter.py index 9419aec7..28a39714 100644 --- a/app/worker/tasks/exporter.py +++ b/app/worker/tasks/exporter.py @@ -1,6 +1,6 @@ import csv -from celery import task +# from celery import task from celery.utils.log import get_task_logger from django.core import serializers from django.db.models.query import QuerySet @@ -8,9 +8,10 @@ from app.constants.field_names import CURRENT_FIELDS from app.worker.app_celery import AppTask, update_percent +from reboot.celery import app -@task(bind=True, base=AppTask) +@app.task(bind=True, base=AppTask) def exporter(self, file_name, qs: QuerySet = None, total_count: int = 0): rows = serializers.deserialize('json', qs) csv_exporter = CsvExporter(file_name, rows, total_count) diff --git a/app/worker/tasks/importers/__init__.py b/app/worker/tasks/importers/__init__.py index cf650561..dc6b7dad 100644 --- a/app/worker/tasks/importers/__init__.py +++ b/app/worker/tasks/importers/__init__.py @@ -1,13 +1,15 @@ """ Module for csv file importers to be sent to queue """ -from celery import task +# from celery import task from app.worker.app_celery import AppTask +from reboot.celery import app + from .historical_data_importer import HistoricalDataImporter -@task(bind=True, base=AppTask) +@app.task(bind=True, base=AppTask) def historical_data_importer(self, csvpath): importer = HistoricalDataImporter(csvpath) importer() diff --git a/reboot/celeryconfig.py b/reboot/celeryconfig.py index 290d77b9..9a274bae 100644 --- a/reboot/celeryconfig.py +++ b/reboot/celeryconfig.py @@ -8,11 +8,13 @@ broker_pool_limit = 1 event_queue_expires = 60 worker_prefetch_multiplier = 1 -worker_concurrency = 10 +worker_concurrency = 1 accept_content = ['json', 'pickle'] result_backend = config("REDIS_URL") task_serializer = 'pickle' result_serializer = 'pickle' +task_track_started = True +task_ignore_result = False # Use PROD settings if valid CLOUDAMQP_URl, else dev if config('CLOUDAMQP_URL', default=False):