Skip to content

Commit

Permalink
feat: Implement conf.tasks_default_queues (#1434)
Browse files Browse the repository at this point in the history
@CallDeferred tasks run in the Cloud Tasks Queue "default" by default.
One way to run them in a different task queue is to use the `_queue`
parameter when calling the task.
However, as this is not possible for existing or low-hanging calls,
default values can be defined here for each task.
To do this, the task path must be mapped to the queue name:
```
conf.tasks_default_queues["updateRelations.viur.core.skeleton"] = "update_relations"
```
The queue (in the example: `"update_relations"`) must exist. The default
queue can be changed by overwriting `"__default__"`.
  • Loading branch information
sveneberth authored Feb 27, 2025
1 parent 07b04b8 commit 9061fbd
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 2 deletions.
17 changes: 17 additions & 0 deletions src/viur/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,23 @@ def tasks_custom_environment_handler(self, value: "CustomEnvironmentHandler") ->
else:
raise ValueError(f"Invalid type {type(value)}. Expected a CustomEnvironmentHandler object.")

tasks_default_queues: dict[str, str] = {
"__default__": "default",
}
"""
@CallDeferred tasks run in the Cloud Tasks Queue "default" by default.
One way to run them in a different task queue is to use the `_queue` parameter
when calling the task.
However, as this is not possible for existing or low-hanging calls,
default values can be defined here for each task.
To do this, the task path must be mapped to the queue name:
```
conf.tasks_default_queues["updateRelations.viur.core.skeleton"] = "update_relations"
```
The queue (in the example: `"update_relations"`) must exist.
The default queue can be changed by overwriting `"__default__"`.
"""

valid_application_ids: list[str] = []
"""Which application-ids we're supposed to run on"""

Expand Down
11 changes: 9 additions & 2 deletions src/viur/core/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,9 @@ def CallDeferred(func: t.Callable) -> t.Callable:
In addition to the arguments for the wrapped methods you can set these:
_queue: Specify the queue in which the task should be pushed.
"default" is the default value. The queue must exist (use the queue.yaml).
If no value is given, the queue name set in `conf.tasks_default_queues`
will be used. If the config does not have a value for this task, "default"
is used as the default. The queue must exist (use the queue.yaml).
_countdown: Specify a time in seconds after which the task should be called.
This time is relative to the moment where the wrapped method has been called.
Expand Down Expand Up @@ -500,7 +502,7 @@ def make_deferred(
func: t.Callable,
self=__undefinedFlag_,
*args,
_queue: str = "default",
_queue: str = None,
_name: str | None = None,
_call_deferred: bool = True,
_target_version: str = conf.instance.app_version,
Expand Down Expand Up @@ -564,6 +566,11 @@ def task():
args = (self,) + args # Re-append self to args, as this function is (hopefully) unbound
command = "unb"

if _queue is None:
_queue = conf.tasks_default_queues.get(
funcPath, conf.tasks_default_queues.get("__default__", "default")
)

# Try to preserve the important data from the current environment
try: # We might get called inside a warmup request without session
usr = current.session.get().get("user")
Expand Down

0 comments on commit 9061fbd

Please sign in to comment.