Before anything, thanks for continuing to improve this library! I'm particularly interested in this feature because I have a use case for it at work that is currently handled (poorly) with Celery, and I would love to replace it. At least from my perspective working with SQS and on
Also, I was wondering: with this feature, would we be able to define a single task that reads from a number of queues, like so?
queues = [broker.extra_queue(...) for i in range(1, 10)]
@broker.task.queues(*queues)
async def task(): ...
It could greatly simplify defining a task that consumes from a number of queues.
@s3rius what about this? IMHO answers to your questions:
Proposed syntax for the feature:
# redis_example.py
from taskiq import Queue
from taskiq_redis import RedisStreamBroker, StreamQueue

first = Queue(name="first")
second = Queue(name="second")
third = StreamQueue(name="critical", maxlen=1000)

broker = RedisStreamBroker(
    url="...",
    default_queue=first,
    queues={first, second, third},
)

@broker.task(queue_name=second.name)
async def funny_task(...): ...

funny_task.kiq(...)  # kicks to the queue which is provided in the decorator, namely "second"
funny_task.kiq(queue_name=third.name, ...)  # kicks to the provided queue, which is "third"
Proposed list of changes:
from dataclasses import dataclass, field
from typing import Any, Mapping

@dataclass(slots=True, kw_only=True, frozen=True)
class Queue:
    name: str
    options: Mapping[str, Any] = field(default_factory=dict)

    def __str__(self) -> str:
        return self.name

Then we can easily declare specific queues in all adapters:
# taskiq_redis/queues.py
from dataclasses import dataclass
from taskiq.queues import Queue

@dataclass(slots=True, kw_only=True, frozen=True)
class RedisStreamQueue(Queue):
    maxlen: int | None = None
    approximate: bool = True
So, to sum up:
Targeted syntax:
# redis_example.py
from taskiq import Queue
from taskiq_redis import RedisStreamBroker, StreamQueue

first = Queue(name="first")
second = Queue(name="second")
third = StreamQueue(name="critical", maxlen=1000)

broker = RedisStreamBroker(
    url="...",
).with_queues(first, second, third).with_default_queue(first)

@broker.task(queue_name=second.name)
async def funny_task(...): ...

funny_task.kiq(...)  # Kicks to the queue specified in the decorator, namely "second"
funny_task.kicker().with_queue(third.name).kiq()  # Kicks to the provided queue, which is "third"
Steps:
Abstract
Currently our brokers use only one queue, and all tasks are sent to that single queue. The idea is to make it possible to assign different queues to tasks, and to send tasks to and read tasks from multiple queues.
Solution
Currently there's PR #418, which adds the ability to add new methods to broker.task. We can make use of this feature to achieve the following API:
The question is how to specify queues, because the queue API differs between brokers.
One possible solution is to add an extra_queue method to the broker that returns a generic ExtraQueue object, which the same broker can use to change a task's destination.
Here's a usage example:
Questions
ExtraQueue class. (or return strings)
extra_queue to extra_read_queue and extra_write_queue