-
-
Notifications
You must be signed in to change notification settings - Fork 178
Expand file tree
/
Copy pathrouter.py
More file actions
101 lines (87 loc) · 3.8 KB
/
router.py
File metadata and controls
101 lines (87 loc) · 3.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""Message routing based on @mentions with per-channel loop guard."""
import re
class Router:
def __init__(self, agent_names: list[str], default_mention: str = "both",
max_hops: int = 4, online_checker=None):
self.agent_names = set(n.lower() for n in agent_names)
self.default_mention = default_mention
self.max_hops = max_hops
self._online_checker = online_checker # callable() -> set of online agent names
# Per-channel state: { channel: { hop_count, paused, guard_emitted } }
self._channels: dict[str, dict] = {}
self._build_pattern()
def _get_ch(self, channel: str) -> dict:
if channel not in self._channels:
self._channels[channel] = {
"hop_count": 0,
"paused": False,
"guard_emitted": False,
}
return self._channels[channel]
def _build_pattern(self):
# Sort longest-first so "gemini-2" is tried before "gemini"
names = "|".join(re.escape(n) for n in sorted(self.agent_names, key=len, reverse=True))
self._mention_re = re.compile(
rf"@({names}|both|all)\b", re.IGNORECASE
)
def parse_mentions(self, text: str) -> list[str]:
mentions = set()
for match in self._mention_re.finditer(text):
name = match.group(1).lower()
if name in ("both", "all"):
# Only tag online agents when using @all
if self._online_checker:
online = self._online_checker()
mentions.update(n for n in self.agent_names if n in online)
else:
mentions.update(self.agent_names)
else:
mentions.add(name)
return list(mentions)
def _is_agent(self, sender: str) -> bool:
return sender.lower() in self.agent_names
def get_targets(self, sender: str, text: str, channel: str = "general") -> list[str]:
"""Determine which agents should receive this message."""
ch = self._get_ch(channel)
mentions = self.parse_mentions(text)
if not self._is_agent(sender):
# Human message resets hop counter and unpauses
ch["hop_count"] = 0
ch["paused"] = False
ch["guard_emitted"] = False
if not mentions:
if self.default_mention in ("both", "all"):
return list(self.agent_names)
elif self.default_mention == "none":
return []
return [self.default_mention]
return mentions
else:
# Agent message: blocked while loop guard is active
if ch["paused"]:
return []
# Only route if explicit @mention
if not mentions:
return []
ch["hop_count"] += 1
if ch["hop_count"] > self.max_hops:
ch["paused"] = True
return []
# Don't route back to self
return [m for m in mentions if m != sender]
def continue_routing(self, channel: str = "general"):
"""Resume after loop guard pause."""
ch = self._get_ch(channel)
ch["hop_count"] = 0
ch["paused"] = False
ch["guard_emitted"] = False
def is_paused(self, channel: str = "general") -> bool:
return self._get_ch(channel)["paused"]
def is_guard_emitted(self, channel: str = "general") -> bool:
return self._get_ch(channel)["guard_emitted"]
def set_guard_emitted(self, channel: str = "general"):
self._get_ch(channel)["guard_emitted"] = True
def update_agents(self, names: list[str]):
"""Replace the agent name set and rebuild the mention regex."""
self.agent_names = set(n.lower() for n in names)
self._build_pattern()