Skip to content

Commit a49691f

Browse files
committed
Rewrite a v2 of the Mentor Requests cog
1 parent 8469e1c commit a49691f

File tree

2 files changed

+348
-0
lines changed

2 files changed

+348
-0
lines changed

cogs/mentor_requests_v2.md

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Mentor Requests Cog
2+
3+
## Overview
4+
5+
This cog polls Exercism's API for requests in the queue.
6+
The list of requests is synced to a per-track thread in the mentor requests Discord channel.
7+
8+
## Periodic Tasks
9+
10+
* Poll Exercism for mentor requests.
11+
* Each track can be handled as a separate task vs polling all tracks every time we get an update.
12+
This is helpful since some tracks are significantly more active than others.
13+
We want to poll at most once every 5 minutes.
14+
Less active tracks can be polled as little as once per hour.
15+
* Store a copy of this in the DB and in memory.
16+
* Expose Exercism request rates to Prometheus.
17+
* Maintain the timestamps of the past N requests per-track to set the per-track interval.
18+
Spitballing, maybe use avg - 1 * stddev, clamped to 5-60 minutes.
19+
* Poll Discord to get all messages in the channel/threads.
20+
* Since we control the messages, they shouldn't drift out of sync too often.
21+
* Reading messages from Discord should be relatively lightweight.
22+
* Spread the reads. One track per minute.
23+
* Store the results in the DB and in memory.
24+
* On any state change, queue the change (add message, remove message).
25+
26+
## Tasks
27+
28+
* Fetch track requests from Exercism.
29+
* Fetch Discord messages for a track.
30+
* Send a Discord message.
31+
* Delete a Discord message.
32+
33+
## Worker
34+
35+
* Use a loop task that runs every 5 seconds.
36+
* Use an async-safe lock so only one task runs at a time. If the lock is held, return.
37+
* Store the timestamp for the next queued task. If the timestamp is in the future, return.
38+
* If there are any issues executing a task, leave it for the next loop.

cogs/mentor_requests_v2.py

Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
"""Discord module to publish mentor request queues. Version 2."""
2+
3+
import asyncio
4+
import datetime
5+
import enum
6+
import logging
7+
import random
8+
import re
9+
import statistics
10+
import time
11+
from typing import Sequence
12+
13+
import discord
14+
import prometheus_client # type: ignore
15+
from discord.ext import commands
16+
from discord.ext import tasks
17+
from exercism_lib import exercism
18+
19+
from cogs import base_cog
20+
21+
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Prometheus metrics exported by this cog.
# Counts calls made to the Exercism API, labelled by track.
PROM_EXERCISM_REQUESTS = prometheus_client.Counter(
    name="mentor_request_exercism_rpc",
    documentation="Number of API calls to Exercism",
    labelnames=["track"],
)
# Counts calls made to the Discord API, labelled by whether the call writes.
PROM_DISCORD_REQUESTS = prometheus_client.Counter(
    name="mentor_request_discord_rpc",
    documentation="Number of API calls to Discord",
    labelnames=["write"],
)
# Tracks the current number of entries waiting in the task queue.
PROM_TASK_QUEUE = prometheus_client.Gauge(
    name="mentor_requests_task_queue",
    documentation="size of the task queue",
)
30+
31+
32+
class TaskType(enum.IntEnum):
    """Kinds of work items that can be placed on the task queue.

    The members are integers, so a task tuple of the form
    ``(due_time, task_type, track, ...)`` can be ordered by a priority queue.
    Values start at 1 and increase in declaration order.
    """

    # Poll Exercism for a track's mentor requests.
    TASK_QUERY_EXERCISM = 1
    # Re-read a track's Discord thread.
    TASK_QUERY_DISCORD = 2
    # Post a request message to Discord.
    TASK_DISCORD_ADD = 3
    # Delete request message(s) from Discord.
    TASK_DISCORD_DEL = 4
38+
39+
40+
class RequestNotifierV2(base_cog.BaseCog):
    """Update Discord with Mentor Requests.

    A single worker loop (``task_manager``) pops at most one due task per run
    from a priority queue. Tasks are tuples shaped
    ``(due_time, TaskType, track, *details)`` and either poll Exercism for a
    track's mentor requests, re-read the track's Discord thread, or add/delete
    request messages so the thread matches the Exercism queue.
    """

    qualified_name = "Request Notifier v2"

    # Bounds, in seconds, for the per-track Exercism poll interval.
    MIN_POLL_INTERVAL = 5 * 60
    MAX_POLL_INTERVAL = 60 * 60

    # Extracts the request ID from a mentoring request URL in a message.
    _REQUEST_URL_RE = re.compile(r"\bhttps://exercism.org/mentoring/requests/(\w+)\b")

    def __init__(
        self,
        bot: commands.Bot,
        channel_id: int,
        tracks: Sequence[str] | None = None,
        **kwargs,
    ) -> None:
        """Set up caches, the task queue and start the worker loop.

        Args:
            bot: The Discord bot this cog is attached to.
            channel_id: ID of the channel holding the per-track threads.
            tracks: Track slugs to handle. When empty or None, the list is
                taken from ``exercism.Exercism().all_tracks()``.
            **kwargs: Passed through to the base cog.
        """
        super().__init__(bot=bot, **kwargs)
        self.exercism = exercism.AsyncExercism()
        self.channel_id = channel_id

        # track -> Discord thread for that track.
        self.threads: dict[str, discord.Thread] = {}
        # track -> {request ID -> message text to post}.
        self.requests: dict[str, dict[str, str]] = {}
        # track -> {request ID -> Discord message ID}.
        self.messages: dict[str, dict[str, int]] = {}
        # Earliest time (epoch seconds) at which the queue needs inspecting.
        self.next_task_time = 0

        self.lock = asyncio.Lock()
        self.queue: asyncio.PriorityQueue = asyncio.PriorityQueue()

        if tracks:
            self.tracks = list(tracks)
        else:
            self.tracks = exercism.Exercism().all_tracks()
        self.tracks.sort()
        # Default to 10 minute polling.
        self.request_interval: dict[str, int] = {track: 600 for track in self.tracks}
        self.request_timestamps: dict[str, list[int]] = {track: [] for track in self.tracks}

        self.task_manager.start()  # pylint: disable=E1101

    async def get_thread(self, track: str) -> discord.Thread:
        """Return the request thread for a specific track.

        The thread is re-fetched from Discord and the cache is updated.

        Raises:
            LookupError: No thread is cached for ``track``.
        """
        thread = self.threads.get(track)
        if not thread:
            raise LookupError(f"Failed to find track {track} in threads")

        # Refresh the thread object. This is helpful to update the is_archived bit.
        async with asyncio.timeout(10):
            got = await self.bot.fetch_channel(thread.id)
        assert isinstance(got, discord.Thread), f"Expected a Thread. {got=}"
        self.threads[track] = got
        return got

    @tasks.loop(seconds=5)
    async def task_manager(self):
        """Run at most one due task from the queue.

        Returns immediately when another run holds the lock, when the queue is
        empty, or when the next task is not yet due. Query tasks always
        re-queue themselves, even when the query fails. Any exception is
        logged and swallowed so the loop keeps running.
        """
        if self.lock.locked():
            return
        async with self.lock:
            try:
                PROM_TASK_QUEUE.set(self.queue.qsize())
                now = int(time.time())
                # If the queue is empty or the next task is not yet due, return.
                if now < self.next_task_time or self.queue.empty():
                    return
                task = self.queue.get_nowait()
                task_time, task_type, track, *details = task
                # If the next task is not yet due, queue it and return.
                if now < task_time:
                    self.queue.put_nowait(task)
                    self.next_task_time = task_time
                    return
                # Handle a task.
                if task_type == TaskType.TASK_QUERY_EXERCISM:
                    try:
                        await self.fetch_track_requests(track)
                    finally:
                        self.queue_query_exercism(track)
                elif task_type == TaskType.TASK_QUERY_DISCORD:
                    try:
                        await self.fetch_discord_thread(track)
                    finally:
                        self.queue_query_discord(track)
                elif task_type == TaskType.TASK_DISCORD_ADD:
                    await self.update_discord_add(track, *details)
                elif task_type == TaskType.TASK_DISCORD_DEL:
                    await self.update_discord_del(track, *details)
                else:
                    # No exception is active here, so log a plain error
                    # rather than an (empty) traceback.
                    logger.error("Unknown task type, %d", task_type)

            except Exception:  # pylint: disable=broad-exception-caught
                logger.exception("Unhandled exception in task manager loop.")

    @task_manager.before_loop
    async def before_task_manager(self):
        """Before starting the task manager, wait for ready and load Discord messages."""
        logger.debug("Start before_task_manager()")
        await self.bot.wait_until_ready()
        await self.load_data()
        self.populate_task_queue()
        logger.debug("End before_task_manager()")

    def exercism_poll_interval(self, track: str) -> int:
        """Return the poll interval, in seconds, between getting requests for a track.

        With fewer than two recorded request timestamps, the stored interval
        is returned and then grown by 1.5x (capped at one hour) for the next
        call. Otherwise the mean gap between recorded timestamps is returned,
        clamped to the range of five minutes to one hour.
        """
        interval = self.request_interval[track]
        times = self.request_timestamps[track]
        if len(times) < 2:
            # Not enough data: back off gradually, but never beyond one hour.
            self.request_interval[track] = min(int(1.5 * interval), self.MAX_POLL_INTERVAL)
            return interval
        times.sort()
        gaps = [later - earlier for earlier, later in zip(times, times[1:])]
        # [5 min ... avg ... 1 hour]
        average = int(statistics.mean(gaps))
        return min(max(average, self.MIN_POLL_INTERVAL), self.MAX_POLL_INTERVAL)

    async def fetch_track_requests(self, track: str) -> None:
        """Fetch the requests for a given track. Queue tasks to update the Discord thread.

        Does nothing until the track's Discord thread has been read at least
        once (``track`` must be present in ``self.messages``).
        """
        logger.debug("Start fetch_track_requests(%s)", track)
        if track not in self.messages:
            return
        PROM_EXERCISM_REQUESTS.labels(track).inc()
        async with asyncio.timeout(15):
            requests = await self.get_requests(track)
        logger.debug("Found %d requests for %s.", len(requests), track)

        posted = self.messages[track]
        add_requests = set(requests) - set(posted)
        del_requests = set(posted) - set(requests)
        self.requests[track] = {
            request_id: message for request_id, (_, message) in requests.items()
        }
        # Keep the 10 most recent timestamps to drive the poll interval.
        self.request_timestamps[track].extend(
            timestamp
            for request_id, (timestamp, _) in requests.items()
            if request_id in add_requests
        )
        self.request_timestamps[track] = sorted(self.request_timestamps[track], reverse=True)[:10]

        for request_id in add_requests:
            logger.debug("Queue TASK_DISCORD_ADD %s %s for now", track, request_id)
            self.queue.put_nowait((0, TaskType.TASK_DISCORD_ADD, track, request_id))

        if del_requests:
            # The delete task works on Discord message IDs, not request IDs.
            # Handle at most 10 per task; the rest are found by the next poll.
            to_del = tuple(posted[request_id] for request_id in list(del_requests)[:10])
            logger.debug("Queue TASK_DISCORD_DEL %s %s for now", track, to_del)
            self.queue.put_nowait((0, TaskType.TASK_DISCORD_DEL, track, to_del))

    def queue_query_exercism(self, track: str) -> None:
        """Queue a task to query Exercism for a track."""
        interval = self.exercism_poll_interval(track)
        task_time = int(time.time()) + interval
        logger.debug("Queue TASK_QUERY_EXERCISM %s in %d seconds", track, interval)
        self.queue.put_nowait((task_time, TaskType.TASK_QUERY_EXERCISM, track))

    async def fetch_discord_thread(self, track: str) -> None:
        """Fetch a track thread from Discord to update the local cache.

        Only messages written by the thread owner that contain a mentoring
        request URL are recorded; the thread's starter message is skipped.
        """
        logger.debug("Start fetch_discord_thread(%s)", track)
        PROM_DISCORD_REQUESTS.labels(False).inc()
        thread = await self.get_thread(track)
        messages: dict[str, int] = {}
        await self.unarchive(thread)
        # NOTE(review): history() is called with its default limit; confirm
        # that is enough for the busiest tracks.
        async for message in thread.history():
            if message.author != thread.owner:
                continue
            if message == thread.starter_message:
                continue
            match = self._REQUEST_URL_RE.search(message.content)
            if match is None:
                continue
            messages[match.group(1)] = message.id
        self.messages[track] = messages

    def queue_query_discord(self, track: str) -> None:
        """Queue a task to query a Discord request thread."""
        interval = 60  # one minute
        task_time = int(time.time()) + interval
        logger.debug("Queue TASK_QUERY_DISCORD %s in %d seconds", track, interval)
        self.queue.put_nowait((task_time, TaskType.TASK_QUERY_DISCORD, track))

    async def update_discord_add(self, track: str, request_id: str) -> None:
        """Add a request message to Discord and record its message ID."""
        logger.debug("Start update_discord_add(%s, %s)", track, request_id)
        PROM_DISCORD_REQUESTS.labels(True).inc()
        thread = await self.get_thread(track)
        description = self.requests[track][request_id]
        async with asyncio.timeout(10):
            message = await thread.send(description, suppress_embeds=True)
        self.messages[track][request_id] = message.id

    async def update_discord_del(self, track: str, message_ids: tuple[int, ...]) -> None:
        """Remove request messages from Discord and from the local cache.

        Args:
            track: Track whose thread holds the messages.
            message_ids: Discord message IDs to delete.
        """
        PROM_DISCORD_REQUESTS.labels(True).inc()
        logger.debug("Start update_discord_del(%s, %s)", track, message_ids)
        thread = await self.get_thread(track)
        await self.unarchive(thread)

        async with asyncio.timeout(15):
            try:
                await thread.delete_messages(
                    discord.Object(message_id) for message_id in message_ids
                )
            except discord.errors.NotFound:
                # Already gone from Discord; still drop it from the cache below.
                pass
        request_ids = [
            request_id
            for request_id, message_id in self.messages[track].items()
            if message_id in message_ids
        ]
        for request_id in request_ids:
            del self.messages[track][request_id]

    def populate_task_queue(self):
        """Populate the initial task queue.

        Each track gets a Discord query followed one second later by an
        Exercism query. Tracks are shuffled and spread over five minutes.
        """
        tracks = self.tracks.copy()
        if not tracks:
            return
        random.shuffle(tracks)
        # Spread the initial requests over 5 minutes.
        # With more than 300 tracks the step is 0 and all tasks start at once.
        step = (5 * 60) // len(tracks)
        for index, track in enumerate(tracks):
            task_time = int(time.time()) + index * step
            self.queue.put_nowait((task_time, TaskType.TASK_QUERY_DISCORD, track))
            self.queue.put_nowait((task_time + 1, TaskType.TASK_QUERY_EXERCISM, track))

    async def unarchive(self, thread: discord.Thread) -> None:
        """Ensure a thread is not archived.

        When the thread is archived, a message is sent and then deleted.
        """
        if not thread.archived:
            return
        async with asyncio.timeout(10):
            message = await thread.send("Sending a message to unarchive this thread.")
        async with asyncio.timeout(10):
            await message.delete()

    async def load_data(self) -> None:
        """Load the per-track threads, creating any that are missing.

        Threads are discovered from messages in the channel's history and
        cached by lower-cased thread name. A public thread is created for each
        track without one, with a two second pause after each creation.
        """
        channel = self.bot.get_channel(self.channel_id)
        assert isinstance(channel, discord.TextChannel), f"{channel} is not a TextChannel."

        self.threads = {}
        async for message in channel.history():
            if not message.thread:
                continue
            thread = await message.fetch_thread()
            self.threads[thread.name.lower()] = thread

        for track in self.tracks:
            if track in self.threads:
                continue
            thread = await channel.create_thread(
                name=track.title(),
                type=discord.ChannelType.public_thread,
            )
            self.threads[track] = thread
            await asyncio.sleep(2)

    async def get_requests(self, track_slug: str) -> dict[str, tuple[int, str]]:
        """Return formatted mentor requests.

        Returns:
            A mapping of request ``uuid`` to ``(timestamp, message)``, where
            ``timestamp`` is the request's ``updated_at`` as epoch seconds and
            ``message`` is the text to post to Discord.
        """
        requests: dict[str, tuple[int, str]] = {}
        for req in await self.exercism.mentor_requests(track_slug):
            track_title = req["track"]["title"]
            exercise_title = req["exercise"]["title"]
            student_handle = req["student"]["handle"]
            status = req["status"]
            url = req["url"]

            msg = f"{track_title.title()}: {url} => {exercise_title} "
            if status:
                msg += f"({student_handle}, {status})"
            else:
                msg += f"({student_handle})"

            timestamp = int(datetime.datetime.fromisoformat(req["updated_at"]).timestamp())
            requests[req["uuid"]] = (timestamp, msg)
        return requests

0 commit comments

Comments
 (0)