 import logging
 from asyncio import run
 from threading import Thread
+import threading

 from fastapi import HTTPException, status
+from langchain_core.documents import Document

 from admin_api_lib.api_endpoints.confluence_loader import ConfluenceLoader
 from admin_api_lib.api_endpoints.document_deleter import DocumentDeleter
@@ -81,7 +83,6 @@ def __init__(
         self._extractor_api = extractor_api
         self._rag_api = rag_api
         self._settings = settings
-        self._sanitize_document_name()
         self._key_value_store = key_value_store
         self._information_mapper = information_mapper
         self._information_enhancer = information_enhancer
@@ -100,10 +101,16 @@ async def aload_from_confluence(self) -> None:
         HTTPException
             If the Confluence loader is not configured or if a load is already in progress.
         """
-        if not (self._settings.url.strip() and self._settings.space_key.strip() and self._settings.token.strip()):
-            raise HTTPException(
-                status.HTTP_501_NOT_IMPLEMENTED, "The confluence loader is not configured! Required fields are missing."
-            )
+        for index in range(len(self._settings.url)):
+            if not (
+                self._settings.url[index].strip()
+                and self._settings.space_key[index].strip()
+                and self._settings.token[index].strip()
+            ):
+                raise HTTPException(
+                    status.HTTP_501_NOT_IMPLEMENTED,
+                    "The confluence loader is not configured! Required fields are missing.",
+                )

         if self._background_thread is not None and self._background_thread.is_alive():
             raise HTTPException(
@@ -113,51 +120,76 @@ async def aload_from_confluence(self) -> None:
         self._background_thread.start()

     async def _aload_from_confluence(self) -> None:
-        params = self._settings_mapper.map_settings_to_params(self._settings)
+        async def process_confluence(index):
+            logger.info("Loading from Confluence %s", self._settings.url[index])
+            self._sanitize_document_name(index=index)
+
+            params = self._settings_mapper.map_settings_to_params(self._settings, index)
+            try:
+                self._key_value_store.upsert(self._settings.document_name[index], Status.PROCESSING)
+                information_pieces = self._extractor_api.extract_from_confluence_post(params)
+                documents = [
+                    self._information_mapper.extractor_information_piece2document(x) for x in information_pieces
+                ]
+                documents = await self._aenhance_langchain_documents(documents)
+                chunked_documents = self._chunker.chunk(documents)
+                rag_information_pieces = [
+                    self._information_mapper.document2rag_information_piece(doc) for doc in chunked_documents
+                ]
+            except Exception as e:
+                self._key_value_store.upsert(self._settings.document_name[index], Status.ERROR)
+
+                logger.error("Error while loading from Confluence: %s", str(e))
+                raise HTTPException(
+                    status.HTTP_500_INTERNAL_SERVER_ERROR, f"Error loading from Confluence: {str(e)}"
+                ) from e
+
+            await self._delete_previous_information_pieces(index=index)
+            self._key_value_store.upsert(self._settings.document_name[index], Status.UPLOADING)
+            self._upload_information_pieces(rag_information_pieces, index=index)
+
+        threads = []
+        for idx in range(len(self._settings.url)):
+            t = threading.Thread(target=lambda idx=idx: run(process_confluence(idx)))
+            threads.append(t)
+            t.start()
+        for t in threads:
+            t.join()
+
+    async def _aenhance_langchain_documents(self, documents: list[Document]):
         try:
-            self._key_value_store.upsert(self._settings.document_name, Status.PROCESSING)
-            information_pieces = self._extractor_api.extract_from_confluence_post(params)
-            documents = [self._information_mapper.extractor_information_piece2document(x) for x in information_pieces]
-            chunked_documents = self._chunker.chunk(documents)
-            rag_information_pieces = [
-                self._information_mapper.document2rag_information_piece(doc) for doc in chunked_documents
-            ]
+            return await self._information_enhancer.ainvoke(documents)
         except Exception as e:
-            self._key_value_store.upsert(self._settings.document_name, Status.ERROR)
-            logger.error("Error while loading from Confluence: %s", str(e))
-            raise HTTPException(
-                status.HTTP_500_INTERNAL_SERVER_ERROR, f"Error loading from Confluence: {str(e)}"
-            ) from e
-
-        await self._delete_previous_information_pieces()
-        self._key_value_store.upsert(self._settings.document_name, Status.UPLOADING)
-        self._upload_information_pieces(rag_information_pieces)
+            logger.error("Exception occurred while enhancing confluence langchain document %s" % e)
+            raise e

-    async def _delete_previous_information_pieces(self):
+    async def _delete_previous_information_pieces(self, index=0):
         try:
-            await self._document_deleter.adelete_document(self._settings.document_name)
+            await self._document_deleter.adelete_document(self._settings.document_name[index])
         except HTTPException as e:
             logger.error(
                 (
                     "Error while trying to delete documents with id: %s before uploading %s."
                     "NOTE: Still continuing with upload."
                 ),
-                self._settings.document_name,
+                self._settings.document_name[index],
                 e,
             )

-    def _upload_information_pieces(self, rag_api_documents):
+    def _upload_information_pieces(self, rag_api_documents, index=0):
         try:
             self._rag_api.upload_information_piece(rag_api_documents)
-            self._key_value_store.upsert(self._settings.document_name, Status.READY)
+            self._key_value_store.upsert(self._settings.document_name[index], Status.READY)
             logger.info("Confluence loaded successfully")
         except Exception as e:
-            self._key_value_store.upsert(self._settings.document_name, Status.ERROR)
+            self._key_value_store.upsert(self._settings.document_name[index], Status.ERROR)
             logger.error("Error while uploading Confluence to the database: %s", str(e))
             raise HTTPException(500, f"Error loading from Confluence: {str(e)}") from e

-    def _sanitize_document_name(self) -> None:
-        document_name = self._settings.document_name if self._settings.document_name else self._settings.url
+    def _sanitize_document_name(self, index) -> None:
+        document_name = (
+            self._settings.document_name[index] if self._settings.document_name[index] else self._settings.url[index]
+        )
         document_name = document_name.replace("http://", "").replace("https://", "")

-        self._settings.document_name = sanitize_document_name(document_name)
+        self._settings.document_name[index] = sanitize_document_name(document_name)
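For context on the new fan-out in `_aload_from_confluence`: each configured Confluence instance is handled in its own thread, and each thread drives its async worker via `asyncio.run`, so every worker gets a private event loop. The snippet below is a minimal standalone sketch of that pattern only, not part of this PR; the names `process_item` and `fan_out` are illustrative.

```python
# Minimal sketch of the thread-per-source pattern above (illustrative names only).
import threading
from asyncio import run, sleep


async def process_item(index: int) -> None:
    # Stand-in for the per-instance work (extract, enhance, chunk, upload).
    await sleep(0.1)
    print(f"finished source {index}")


def fan_out(count: int) -> None:
    threads = []
    for idx in range(count):
        # Default argument binds the current idx; asyncio.run gives each thread its own event loop.
        t = threading.Thread(target=lambda idx=idx: run(process_item(idx)))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()


if __name__ == "__main__":
    fan_out(3)
```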