Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 23 additions & 24 deletions cogs/mentor_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(

self.lock = asyncio.Lock()
self.queue: asyncio.PriorityQueue[
tuple[int, TaskType, str, str | tuple[str, ...] | None]
tuple[int, TaskType, str, str | None]
] = asyncio.PriorityQueue()

if tracks:
Expand Down Expand Up @@ -103,7 +103,7 @@ async def task_manager(self):
if now < self.next_task_time or self.queue.empty():
return
task = self.queue.get_nowait()
task_time, task_type, track, *details = task
task_time, task_type, track, details = task
# If the next task is not yet due, queue it and return.
if now < task_time:
self.queue.put_nowait(task)
Expand All @@ -121,9 +121,9 @@ async def task_manager(self):
finally:
self.queue_query_discord(track)
elif task_type == TaskType.TASK_DISCORD_ADD:
await self.update_discord_add(track, *details)
await self.update_discord_add(track, details)
elif task_type == TaskType.TASK_DISCORD_DEL:
await self.update_discord_del(track, *details)
await self.update_discord_del(track, details)
else:
logger.exception("Unknown task type, %d", task_type)

Expand Down Expand Up @@ -182,10 +182,9 @@ async def fetch_track_requests(self, track: str) -> None:
logger.debug("Queue TASK_DISCORD_ADD %s %s for now", track, request_id)
self.queue.put_nowait((0, TaskType.TASK_DISCORD_ADD, track, request_id))

if del_requests:
to_del = tuple(list(del_requests)[:10])
logger.debug("Queue TASK_DISCORD_DEL %s %s for now", track, to_del)
self.queue.put_nowait((0, TaskType.TASK_DISCORD_DEL, track, to_del))
for request_id in list(del_requests)[:10]:
logger.debug("Queue TASK_DISCORD_DEL %s %s for now", track, request_id)
self.queue.put_nowait((0, TaskType.TASK_DISCORD_DEL, track, request_id))

def queue_query_exercism(self, track: str) -> None:
"""Queue a task to query Exercism for a track."""
Expand Down Expand Up @@ -216,7 +215,7 @@ async def fetch_discord_thread(self, track: str) -> None:

def queue_query_discord(self, track: str) -> None:
"""Queue a task to query a Discord request thread."""
interval = 60 # one minute
interval = 30 * 60 # 30 minutes
task_time = int(time.time()) + interval
logger.debug("Queue TASK_QUERY_DISCORD %s in %d seconds", track, interval)
self.queue.put_nowait((task_time, TaskType.TASK_QUERY_DISCORD, track, None))
Expand All @@ -231,27 +230,27 @@ async def update_discord_add(self, track: str, request_id: str) -> None:
message = await thread.send(description, suppress_embeds=True)
self.messages[track][request_id] = message.id

async def update_discord_del(self, track: str, message_ids: tuple[int, ...]) -> None:
async def update_discord_del(self, track: str, request_id: str) -> None:
"""Remove a request message from Discord."""
PROM_DISCORD_REQUESTS.labels(True).inc()
logger.debug("Start update_discord_del(%s, %s)", track, message_ids)
logger.debug("Start update_discord_del(%s, %s)", track, request_id)
thread = await self.get_thread(track)
await self.unarchive(self.threads[track])

async with asyncio.timeout(15):
message_id = next(
(
message_id for req, message_id in self.messages[track].items()
if req == request_id
), None
)
if not message_id:
return
async with asyncio.timeout(5):
try:
await thread.delete_messages(
discord.Object(message_id) for message_id in message_ids
)
await thread.get_partial_message(message_id).delete()
del self.messages[track][request_id]
except discord.errors.NotFound:
pass
request_ids = [
request_id
for request_id, message_id in self.messages[track].items()
if message_id in message_ids
]
for request_id in request_ids:
del self.messages[track][request_id]

def populate_task_queue(self):
"""Populate the initial task queue."""
Expand All @@ -260,8 +259,8 @@ def populate_task_queue(self):
# Spread the initial requests over 5 minutes
for track, offset in zip(tracks, range(0, 5 * 60, int(5 * 60 / len(tracks)))):
task_time = int(time.time()) + offset
self.queue.put_nowait((task_time, TaskType.TASK_QUERY_DISCORD, track))
self.queue.put_nowait((task_time + 1, TaskType.TASK_QUERY_EXERCISM, track))
self.queue.put_nowait((task_time, TaskType.TASK_QUERY_DISCORD, track, None))
self.queue.put_nowait((task_time + 1, TaskType.TASK_QUERY_EXERCISM, track, None))

async def unarchive(self, thread: discord.Thread) -> None:
"""Ensure a thread is not archived."""
Expand Down