@@ -66,3 +66,57 @@ async def partition_kg(
6666 if image_data :
6767 node_data ["images" ] = image_data
6868 return batches
69+
70+
71+ async def attach_additional_data_to_node (
72+ batches : list [
73+ tuple [
74+ list [tuple [str , dict ]], list [tuple [Any , Any , dict ] | tuple [Any , Any , Any ]]
75+ ]
76+ ],
77+ chunk_storage : BaseKVStorage ,
78+ ) -> list [
79+ tuple [list [tuple [str , dict ]], list [tuple [Any , Any , dict ] | tuple [Any , Any , Any ]]]
80+ ]:
81+ """
82+ Attach additional data from chunk_storage to nodes in the batches.
83+ :param batches:
84+ :param chunk_storage:
85+ :return:
86+ """
87+ for batch in batches :
88+ for node_id , node_data in batch [0 ]:
89+ await _attach_by_type (node_id , node_data , chunk_storage )
90+ return batches
91+
92+
93+ async def _attach_by_type (
94+ node_id : str ,
95+ node_data : dict ,
96+ chunk_storage : BaseKVStorage ,
97+ ) -> None :
98+ """
99+ Attach additional data to the node based on its entity type.
100+ """
101+ entity_type = (node_data .get ("entity_type" ) or "" ).lower ()
102+ if not entity_type :
103+ return
104+
105+ source_ids = [
106+ sid .strip ()
107+ for sid in node_data .get ("source_id" , "" ).split ("<SEP>" )
108+ if sid .strip ()
109+ ]
110+
111+ # Handle images
112+ if "image" in entity_type :
113+ image_chunks = [
114+ data
115+ for sid in source_ids
116+ if "image" in sid .lower () and (data := await chunk_storage .get_by_id (sid ))
117+ ]
118+ if image_chunks :
119+ # The generator expects a dictionary with an 'img_path' key, not a list of captions.
120+ # We'll use the first image chunk found for this node.
121+ node_data ["images" ] = image_chunks [0 ]
122+ logger .debug ("Attached image data to node %s" , node_id )
0 commit comments