Merged
graphgen/graphgen.py (27 changes: 6 additions & 21 deletions)

@@ -9,7 +9,6 @@
from graphgen.models import (
    JsonKVStorage,
    JsonListStorage,
-    MetaJsonKVStorage,
    NetworkXStorage,
    OpenAIClient,
    Tokenizer,
@@ -54,9 +53,6 @@ def __init__(
        )
        self.trainee_llm_client: BaseLLMWrapper = trainee_llm_client

-        self.meta_storage: MetaJsonKVStorage = MetaJsonKVStorage(
-            self.working_dir, namespace="_meta"
-        )
        self.full_docs_storage: JsonKVStorage = JsonKVStorage(
            self.working_dir, namespace="full_docs"
        )
@@ -98,11 +94,7 @@ async def read(self, read_config: Dict):
        batch = {}
        for doc in doc_stream:
            doc_id = compute_mm_hash(doc, prefix="doc-")

            batch[doc_id] = doc
        if batch:
            self.full_docs_storage.upsert(batch)
            self.full_docs_storage.index_done_callback()

        # TODO: configurable whether to use coreference resolution

@@ -120,7 +112,7 @@ async def chunk(self, chunk_config: Dict):
        chunk documents into smaller pieces from full_docs_storage if not already present
        """

-        new_docs = self.meta_storage.get_new_data(self.full_docs_storage)
+        new_docs = self.full_docs_storage.get_all()
        if len(new_docs) == 0:
            logger.warning("All documents are already in the storage")
            return
@@ -143,16 +135,15 @@ async def chunk(self, chunk_config: Dict):

        self.chunks_storage.upsert(inserting_chunks)
        self.chunks_storage.index_done_callback()
-        self.meta_storage.mark_done(self.full_docs_storage)
-        self.meta_storage.index_done_callback()

    @async_to_sync_method
    async def build_kg(self):
        """
        build knowledge graph from text chunks
        """
-        # Step 1: get new chunks according to meta and chunks storage
-        inserting_chunks = self.meta_storage.get_new_data(self.chunks_storage)
+        # Step 1: get new chunks
+        inserting_chunks = self.chunks_storage.get_all()
+
        if len(inserting_chunks) == 0:
            logger.warning("All chunks are already in the storage")
            return
@@ -169,18 +160,16 @@ async def build_kg(self):
            logger.warning("No entities or relations extracted from text chunks")
            return

-        # Step 3: mark meta
+        # Step 3: upsert new entities and relations to the graph storage
        self.graph_storage.index_done_callback()
-        self.meta_storage.mark_done(self.chunks_storage)
-        self.meta_storage.index_done_callback()

        return _add_entities_and_relations

    @async_to_sync_method
    async def search(self, search_config: Dict):
        logger.info("[Search] %s ...", ", ".join(search_config["data_sources"]))

-        seeds = self.meta_storage.get_new_data(self.full_docs_storage)
+        seeds = self.full_docs_storage.get_all()
        if len(seeds) == 0:
            logger.warning("All documents are already been searched")
            return
@@ -198,8 +187,6 @@ async def search(self, search_config: Dict):
            return
        self.search_storage.upsert(search_results)
        self.search_storage.index_done_callback()
-        self.meta_storage.mark_done(self.full_docs_storage)
-        self.meta_storage.index_done_callback()

    @async_to_sync_method
    async def quiz_and_judge(self, quiz_and_judge_config: Dict):
@@ -268,8 +255,6 @@ async def extract(self, extract_config: Dict):

        self.extract_storage.upsert(results)
        self.extract_storage.index_done_callback()
-        self.meta_storage.mark_done(self.chunks_storage)
-        self.meta_storage.index_done_callback()

    @async_to_sync_method
    async def generate(self, generate_config: Dict):
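Every hunk in this file makes the same change: the `_meta` bookkeeping store is dropped, and each pipeline stage (`chunk`, `build_kg`, `search`, `extract`) now pulls its entire input with `get_all()` instead of asking `MetaJsonKVStorage.get_new_data()` for only the not-yet-processed keys. The sketch below is not part of the PR; it contrasts the two behaviors and assumes `get_all()` simply returns the storage's full hash-keyed dict, since that method itself is not shown in this diff.

# Illustrative sketch only: removed incremental lookup vs. new full scan.
# Assumes the stores behave like dicts keyed by content hash, matching how
# read() keys documents with compute_mm_hash(doc, prefix="doc-").
docs = {"doc-a1": {"text": "alpha"}, "doc-b2": {"text": "beta"}}  # full_docs_storage
meta = {"doc-a1": {"text": "alpha"}}  # the old _meta namespace after one run

# Old behavior: meta_storage.get_new_data(full_docs_storage)
new_only = {k: v for k, v in docs.items() if k not in meta}
assert list(new_only) == ["doc-b2"]  # only unseen documents reached the stage

# New behavior: full_docs_storage.get_all(), assumed to return everything
everything = dict(docs)
assert list(everything) == ["doc-a1", "doc-b2"]  # every document is handed over

One consequence worth noting: writes stay idempotent because keys are content hashes, so re-ingesting the same document overwrites its entry rather than duplicating it, but each stage now reprocesses all stored entries on every run instead of only the new ones.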
graphgen/models/__init__.py (2 changes: 1 addition & 1 deletion)

@@ -31,5 +31,5 @@
from .searcher.web.bing_search import BingSearch
from .searcher.web.google_search import GoogleSearch
from .splitter import ChineseRecursiveTextSplitter, RecursiveCharacterSplitter
-from .storage import JsonKVStorage, JsonListStorage, MetaJsonKVStorage, NetworkXStorage
+from .storage import JsonKVStorage, JsonListStorage, NetworkXStorage
from .tokenizer import Tokenizer
graphgen/models/storage/__init__.py (2 changes: 1 addition & 1 deletion)

@@ -1,2 +1,2 @@
-from .json_storage import JsonKVStorage, JsonListStorage, MetaJsonKVStorage
+from .json_storage import JsonKVStorage, JsonListStorage
from .networkx_storage import NetworkXStorage
graphgen/models/storage/json_storage.py (20 changes: 0 additions & 20 deletions)

@@ -92,23 +92,3 @@ def upsert(self, data: list):

    def drop(self):
        self._data = []
-
-
-@dataclass
-class MetaJsonKVStorage(JsonKVStorage):
-    def __post_init__(self):
-        self._file_name = os.path.join(self.working_dir, f"{self.namespace}.json")
-        self._data = load_json(self._file_name) or {}
-        logger.info("Load KV %s with %d data", self.namespace, len(self._data))
-
-    def get_new_data(self, storage_instance: "JsonKVStorage") -> dict:
-        new_data = {}
-        for k, v in storage_instance.data.items():
-            if k not in self._data:
-                new_data[k] = v
-        return new_data
-
-    def mark_done(self, storage_instance: "JsonKVStorage"):
-        new_data = self.get_new_data(storage_instance)
-        if new_data:
-            self._data.update(new_data)
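For context on the deletion above: `MetaJsonKVStorage` acted as a high-water mark over another `JsonKVStorage`, remembering which keys earlier pipeline stages had already consumed. The self-contained sketch below restates its contract in case incremental runs ever need to be reinstated; `KVStore` is a hypothetical dict-backed stand-in for `JsonKVStorage`, whose `working_dir`/`load_json` plumbing sits outside this diff.

from dataclasses import dataclass, field


@dataclass
class KVStore:
    # Hypothetical stand-in for JsonKVStorage: a dict keyed by content hash.
    data: dict = field(default_factory=dict)


@dataclass
class MetaStore(KVStore):
    # Mirrors the contract of the removed MetaJsonKVStorage.
    def get_new_data(self, storage: KVStore) -> dict:
        # Entries of `storage` whose keys this meta store has not recorded.
        return {k: v for k, v in storage.data.items() if k not in self.data}

    def mark_done(self, storage: KVStore) -> None:
        # Record the currently-new keys so the next get_new_data() skips them.
        self.data.update(self.get_new_data(storage))


chunks, meta = KVStore(), MetaStore()
chunks.data["chunk-1"] = {"content": "..."}
assert meta.get_new_data(chunks) == {"chunk-1": {"content": "..."}}
meta.mark_done(chunks)
assert meta.get_new_data(chunks) == {}  # second pass: nothing new to process

The original loaded this map back from `{namespace}.json` in `__post_init__` and persisted it through the parent class; the stand-in keeps it in memory only.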