Skip to content

Commit 834b03b

Browse files
authored
Merge pull request #112 from taskiq-python/fix-issues-in-ci
chore: fix issues in ci
2 parents f7c061f + ac1d78e commit 834b03b

File tree

4 files changed

+73
-77
lines changed

4 files changed

+73
-77
lines changed

pyproject.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,3 +127,9 @@ allow-magic-value-types = ["int", "str", "float"]
127127

128128
[tool.ruff.lint.flake8-bugbear]
129129
extend-immutable-calls = ["taskiq_dependencies.Depends", "taskiq.TaskiqDepends"]
130+
131+
[tool.pytest.ini_options]
132+
filterwarnings = [
133+
# about deprecated RedisScheduleSource usage - delete after removing RedisScheduleSource
134+
'ignore:RedisScheduleSource is deprecated:DeprecationWarning',
135+
]

taskiq_redis/list_schedule_source.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ async def _get_previous_time_schedules(self) -> list[bytes]:
130130
if key_time and key_time <= minute_before:
131131
time_keys.append(key.decode())
132132
for key in time_keys:
133-
schedules.extend(await redis.lrange(key, 0, -1))
133+
schedules.extend(await redis.lrange(key, 0, -1)) # type: ignore[misc]
134134

135135
return schedules
136136

@@ -146,10 +146,10 @@ async def delete_schedule(self, schedule_id: str) -> None:
146146
)
147147
# We need to remove the schedule from the cron or time list.
148148
if schedule.cron is not None:
149-
await redis.lrem(self._get_cron_key(), 0, schedule_id)
149+
await redis.lrem(self._get_cron_key(), 0, schedule_id) # type: ignore[misc]
150150
elif schedule.time is not None:
151151
time_key = self._get_time_key(schedule.time)
152-
await redis.lrem(time_key, 0, schedule_id)
152+
await redis.lrem(time_key, 0, schedule_id) # type: ignore[misc]
153153

154154
async def add_schedule(self, schedule: "ScheduledTask") -> None:
155155
"""Add a schedule to the source."""
@@ -163,9 +163,9 @@ async def add_schedule(self, schedule: "ScheduledTask") -> None:
163163
# This is an optimization, so we can get all the schedules
164164
# for the current time much faster.
165165
if schedule.cron is not None:
166-
await redis.rpush(self._get_cron_key(), schedule.schedule_id)
166+
await redis.rpush(self._get_cron_key(), schedule.schedule_id) # type: ignore[misc]
167167
elif schedule.time is not None:
168-
await redis.rpush(
168+
await redis.rpush( # type: ignore[misc]
169169
self._get_time_key(schedule.time),
170170
schedule.schedule_id,
171171
)
@@ -195,11 +195,11 @@ async def get_schedules(self) -> List["ScheduledTask"]:
195195
self._is_first_run = False
196196
async with Redis(connection_pool=self._connection_pool) as redis:
197197
buffer = []
198-
crons = await redis.lrange(self._get_cron_key(), 0, -1)
198+
crons = await redis.lrange(self._get_cron_key(), 0, -1) # type: ignore[misc]
199199
logger.debug("Got %d cron schedules", len(crons))
200200
if crons:
201201
buffer.extend(crons)
202-
timed.extend(await redis.lrange(self._get_time_key(current_time), 0, -1))
202+
timed.extend(await redis.lrange(self._get_time_key(current_time), 0, -1)) # type: ignore[misc]
203203
logger.debug("Got %d timed schedules", len(timed))
204204
if timed:
205205
buffer.extend(timed)

taskiq_redis/redis_backend.py

Lines changed: 55 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
TYPE_CHECKING,
55
Any,
66
AsyncIterator,
7-
Dict,
87
List,
98
Optional,
109
Tuple,
@@ -121,17 +120,15 @@ async def set_result(
121120
:param task_id: ID of the task.
122121
:param result: TaskiqResult instance.
123122
"""
124-
redis_set_params: Dict[str, Union[str, int, bytes]] = {
125-
"name": self._task_name(task_id),
126-
"value": self.serializer.dumpb(model_dump(result)),
127-
}
128-
if self.result_ex_time:
129-
redis_set_params["ex"] = self.result_ex_time
130-
elif self.result_px_time:
131-
redis_set_params["px"] = self.result_px_time
132-
123+
name = self._task_name(task_id)
124+
value = self.serializer.dumpb(model_dump(result))
133125
async with Redis(connection_pool=self.redis_pool) as redis:
134-
await redis.set(**redis_set_params)
126+
if self.result_ex_time:
127+
await redis.set(name=name, value=value, ex=self.result_ex_time)
128+
elif self.result_px_time:
129+
await redis.set(name=name, value=value, px=self.result_px_time)
130+
else:
131+
await redis.set(name=name, value=value)
135132

136133
async def is_result_ready(self, task_id: str) -> bool:
137134
"""
@@ -195,17 +192,15 @@ async def set_progress(
195192
:param task_id: ID of the task.
196193
:param result: task's TaskProgress instance.
197194
"""
198-
redis_set_params: Dict[str, Union[str, int, bytes]] = {
199-
"name": self._task_name(task_id) + PROGRESS_KEY_SUFFIX,
200-
"value": self.serializer.dumpb(model_dump(progress)),
201-
}
202-
if self.result_ex_time:
203-
redis_set_params["ex"] = self.result_ex_time
204-
elif self.result_px_time:
205-
redis_set_params["px"] = self.result_px_time
206-
195+
name = self._task_name(task_id) + PROGRESS_KEY_SUFFIX
196+
value = self.serializer.dumpb(model_dump(progress))
207197
async with Redis(connection_pool=self.redis_pool) as redis:
208-
await redis.set(**redis_set_params)
198+
if self.result_ex_time:
199+
await redis.set(name=name, value=value, ex=self.result_ex_time)
200+
elif self.result_px_time:
201+
await redis.set(name=name, value=value, px=self.result_px_time)
202+
else:
203+
await redis.set(name=name, value=value)
209204

210205
async def get_progress(
211206
self,
@@ -296,24 +291,23 @@ async def set_result(
296291
result: TaskiqResult[_ReturnType],
297292
) -> None:
298293
"""
299-
Sets task result in redis.
294+
Sets task result in redis cluster.
300295
301296
Dumps TaskiqResult instance into the bytes and writes
302-
it to redis.
297+
it to redis cluster.
303298
304299
:param task_id: ID of the task.
305300
:param result: TaskiqResult instance.
306301
"""
307-
redis_set_params: Dict[str, Union[str, bytes, int]] = {
308-
"name": self._task_name(task_id),
309-
"value": self.serializer.dumpb(model_dump(result)),
310-
}
311-
if self.result_ex_time:
312-
redis_set_params["ex"] = self.result_ex_time
313-
elif self.result_px_time:
314-
redis_set_params["px"] = self.result_px_time
315-
316-
await self.redis.set(**redis_set_params) # type: ignore
302+
name = self._task_name(task_id)
303+
value = self.serializer.dumpb(model_dump(result))
304+
async with self.redis as redis:
305+
if self.result_ex_time:
306+
await redis.set(name=name, value=value, ex=self.result_ex_time)
307+
elif self.result_px_time:
308+
await redis.set(name=name, value=value, px=self.result_px_time)
309+
else:
310+
await redis.set(name=name, value=value)
317311

318312
async def is_result_ready(self, task_id: str) -> bool:
319313
"""
@@ -367,24 +361,23 @@ async def set_progress(
367361
progress: TaskProgress[_ReturnType],
368362
) -> None:
369363
"""
370-
Sets task progress in redis.
364+
Sets task progress in redis cluster.
371365
372366
Dumps TaskProgress instance into the bytes and writes
373-
it to redis with a standard suffix on the task_id as the key
367+
it to redis cluster with a standard suffix on the task_id as the key
374368
375369
:param task_id: ID of the task.
376370
:param result: task's TaskProgress instance.
377371
"""
378-
redis_set_params: Dict[str, Union[str, int, bytes]] = {
379-
"name": self._task_name(task_id) + PROGRESS_KEY_SUFFIX,
380-
"value": self.serializer.dumpb(model_dump(progress)),
381-
}
382-
if self.result_ex_time:
383-
redis_set_params["ex"] = self.result_ex_time
384-
elif self.result_px_time:
385-
redis_set_params["px"] = self.result_px_time
386-
387-
await self.redis.set(**redis_set_params) # type: ignore
372+
name = self._task_name(task_id) + PROGRESS_KEY_SUFFIX
373+
value = self.serializer.dumpb(model_dump(progress))
374+
async with self.redis as redis:
375+
if self.result_ex_time:
376+
await redis.set(name=name, value=value, ex=self.result_ex_time)
377+
elif self.result_px_time:
378+
await redis.set(name=name, value=value, px=self.result_px_time)
379+
else:
380+
await redis.set(name=name, value=value)
388381

389382
async def get_progress(
390383
self,
@@ -490,17 +483,15 @@ async def set_result(
490483
:param task_id: ID of the task.
491484
:param result: TaskiqResult instance.
492485
"""
493-
redis_set_params: Dict[str, Union[str, bytes, int]] = {
494-
"name": self._task_name(task_id),
495-
"value": self.serializer.dumpb(model_dump(result)),
496-
}
497-
if self.result_ex_time:
498-
redis_set_params["ex"] = self.result_ex_time
499-
elif self.result_px_time:
500-
redis_set_params["px"] = self.result_px_time
501-
486+
name = self._task_name(task_id)
487+
value = self.serializer.dumpb(model_dump(result))
502488
async with self._acquire_master_conn() as redis:
503-
await redis.set(**redis_set_params) # type: ignore
489+
if self.result_ex_time:
490+
await redis.set(name=name, value=value, ex=self.result_ex_time)
491+
elif self.result_px_time:
492+
await redis.set(name=name, value=value, px=self.result_px_time)
493+
else:
494+
await redis.set(name=name, value=value)
504495

505496
async def is_result_ready(self, task_id: str) -> bool:
506497
"""
@@ -559,22 +550,20 @@ async def set_progress(
559550
Sets task progress in redis.
560551
561552
Dumps TaskProgress instance into the bytes and writes
562-
it to redis with a standard suffix on the task_id as the key
553+
it to redis via sentinel with a standard suffix on the task_id as the key
563554
564555
:param task_id: ID of the task.
565556
:param result: task's TaskProgress instance.
566557
"""
567-
redis_set_params: Dict[str, Union[str, int, bytes]] = {
568-
"name": self._task_name(task_id) + PROGRESS_KEY_SUFFIX,
569-
"value": self.serializer.dumpb(model_dump(progress)),
570-
}
571-
if self.result_ex_time:
572-
redis_set_params["ex"] = self.result_ex_time
573-
elif self.result_px_time:
574-
redis_set_params["px"] = self.result_px_time
575-
558+
name = self._task_name(task_id) + PROGRESS_KEY_SUFFIX
559+
value = self.serializer.dumpb(model_dump(progress))
576560
async with self._acquire_master_conn() as redis:
577-
await redis.set(**redis_set_params) # type: ignore
561+
if self.result_ex_time:
562+
await redis.set(name=name, value=value, ex=self.result_ex_time)
563+
elif self.result_px_time:
564+
await redis.set(name=name, value=value, px=self.result_px_time)
565+
else:
566+
await redis.set(name=name, value=value)
578567

579568
async def get_progress(
580569
self,

taskiq_redis/redis_broker.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
Dict,
1111
Optional,
1212
TypeVar,
13+
Union,
1314
)
1415

1516
from redis.asyncio import BlockingConnectionPool, Connection, Redis, ResponseError
@@ -122,7 +123,7 @@ async def kick(self, message: BrokerMessage) -> None:
122123
"""
123124
queue_name = message.labels.get("queue_name") or self.queue_name
124125
async with Redis(connection_pool=self.connection_pool) as redis_conn:
125-
await redis_conn.lpush(queue_name, message.message)
126+
await redis_conn.lpush(queue_name, message.message) # type: ignore
126127

127128
async def listen(self) -> AsyncGenerator[bytes, None]:
128129
"""
@@ -137,7 +138,7 @@ async def listen(self) -> AsyncGenerator[bytes, None]:
137138
while True:
138139
try:
139140
async with Redis(connection_pool=self.connection_pool) as redis_conn:
140-
yield (await redis_conn.brpop(self.queue_name))[
141+
yield (await redis_conn.brpop(self.queue_name))[ # type: ignore
141142
redis_brpop_data_position
142143
]
143144
except ConnectionError as exc:
@@ -170,7 +171,7 @@ def __init__(
170171
idle_timeout: int = 600000, # 10 minutes
171172
unacknowledged_batch_size: int = 100,
172173
xread_count: Optional[int] = 100,
173-
additional_streams: Optional[Dict[str, str]] = None,
174+
additional_streams: Optional[Dict[str, Union[str, int]]] = None,
174175
**connection_kwargs: Any,
175176
) -> None:
176177
"""
@@ -281,7 +282,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
281282
self.consumer_name,
282283
{
283284
self.queue_name: ">",
284-
**self.additional_streams,
285+
**self.additional_streams, # type: ignore[dict-item]
285286
},
286287
block=self.block,
287288
noack=False,

0 commit comments

Comments
 (0)