From 87030ad479db24b5e2ef0ce9e5f1bbf7e8295a54 Mon Sep 17 00:00:00 2001 From: chenzihong-gavin Date: Wed, 26 Nov 2025 19:35:21 +0800 Subject: [PATCH 1/2] refactor: delete meta storage --- graphgen/graphgen.py | 27 ++++++------------------- graphgen/models/storage/json_storage.py | 20 ------------------ 2 files changed, 6 insertions(+), 41 deletions(-) diff --git a/graphgen/graphgen.py b/graphgen/graphgen.py index f4e222eb..167981e9 100644 --- a/graphgen/graphgen.py +++ b/graphgen/graphgen.py @@ -9,7 +9,6 @@ from graphgen.models import ( JsonKVStorage, JsonListStorage, - MetaJsonKVStorage, NetworkXStorage, OpenAIClient, Tokenizer, @@ -54,9 +53,6 @@ def __init__( ) self.trainee_llm_client: BaseLLMWrapper = trainee_llm_client - self.meta_storage: MetaJsonKVStorage = MetaJsonKVStorage( - self.working_dir, namespace="_meta" - ) self.full_docs_storage: JsonKVStorage = JsonKVStorage( self.working_dir, namespace="full_docs" ) @@ -98,11 +94,7 @@ async def read(self, read_config: Dict): batch = {} for doc in doc_stream: doc_id = compute_mm_hash(doc, prefix="doc-") - batch[doc_id] = doc - if batch: - self.full_docs_storage.upsert(batch) - self.full_docs_storage.index_done_callback() # TODO: configurable whether to use coreference resolution @@ -120,7 +112,7 @@ async def chunk(self, chunk_config: Dict): chunk documents into smaller pieces from full_docs_storage if not already present """ - new_docs = self.meta_storage.get_new_data(self.full_docs_storage) + new_docs = self.full_docs_storage.get_all() if len(new_docs) == 0: logger.warning("All documents are already in the storage") return @@ -143,16 +135,15 @@ async def chunk(self, chunk_config: Dict): self.chunks_storage.upsert(inserting_chunks) self.chunks_storage.index_done_callback() - self.meta_storage.mark_done(self.full_docs_storage) - self.meta_storage.index_done_callback() @async_to_sync_method async def build_kg(self): """ build knowledge graph from text chunks """ - # Step 1: get new chunks according to meta and chunks storage - inserting_chunks = self.meta_storage.get_new_data(self.chunks_storage) + # Step 1: get new chunks + inserting_chunks = self.chunks_storage.get_all() + if len(inserting_chunks) == 0: logger.warning("All chunks are already in the storage") return @@ -169,10 +160,8 @@ async def build_kg(self): logger.warning("No entities or relations extracted from text chunks") return - # Step 3: mark meta + # Step 3: upsert new entities and relations to the graph storage self.graph_storage.index_done_callback() - self.meta_storage.mark_done(self.chunks_storage) - self.meta_storage.index_done_callback() return _add_entities_and_relations @@ -180,7 +169,7 @@ async def build_kg(self): async def search(self, search_config: Dict): logger.info("[Search] %s ...", ", ".join(search_config["data_sources"])) - seeds = self.meta_storage.get_new_data(self.full_docs_storage) + seeds = self.full_docs_storage.get_all() if len(seeds) == 0: logger.warning("All documents are already been searched") return @@ -198,8 +187,6 @@ async def search(self, search_config: Dict): return self.search_storage.upsert(search_results) self.search_storage.index_done_callback() - self.meta_storage.mark_done(self.full_docs_storage) - self.meta_storage.index_done_callback() @async_to_sync_method async def quiz_and_judge(self, quiz_and_judge_config: Dict): @@ -268,8 +255,6 @@ async def extract(self, extract_config: Dict): self.extract_storage.upsert(results) self.extract_storage.index_done_callback() - self.meta_storage.mark_done(self.chunks_storage) - self.meta_storage.index_done_callback() @async_to_sync_method async def generate(self, generate_config: Dict): diff --git a/graphgen/models/storage/json_storage.py b/graphgen/models/storage/json_storage.py index ed5c6467..53962117 100644 --- a/graphgen/models/storage/json_storage.py +++ b/graphgen/models/storage/json_storage.py @@ -92,23 +92,3 @@ def upsert(self, data: list): def drop(self): self._data = [] - - -@dataclass -class MetaJsonKVStorage(JsonKVStorage): - def __post_init__(self): - self._file_name = os.path.join(self.working_dir, f"{self.namespace}.json") - self._data = load_json(self._file_name) or {} - logger.info("Load KV %s with %d data", self.namespace, len(self._data)) - - def get_new_data(self, storage_instance: "JsonKVStorage") -> dict: - new_data = {} - for k, v in storage_instance.data.items(): - if k not in self._data: - new_data[k] = v - return new_data - - def mark_done(self, storage_instance: "JsonKVStorage"): - new_data = self.get_new_data(storage_instance) - if new_data: - self._data.update(new_data) From b9901adf9f7fbbb5038211bd82d033e8b31c22fb Mon Sep 17 00:00:00 2001 From: chenzihong-gavin Date: Wed, 26 Nov 2025 19:43:27 +0800 Subject: [PATCH 2/2] fix: fix lint problem --- graphgen/models/__init__.py | 2 +- graphgen/models/storage/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/graphgen/models/__init__.py b/graphgen/models/__init__.py index 68fd2a5d..cc8dfd90 100644 --- a/graphgen/models/__init__.py +++ b/graphgen/models/__init__.py @@ -31,5 +31,5 @@ from .searcher.web.bing_search import BingSearch from .searcher.web.google_search import GoogleSearch from .splitter import ChineseRecursiveTextSplitter, RecursiveCharacterSplitter -from .storage import JsonKVStorage, JsonListStorage, MetaJsonKVStorage, NetworkXStorage +from .storage import JsonKVStorage, JsonListStorage, NetworkXStorage from .tokenizer import Tokenizer diff --git a/graphgen/models/storage/__init__.py b/graphgen/models/storage/__init__.py index 99fba3ba..56338984 100644 --- a/graphgen/models/storage/__init__.py +++ b/graphgen/models/storage/__init__.py @@ -1,2 +1,2 @@ -from .json_storage import JsonKVStorage, JsonListStorage, MetaJsonKVStorage +from .json_storage import JsonKVStorage, JsonListStorage from .networkx_storage import NetworkXStorage