Skip to content

Commit a16b8b7

Browse files
fix: make storage's lifetime job type
1 parent 44265e4 commit a16b8b7

File tree

2 files changed

+28
-39
lines changed

2 files changed

+28
-39
lines changed

graphgen/common/init_storage.py

Lines changed: 26 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ def drop(self):
4747

4848
def reload(self):
4949
return self.kv.reload()
50+
51+
def ready(self) -> bool:
52+
return True
5053

5154

5255
class GraphStorageActor:
@@ -113,23 +116,15 @@ def delete_node(self, node_id: str):
113116

114117
def reload(self):
115118
return self.graph.reload()
116-
117-
118-
def get_actor_handle(name: str):
119-
try:
120-
return ray.get_actor(name)
121-
except ValueError as exc:
122-
raise RuntimeError(
123-
f"Actor {name} not found. Make sure it is created before accessing."
124-
) from exc
119+
120+
def ready(self) -> bool:
121+
return True
125122

126123

127124
class RemoteKVStorageProxy(BaseKVStorage):
128-
def __init__(self, namespace: str):
125+
def __init__(self, actor_handle: ray.actor.ActorHandle):
129126
super().__init__()
130-
self.namespace = namespace
131-
self.actor_name = f"Actor_KV_{namespace}"
132-
self.actor = get_actor_handle(self.actor_name)
127+
self.actor = actor_handle
133128

134129
def data(self) -> Dict[str, Any]:
135130
return ray.get(self.actor.data.remote())
@@ -163,11 +158,9 @@ def reload(self):
163158

164159

165160
class RemoteGraphStorageProxy(BaseGraphStorage):
166-
def __init__(self, namespace: str):
161+
def __init__(self, actor_handle: ray.actor.ActorHandle):
167162
super().__init__()
168-
self.namespace = namespace
169-
self.actor_name = f"Actor_Graph_{namespace}"
170-
self.actor = get_actor_handle(self.actor_name)
163+
self.actor = actor_handle
171164

172165
def index_done_callback(self):
173166
return ray.get(self.actor.index_done_callback.remote())
@@ -235,27 +228,23 @@ class StorageFactory:
235228
def create_storage(backend: str, working_dir: str, namespace: str):
236229
if backend in ["json_kv", "rocksdb"]:
237230
actor_name = f"Actor_KV_{namespace}"
238-
try:
239-
ray.get_actor(actor_name)
240-
except ValueError:
241-
ray.remote(KVStorageActor).options(
242-
name=actor_name,
243-
lifetime="detached",
244-
get_if_exists=True,
245-
).remote(backend, working_dir, namespace)
246-
return RemoteKVStorageProxy(namespace)
247-
if backend in ["networkx", "kuzu"]:
231+
actor_class = KVStorageActor
232+
proxy_class = RemoteKVStorageProxy
233+
elif backend in ["networkx", "kuzu"]:
248234
actor_name = f"Actor_Graph_{namespace}"
249-
try:
250-
ray.get_actor(actor_name)
251-
except ValueError:
252-
ray.remote(GraphStorageActor).options(
253-
name=actor_name,
254-
lifetime="detached",
255-
get_if_exists=True,
256-
).remote(backend, working_dir, namespace)
257-
return RemoteGraphStorageProxy(namespace)
258-
raise ValueError(f"Unknown storage backend: {backend}")
235+
actor_class = GraphStorageActor
236+
proxy_class = RemoteGraphStorageProxy
237+
else:
238+
raise ValueError(f"Unknown storage backend: {backend}")
239+
try:
240+
actor_handle = ray.get_actor(actor_name)
241+
except ValueError:
242+
actor_handle = ray.remote(actor_class).options(
243+
name=actor_name,
244+
get_if_exists=True,
245+
).remote(backend, working_dir, namespace)
246+
ray.get(actor_handle.ready.remote())
247+
return proxy_class(actor_handle)
259248

260249

261250
def init_storage(backend: str, working_dir: str, namespace: str):

graphgen/engine.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ def __init__(
5252
**ray_init_kwargs,
5353
)
5454
logger.info("Ray Dashboard URL: %s", context.dashboard_url)
55-
55+
5656
self._init_llms()
57-
57+
5858
def _init_llms(self):
5959
self.llm_actors["synthesizer"] = init_llm("synthesizer")
6060
self.llm_actors["trainee"] = init_llm("trainee")

0 commit comments

Comments
 (0)