1616 Tokenizer ,
1717)
1818from graphgen .operators import (
19- build_kg ,
19+ build_mm_kg ,
20+ build_text_kg ,
2021 chunk_documents ,
2122 generate_qas ,
2223 judge_statement ,
2526 read_files ,
2627 search_all ,
2728)
28- from graphgen .utils import async_to_sync_method , compute_content_hash , logger
29+ from graphgen .utils import async_to_sync_method , compute_mm_hash , logger
2930
3031sys_path = os .path .abspath (os .path .join (os .path .dirname (__file__ ), ".." ))
3132
@@ -68,8 +69,8 @@ def __post_init__(self):
6869 self .full_docs_storage : JsonKVStorage = JsonKVStorage (
6970 self .working_dir , namespace = "full_docs"
7071 )
71- self .text_chunks_storage : JsonKVStorage = JsonKVStorage (
72- self .working_dir , namespace = "text_chunks "
72+ self .chunks_storage : JsonKVStorage = JsonKVStorage (
73+ self .working_dir , namespace = "chunks "
7374 )
7475 self .graph_storage : NetworkXStorage = NetworkXStorage (
7576 self .working_dir , namespace = "graph"
@@ -96,70 +97,122 @@ async def insert(self, read_config: Dict, split_config: Dict):
9697 logger .warning ("No data to process" )
9798 return
9899
100+ assert isinstance (data , list ) and isinstance (data [0 ], dict )
101+
99102 # TODO: configurable whether to use coreference resolution
100103
101- # Step 2: Split chunks and filter existing ones
102- assert isinstance (data , list ) and isinstance (data [0 ], dict )
103- new_docs = {
104- compute_content_hash (doc ["content" ], prefix = "doc-" ): {
105- "content" : doc ["content" ]
106- }
107- for doc in data
108- if doc .get ("type" , "text" ) == "text"
109- }
104+ new_docs = {compute_mm_hash (doc , prefix = "doc-" ): doc for doc in data }
110105 _add_doc_keys = await self .full_docs_storage .filter_keys (list (new_docs .keys ()))
111106 new_docs = {k : v for k , v in new_docs .items () if k in _add_doc_keys }
107+ new_text_docs = {k : v for k , v in new_docs .items () if v .get ("type" ) == "text" }
108+ new_mm_docs = {k : v for k , v in new_docs .items () if v .get ("type" ) != "text" }
112109
113- if len (new_docs ) == 0 :
114- logger .warning ("All docs are already in the storage" )
115- return
116- logger .info ("[New Docs] inserting %d docs" , len (new_docs ))
110+ await self .full_docs_storage .upsert (new_docs )
117111
118- inserting_chunks = await chunk_documents (
119- new_docs ,
120- split_config ["chunk_size" ],
121- split_config ["chunk_overlap" ],
122- self .tokenizer_instance ,
123- self .progress_bar ,
124- )
async def _insert_text_docs(text_docs):
    """Chunk new text documents, store unseen chunks and extract a text KG.

    Args:
        text_docs: mapping of document key -> document dict for the text
            documents selected by the enclosing ``insert`` call.

    Returns:
        The value returned by ``build_text_kg`` when it is truthy; ``None``
        when there are no documents, no new chunks, or nothing was extracted.
    """
    if not text_docs:
        logger.warning("All text docs are already in the storage")
        return None

    logger.info("[New Docs] inserting %d text docs", len(text_docs))

    # Step 2.1: Split chunks and filter existing ones
    candidate_chunks = await chunk_documents(
        text_docs,
        split_config["chunk_size"],
        split_config["chunk_overlap"],
        self.tokenizer_instance,
        self.progress_bar,
    )
    unseen_keys = await self.chunks_storage.filter_keys(
        list(candidate_chunks.keys())
    )
    fresh_chunks = {
        chunk_id: chunk
        for chunk_id, chunk in candidate_chunks.items()
        if chunk_id in unseen_keys
    }
    if not fresh_chunks:
        logger.warning("All text chunks are already in the storage")
        return None

    logger.info("[New Chunks] inserting %d text chunks", len(fresh_chunks))
    await self.chunks_storage.upsert(fresh_chunks)

    # Step 2.2: Extract entities and relations from text chunks
    logger.info("[Text Entity and Relation Extraction] processing ...")
    text_chunks = [
        Chunk(id=chunk_id, content=chunk["content"], type="text")
        for chunk_id, chunk in fresh_chunks.items()
    ]
    extracted = await build_text_kg(
        llm_client=self.synthesizer_llm_client,
        kg_instance=self.graph_storage,
        chunks=text_chunks,
        progress_bar=self.progress_bar,
    )
    if not extracted:
        logger.warning("No entities or relations extracted from text chunks")
        return None

    # _insert_done() is only reached when extraction produced something.
    await self._insert_done()
    return extracted
157+
async def _insert_multi_modal_docs(mm_docs):
    """Chunk new multi-modal documents, store unseen chunks and extract a KG.

    Args:
        mm_docs: mapping of document key -> document dict for the non-text
            documents selected by the enclosing ``insert`` call.

    Returns:
        The value returned by ``build_mm_kg`` when it is truthy; ``None``
        when there are no documents, no new chunks, or nothing was extracted.
    """
    if not mm_docs:
        logger.warning("No multi-modal documents to insert")
        return None

    logger.info("[New Docs] inserting %d multi-modal docs", len(mm_docs))

    # Step 3.1: Transform multi-modal documents into chunks and filter existing ones
    candidate_chunks = await chunk_documents(
        mm_docs,
        split_config["chunk_size"],
        split_config["chunk_overlap"],
        self.tokenizer_instance,
        self.progress_bar,
    )
    unseen_keys = await self.chunks_storage.filter_keys(
        list(candidate_chunks.keys())
    )
    fresh_chunks = {
        chunk_id: chunk
        for chunk_id, chunk in candidate_chunks.items()
        if chunk_id in unseen_keys
    }
    if not fresh_chunks:
        logger.warning("All multi-modal chunks are already in the storage")
        return None

    logger.info("[New Chunks] inserting %d multimodal chunks", len(fresh_chunks))
    await self.chunks_storage.upsert(fresh_chunks)

    # Step 3.2: Extract multi-modal entities and relations from chunks
    logger.info("[Multi-modal Entity and Relation Extraction] processing ...")
    mm_chunks = [
        Chunk.from_dict(chunk_id, chunk) for chunk_id, chunk in fresh_chunks.items()
    ]
    extracted = await build_mm_kg(
        llm_client=self.synthesizer_llm_client,
        kg_instance=self.graph_storage,
        chunks=mm_chunks,
        progress_bar=self.progress_bar,
    )
    if not extracted:
        logger.warning("No entities or relations extracted from multi-modal chunks")
        return None

    # _insert_done() is only reached when extraction produced something.
    await self._insert_done()
    return extracted
205+
206+ # Step 2: Insert text documents
207+ await _insert_text_docs (new_text_docs )
208+ # Step 3: Insert multi-modal documents
209+ await _insert_multi_modal_docs (new_mm_docs )
157210
158211 async def _insert_done (self ):
159212 tasks = []
160213 for storage_instance in [
161214 self .full_docs_storage ,
162- self .text_chunks_storage ,
215+ self .chunks_storage ,
163216 self .graph_storage ,
164217 self .search_storage ,
165218 ]:
@@ -233,7 +286,10 @@ async def quiz_and_judge(self, quiz_and_judge_config: Dict):
233286 async def generate (self , partition_config : Dict , generate_config : Dict ):
234287 # Step 1: partition the graph
235288 batches = await partition_kg (
236- self .graph_storage , self .tokenizer_instance , partition_config
289+ self .graph_storage ,
290+ self .chunks_storage ,
291+ self .tokenizer_instance ,
292+ partition_config ,
237293 )
238294
239295 # Step 2: generate QA pairs
@@ -255,7 +311,7 @@ async def generate(self, partition_config: Dict, generate_config: Dict):
255311 @async_to_sync_method
256312 async def clear (self ):
257313 await self .full_docs_storage .drop ()
258- await self .text_chunks_storage .drop ()
314+ await self .chunks_storage .drop ()
259315 await self .search_storage .drop ()
260316 await self .graph_storage .clear ()
261317 await self .rephrase_storage .drop ()
0 commit comments