forked from ap-choji/patrol_subnet
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsubgraph_generator.py
More file actions
197 lines (146 loc) · 7.76 KB
/
subgraph_generator.py
File metadata and controls
197 lines (146 loc) · 7.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import asyncio
import time
from collections import defaultdict, deque
from itertools import combinations
from typing import Dict, List

import bittensor as bt

from patrol.constants import Constants
from patrol.chain_data.event_fetcher import EventFetcher
from patrol.chain_data.event_processor import EventProcessor
from patrol.protocol import GraphPayload, Node, Edge, TransferEvidence, StakeEvidence
class SubgraphGenerator:
    """Builds a local transaction subgraph centred on a target wallet address.

    Events are fetched for a window of blocks around a target block, turned
    into an adjacency map keyed by coldkey address, and then expanded
    breadth-first from the target address into a ``GraphPayload`` of
    ``Node`` and ``Edge`` objects.
    """

    # These parameters control the subgraph generation:
    # - _max_future_events: The number of events into the future you will collect
    # - _max_past_events: The number of events into the past you will collect
    # - _batch_size: The number of events fetched in one go from the block chain
    # Adjust these based on your needs - higher values give higher chance of being able to find and deliver larger subgraphs,
    # but will require more time and resources to generate
    def __init__(self, event_fetcher: EventFetcher, event_processor: EventProcessor, max_future_events: int = 50, max_past_events: int = 50, batch_size: int = 25, timeout=10):
        self.event_fetcher = event_fetcher
        self.event_processor = event_processor
        self._max_future_events = max_future_events
        self._max_past_events = max_past_events
        self._batch_size = batch_size
        self.timeout = timeout

    async def generate_block_numbers(self, target_block: int, upper_block_limit: int, lower_block_limit: int = Constants.LOWER_BLOCK_LIMIT) -> List[int]:
        """Return the inclusive block-number window around ``target_block``.

        The window spans ``_max_past_events`` blocks backwards and
        ``_max_future_events`` blocks forwards, clamped to
        ``[lower_block_limit, upper_block_limit]``.
        """
        bt.logging.info(f"Generating block numbers for target block: {target_block}")
        start_block = max(target_block - self._max_past_events, lower_block_limit)
        end_block = min(target_block + self._max_future_events, upper_block_limit)
        return list(range(start_block, end_block + 1))

    def generate_adjacency_graph_from_events(self, events: List[Dict]) -> Dict:
        """Build an address -> list of ``{"neighbor", "event"}`` adjacency map.

        Every pair of addresses present among 'coldkey_source',
        'coldkey_destination' and 'coldkey_owner' on an event is linked in
        both directions, so the graph can be walked as undirected.
        """
        start_time = time.time()
        graph = defaultdict(list)
        for event in events:
            # Only addresses actually present on the event participate in edges.
            participants = [
                addr for addr in (
                    event.get("coldkey_source"),
                    event.get("coldkey_destination"),
                    event.get("coldkey_owner"),
                ) if addr
            ]
            for a, b in combinations(participants, 2):
                graph[a].append({"neighbor": b, "event": event})
                graph[b].append({"neighbor": a, "event": event})
        bt.logging.info(f"Adjacency graph created in {time.time() - start_time} seconds.")
        return graph

    def generate_subgraph_from_adjacency_graph(self, adjacency_graph: Dict, target_address: str) -> GraphPayload:
        """Breadth-first expand from ``target_address`` into a ``GraphPayload``.

        Nodes and edges are de-duplicated as they are discovered; events that
        fail Edge/evidence construction are logged at debug level and skipped
        rather than aborting the walk.
        """
        start_time = time.time()
        nodes = []
        edges = []
        seen_nodes = set()
        seen_edges = set()
        # deque + an "already enqueued" set give O(1) pops and membership
        # tests (list.pop(0) and `x in list` are both O(n)). Traversal
        # order is identical to the previous list-based queue.
        queue = deque([target_address])
        queued = {target_address}
        while queue:
            current = queue.popleft()
            if current not in seen_nodes:
                nodes.append(
                    Node(
                        id=current,
                        type="wallet",
                        origin="bittensor"
                    ))
                seen_nodes.add(current)
            for conn in adjacency_graph.get(current, []):
                neighbor = conn["neighbor"]
                event = conn["event"]
                evidence = event['evidence']
                # Key on the fields that identify a unique transfer/stake event.
                edge_key = (
                    event.get('coldkey_source'),
                    event.get('coldkey_destination'),
                    event.get('category'),
                    event.get('type'),
                    evidence.get('rao_amount'),
                    evidence.get('block_number')
                )
                try:
                    if edge_key not in seen_edges:
                        seen_edges.add(edge_key)
                        if event.get('category') == "balance":
                            edges.append(
                                Edge(
                                    coldkey_source=event['coldkey_source'],
                                    coldkey_destination=event['coldkey_destination'],
                                    category=event['category'],
                                    type=event['type'],
                                    evidence=TransferEvidence(**event['evidence'])
                                )
                            )
                        elif event.get('category') == "staking":
                            edges.append(
                                Edge(
                                    coldkey_source=event['coldkey_source'],
                                    coldkey_destination=event['coldkey_destination'],
                                    coldkey_owner=event.get('coldkey_owner'),
                                    category=event['category'],
                                    type=event['type'],
                                    evidence=StakeEvidence(**event['evidence'])
                                )
                            )
                except Exception as e:
                    bt.logging.debug(f"Issue with adding edge to subgraph, skipping for now. Error: {e}")
                if neighbor not in seen_nodes and neighbor not in queued:
                    queue.append(neighbor)
                    queued.add(neighbor)
        subgraph_length = len(nodes) + len(edges)
        bt.logging.info(f"Subgraph graph of length {subgraph_length} created in {time.time() - start_time} seconds.")
        return GraphPayload(nodes=nodes, edges=edges)

    async def run(self, target_address: str, target_block: int, max_block_number: int):
        """Fetch, process and assemble the subgraph around ``target_address``.

        Returns the ``GraphPayload`` produced by
        ``generate_subgraph_from_adjacency_graph``.
        """
        block_numbers = await self.generate_block_numbers(target_block, upper_block_limit=max_block_number)
        events = await self.event_fetcher.fetch_all_events(block_numbers)
        processed_events = await self.event_processor.process_event_data(events)
        adjacency_graph = self.generate_adjacency_graph_from_events(processed_events)
        subgraph = self.generate_subgraph_from_adjacency_graph(adjacency_graph, target_address)
        return subgraph
if __name__ == "__main__":
    from patrol.chain_data.coldkey_finder import ColdkeyFinder

    async def example():
        # Demo: build a small subgraph around a known address from the public
        # archive node and dump it to subgraph_output.json.
        import dataclasses
        import json

        from patrol.chain_data.substrate_client import SubstrateClient
        from patrol.chain_data.runtime_groupings import load_versions

        bt.debug()

        network_url = "wss://archive.chain.opentensor.ai:443/"
        client = SubstrateClient(
            runtime_mappings=load_versions(),
            network_url=network_url,
            max_retries=3,
        )
        await client.initialize()

        start_time = time.time()
        target = "5FyCncAf9EBU8Nkcm5gL1DQu3hVmY7aphiqRn3CxwoTmB1cZ"
        target_block = 4179349

        event_fetcher = EventFetcher(substrate_client=client)
        processor = EventProcessor(coldkey_finder=ColdkeyFinder(substrate_client=client))
        generator = SubgraphGenerator(
            event_fetcher=event_fetcher,
            event_processor=processor,
            max_future_events=50,
            max_past_events=50,
            batch_size=50,
        )

        subgraph = await generator.run(target, target_block, max_block_number=4179351)
        volume = len(subgraph.nodes) + len(subgraph.edges)

        with open("subgraph_output.json", "w") as f:
            json.dump(dataclasses.asdict(subgraph), f, indent=2)

        bt.logging.info(f"Finished: {time.time() - start_time} with volume: {volume}")

    asyncio.run(example())