Skip to content

Commit fb258e2

Browse files
Add syncing metagraph and remove manual script
1 parent 96bb89a commit fb258e2

File tree

5 files changed

+163
-75
lines changed

5 files changed

+163
-75
lines changed

api/src/endpoints/scoring.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from api.src.backend.queries.agents import get_top_agent, ban_agents as db_ban_agents, approve_agent_version, get_agent_by_version_id as db_get_agent_by_version_id
1818
from api.src.backend.entities import MinerAgentScored
1919
from api.src.backend.db_manager import get_transaction, new_db, get_db_connection
20-
from api.src.utils.refresh_subnet_hotkeys import check_if_hotkey_is_registered
20+
from api.src.utils.subtensor import get_subnet_hotkeys, check_if_hotkey_is_registered
2121
from api.src.utils.slack import notify_unregistered_top_miner, notify_unregistered_treasury_hotkey
2222
from api.src.backend.internal_tools import InternalTools
2323
from api.src.backend.entities import TreasuryTransaction
@@ -71,7 +71,7 @@ async def get_treasury_hotkey():
7171
raise ValueError("No active treasury wallets found in database")
7272
treasury_hotkey = treasury_hotkey_data[0]["hotkey"]
7373

74-
if not check_if_hotkey_is_registered(treasury_hotkey):
74+
if not await check_if_hotkey_is_registered(treasury_hotkey):
7575
logger.error(f"Treasury hotkey {treasury_hotkey} not registered on subnet")
7676
await notify_unregistered_treasury_hotkey(treasury_hotkey)
7777

@@ -100,7 +100,7 @@ async def weights() -> Dict[str, float]:
100100
if top_agent.miner_hotkey.startswith("open-"):
101101
weights[treasury_hotkey] = weight_left
102102
else:
103-
if check_if_hotkey_is_registered(top_agent.miner_hotkey):
103+
if await check_if_hotkey_is_registered(top_agent.miner_hotkey):
104104
weights[top_agent.miner_hotkey] = weight_left
105105
else:
106106
logger.error(f"Top agent {top_agent.miner_hotkey} not registered on subnet")

api/src/main.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from api.src.endpoints.benchmarks import router as benchmarks_router
2525
from api.src.utils.slack import send_slack_message
2626
from api.src.utils.config import WHITELISTED_VALIDATOR_IPS
27+
from api.src.utils.hotkey_subscription import start_hotkey_subscription, stop_hotkey_subscription
2728

2829
logger = get_logger(__name__)
2930

@@ -56,8 +57,13 @@ async def lifespan(app: FastAPI):
5657
# Start background tasks
5758
asyncio.create_task(run_weight_setting_loop(30))
5859

60+
# Start hotkey subscription service
61+
asyncio.create_task(start_hotkey_subscription())
62+
5963
yield
6064

65+
# Stop hotkey subscription service
66+
await stop_hotkey_subscription()
6167
await new_db.close()
6268

6369
app = FastAPI(lifespan=lifespan)
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
import asyncio
2+
import json
3+
import os
4+
import time
5+
import threading
6+
from typing import Optional
7+
from substrateinterface import SubstrateInterface
8+
from pathlib import Path
9+
10+
from loggers.logging_utils import get_logger
11+
12+
logger = get_logger(__name__)
13+
14+
NETUID = os.getenv("NETUID", "62")
15+
SUBTENSOR_URL = os.getenv("SUBTENSOR_ADDRESS", "ws://127.0.0.1:9945")
16+
CACHE_FILE = Path("subnet_hotkeys_cache.json")
17+
18+
class HotkeySubscriptionManager:
19+
def __init__(self):
20+
self.is_running = False
21+
self.main_loop: Optional[asyncio.AbstractEventLoop] = None
22+
23+
async def start_subscription(self) -> None:
24+
if self.is_running:
25+
return
26+
27+
self.is_running = True
28+
self.main_loop = asyncio.get_running_loop()
29+
logger.info("Starting hotkey subscription service")
30+
31+
await self._update_cache()
32+
33+
asyncio.create_task(self._subscription_loop())
34+
35+
async def stop_subscription(self) -> None:
36+
self.is_running = False
37+
logger.info("Stopped hotkey subscription service")
38+
39+
async def _subscription_loop(self) -> None:
40+
retry_delay = 1.0
41+
42+
while self.is_running:
43+
try:
44+
await self._run_subscription()
45+
retry_delay = 1.0 # Reset on success
46+
except Exception as e:
47+
logger.error(f"Subscription failed: {e}")
48+
if retry_delay < 60:
49+
retry_delay *= 2
50+
await asyncio.sleep(retry_delay)
51+
52+
async def _run_subscription(self) -> None:
53+
substrate = SubstrateInterface(
54+
url=SUBTENSOR_URL,
55+
ss58_format=42,
56+
type_registry_preset="substrate-node-template"
57+
)
58+
59+
try:
60+
storage_key = substrate.create_storage_key("SubtensorModule", "Uids", [NETUID])
61+
62+
def handler(storage_key, obj, update_nr, subscription_id):
63+
if not self.is_running:
64+
return True
65+
if self.main_loop and not self.main_loop.is_closed():
66+
asyncio.run_coroutine_threadsafe(self._update_cache(), self.main_loop)
67+
return None
68+
69+
# Run in thread to avoid blocking
70+
def subscription_thread():
71+
try:
72+
substrate.subscribe_storage([storage_key], handler)
73+
except Exception as e:
74+
# Filters out expected errors, such as
75+
# ERROR - Subscription error: Expecting value: line 1 column 1 (char 0)
76+
# ERROR - Subscription error: Connection closed
77+
# ERROR - Subscription error: WebSocket connection is closed
78+
if not any(x in str(e).lower() for x in ["expecting value", "json", "connection", "closed"]):
79+
logger.error(f"Subscription error: {e}")
80+
finally:
81+
try:
82+
substrate.close()
83+
except:
84+
pass
85+
86+
thread = threading.Thread(target=subscription_thread, daemon=True)
87+
thread.start()
88+
89+
while self.is_running and thread.is_alive():
90+
await asyncio.sleep(1)
91+
92+
finally:
93+
try:
94+
substrate.close()
95+
except:
96+
pass
97+
98+
async def _update_cache(self) -> None:
99+
try:
100+
substrate = SubstrateInterface(
101+
url=SUBTENSOR_URL,
102+
ss58_format=42,
103+
type_registry_preset="substrate-node-template"
104+
)
105+
106+
result = substrate.query_map("SubtensorModule", "Uids", [NETUID])
107+
hotkeys = []
108+
109+
for uid_data in result:
110+
try:
111+
hotkey = uid_data[0]
112+
if hasattr(hotkey, 'value'):
113+
hotkey = hotkey.value
114+
if isinstance(hotkey, bytes):
115+
hotkey = substrate.ss58_encode(hotkey)
116+
hotkeys.append(hotkey)
117+
except:
118+
continue
119+
120+
# Atomic write
121+
temp_file = CACHE_FILE.with_suffix('.tmp')
122+
with open(temp_file, 'w') as f:
123+
json.dump({"hotkeys": hotkeys, "timestamp": time.time()}, f)
124+
temp_file.replace(CACHE_FILE)
125+
126+
logger.info(f"Updated cache with {len(hotkeys)} hotkeys")
127+
substrate.close()
128+
129+
except Exception as e:
130+
logger.error(f"Failed to update cache: {e}")
131+
132+
# Global instance
133+
_manager: Optional[HotkeySubscriptionManager] = None
134+
135+
async def start_hotkey_subscription() -> None:
136+
global _manager
137+
if _manager is None:
138+
_manager = HotkeySubscriptionManager()
139+
await _manager.start_subscription()
140+
141+
async def stop_hotkey_subscription() -> None:
142+
global _manager
143+
if _manager:
144+
await _manager.stop_subscription()

api/src/utils/refresh_subnet_hotkeys.py

Lines changed: 0 additions & 71 deletions
This file was deleted.

api/src/utils/subtensor.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,17 @@ async def get_subnet_hotkeys():
2222
data = json.loads(content)
2323
return data["hotkeys"]
2424
else:
25-
logger.warning("Hotkeys cache file does not exist. Make sure refresh_subnet_hotkeys.py service is running.")
25+
logger.warning("Hotkeys cache file does not exist. Make sure hotkey subscription service is running.")
2626
return []
2727
except Exception as e:
2828
logger.error(f"Error reading hotkeys cache: {e}")
2929
return []
30+
31+
async def check_if_hotkey_is_registered(hotkey: str) -> bool:
32+
"""Check if a hotkey is registered on the subnet by looking in the cache."""
33+
try:
34+
hotkeys = await get_subnet_hotkeys()
35+
return hotkey in hotkeys
36+
except Exception as e:
37+
logger.error(f"Error checking if hotkey is registered: {e}")
38+
return False

0 commit comments

Comments
 (0)