-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathutils.py
More file actions
477 lines (404 loc) · 14.9 KB
/
utils.py
File metadata and controls
477 lines (404 loc) · 14.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
import logging
import yaml
from yaml.loader import SafeLoader
import os
from pathlib import Path
import re
import shutil
import urllib
import unicodedata
import tiktoken
import docx
import PyPDF2
import html2text
import html
import json
import pandas as pd
from bs4 import BeautifulSoup
# Directory that contains this module.
ROOT = os.path.dirname(os.path.abspath(__file__))

# Per-user data directory; it is created at import time when missing.
USER = os.path.join(os.path.expanduser("~"), "braindoor/")
if not os.path.exists(USER):
    os.makedirs(USER)

# Files and folders that live inside the user directory.
log_path = os.path.join(USER, "run.log")
config_path = os.path.join(USER, "config.yaml")
HISTORY = os.path.join(USER, "history/")

# Temporary directory next to this module, with and without a trailing slash.
temp_path = os.path.join(ROOT, "temp/")
TEMP = os.path.join(ROOT, "temp")

# Base URL embedded in generated links.
HOST = "http://127.0.0.1:7860"
def get_logger(log_path=log_path):
    """Return this module's logger, writing to ``log_path`` and to the console.

    The logger is looked up by module name, so every call returns the same
    object. Handlers are only attached when an equivalent one is not already
    present; otherwise repeated calls would stack handlers and every record
    would be emitted several times.

    Args:
        log_path: Path of the UTF-8 log file.

    Returns:
        The configured ``logging.Logger`` at level ``INFO``.
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")

    # FileHandler stores the absolute path of its file in ``baseFilename``.
    target_file = os.path.abspath(log_path)
    has_file_handler = any(
        isinstance(handler, logging.FileHandler)
        and handler.baseFilename == target_file
        for handler in logger.handlers
    )
    if not has_file_handler:
        file_handler = logging.FileHandler(log_path, encoding="utf-8")
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    # FileHandler subclasses StreamHandler, so exclude it when looking for
    # an existing console handler.
    has_console_handler = any(
        isinstance(handler, logging.StreamHandler)
        and not isinstance(handler, logging.FileHandler)
        for handler in logger.handlers
    )
    if not has_console_handler:
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

    return logger
# Module-wide logger, created once at import time with the default log path.
logger = get_logger()
# Shared tiktoken encoder ("cl100k_base") used by the token-counting helpers in this module.
tiktoken_encoder = tiktoken.get_encoding("cl100k_base")
def remove_markdown(text):
    """Strip common Markdown constructs from ``text`` using regex rules.

    The rules are applied in order: heading markers, emphasised / bold /
    struck-through words (the word itself is removed too), fenced code
    blocks, inline code, links, block-quote markers, list dashes and
    ordered-list numbers.

    The code and link patterns are non-greedy so that each span is removed
    on its own; a greedy ``.*`` would also delete all the prose between the
    first and the last span.

    Args:
        text: Markdown source.

    Returns:
        The text with the matched constructs removed.
    """
    text = re.sub(r"#\s+", "", text)
    text = re.sub(r"(\*|_)\w+(\*|_)", "", text)
    text = re.sub(r"(\*\*|__)\w+(\*\*|__)", "", text)
    text = re.sub(r"~~\w+~~", "", text)
    # Fenced code may span several lines, hence DOTALL.
    text = re.sub(r"```.*?```", "", text, flags=re.DOTALL)
    text = re.sub(r"`.*?`", "", text)
    text = re.sub(r"\[.*?\]\(.*?\)", "", text)
    text = re.sub(r">\s+", "", text)
    text = re.sub(r"-\s+", "", text)
    text = re.sub(r"\d+\.\s+", "", text)
    return text
def remove_asklink(html):
    """Return ``html`` without any ``<a ... class="asklink" ...>...</a>`` element."""
    asklink_pattern = re.compile(r'<a[^>]*class="asklink"[^>]*>.*?</a>')
    return asklink_pattern.sub("", html)
# Copy an HTML file, plus the local images it references, into a temporary
# directory so that it can be viewed from there.
def copy_html(html_path, save_root=temp_path):
    """Copy ``html_path`` and its locally referenced images into ``save_root``.

    Image ``src`` values are URL-unquoted; anything that does not start with
    ``http`` is treated as a path relative to the HTML file and copied to
    the same relative location under ``save_root``.

    This is best effort: a failure to copy one image is logged and skipped,
    and any other failure is logged rather than raised.

    Args:
        html_path: Path of the HTML file to copy.
        save_root: Destination directory (created if missing).
    """
    # ``import urllib`` alone does not guarantee the ``parse`` submodule is loaded.
    import urllib.parse

    try:
        html_path = Path(html_path)
        html_dir = html_path.parent
        save_root = Path(save_root)
        with open(html_path, "r", encoding="utf-8") as f:
            soup = BeautifulSoup(f.read(), "html.parser")

        img_urls = []
        for img_tag in soup.find_all("img"):
            # An <img> without a src must not abort the whole copy.
            src = img_tag.get("src")
            if not src:
                continue
            img_url = urllib.parse.unquote(src)
            if not img_url.startswith("http"):
                img_urls.append(img_url)

        os.makedirs(save_root, exist_ok=True)
        for img_url in img_urls:
            try:
                dst_dir = save_root.joinpath(Path(img_url).parent)
                os.makedirs(dst_dir, exist_ok=True)
                img_dst_path = os.path.join(save_root, img_url)
                img_src_path = html_dir.joinpath(img_url)
                shutil.copyfile(img_src_path, img_dst_path)
            except Exception as e:
                # A missing or unreadable image should not stop the others.
                logger.warning(f"copy html image error: {img_url}: {e}")
        shutil.copy2(html_path, save_root)
    except Exception as e:
        logger.error(f"copy html error: {e}")
def set_proxy():
    """Export the ``proxy`` value from the user config as HTTP(S) proxy env vars.

    Reads ``config_path`` with the YAML safe loader. When the ``proxy`` key
    is present and truthy, its value is written to both ``http_proxy`` and
    ``https_proxy`` in ``os.environ``; otherwise the environment is left
    untouched.
    """
    with open(config_path, "r", encoding="utf-8") as f:
        # An empty YAML document loads as None; treat it as "no options".
        opt = yaml.load(f, Loader=SafeLoader) or {}
    proxy_address = opt.get("proxy")
    if proxy_address:
        os.environ["http_proxy"] = f"{proxy_address}"
        os.environ["https_proxy"] = f"{proxy_address}"
def del_proxy():
    """Remove ``http_proxy`` and ``https_proxy`` from the process environment.

    Each variable is removed independently, so the call is safe when only
    one of them (or neither) is set.
    """
    for variable in ("http_proxy", "https_proxy"):
        os.environ.pop(variable, None)
def html_escape(text):
    """HTML-escape ``text`` and make its spaces non-collapsible.

    After ``html.escape`` every space is replaced with ``&nbsp;`` so that
    runs of spaces (for example code indentation) survive HTML rendering.

    Args:
        text: Raw text.

    Returns:
        The escaped text.
    """
    text = html.escape(text)
    # Replacing a space with a space would be a no-op; the entity is what
    # keeps consecutive spaces from collapsing in the browser.
    text = text.replace(" ", "&nbsp;")
    return text
def txt2html(text):
    """Convert plain text containing fenced code blocks into simple HTML.

    The content of every triple-backtick block is passed through
    ``html_escape``, newlines become ``<br>``, and each fenced block is then
    wrapped in ``<code><div class='codebox'>...</div></code>``. Text outside
    fenced blocks is not escaped, and single-backtick spans are left as is.
    """
    fenced_block = re.compile(r"(```)(.*?)(```)", re.DOTALL)

    def escape_fenced(match):
        opening, body, closing = match.groups()
        return opening + html_escape(body) + closing

    escaped = fenced_block.sub(escape_fenced, text)
    with_breaks = escaped.replace("\n", "<br>")
    return re.sub(
        r"```(.+?)```",
        r"<code><div class='codebox'>\1</div></code>",
        with_breaks,
        flags=re.DOTALL,
    )
def html2txt(text):
    """Convert HTML that has the shape ``txt2html`` produces back to plain text.

    Code-box wrappers become triple backticks again, ``<br>`` becomes a
    newline, ``&nbsp;`` becomes a plain space, remaining HTML entities are
    unescaped, and the result is NFKC-normalised.

    Args:
        text: HTML text.

    Returns:
        The plain-text form.
    """
    text = text.replace("<code><div class='codebox'>", "```")
    text = text.replace("</div></code>", "```")
    text = text.replace("<br>", "\n")
    # Map the non-breaking-space entity back to a regular space explicitly
    # (replacing a space with a space would do nothing).
    text = text.replace("&nbsp;", " ")
    text = html.unescape(text)
    text = unicodedata.normalize("NFKC", text)
    return text
def read_docx(filename):
    """Return the text of every paragraph of a .docx file, one paragraph per line."""
    document = docx.Document(filename)
    return "\n".join(paragraph.text for paragraph in document.paragraphs)
def read_pdf(filename):
    """Return the extracted text of all pages of a PDF, joined with newlines."""
    with open(filename, "rb") as pdf_file:
        pdf_reader = PyPDF2.PdfReader(pdf_file)
        # Extraction has to happen while the file is still open.
        page_texts = [page.extract_text() for page in pdf_reader.pages]
    return "\n".join(page_texts)
def read_html(filename):
    """Read a UTF-8 HTML file and return it converted to text by ``html2text``."""
    with open(filename, "r", encoding="utf-8") as source:
        markup = source.read()
    converter = html2text.HTML2Text()
    return converter.handle(markup)
def read_text_file(file_path):
    """Return the text content of a md, txt, docx, pdf or html file.

    The reader is chosen from the lower-cased file extension.

    Raises:
        TypeError: If the extension is not one of the supported ones.
    """
    extension = Path(file_path).suffix.lower()
    if extension in (".md", ".txt"):
        with open(file_path, "r", encoding="utf-8") as plain_file:
            return plain_file.read()
    if extension == ".docx":
        return read_docx(file_path)
    if extension == ".pdf":
        return read_pdf(file_path)
    if extension == ".html":
        return read_html(file_path)
    raise TypeError("Expected a md,txt,docx,pdf or html file")
def get_last_log():
    """Return the last line of the log file, or an error description.

    Any failure (missing file, empty file, ...) is reported as the string
    ``"get log error: <reason>"`` instead of being raised.
    """
    try:
        with open(log_path, "r", encoding='utf-8') as log_file:
            return log_file.readlines()[-1]
    except Exception as error:
        return f"get log error: {error}"
def cutoff_localtext(local_text, max_len=2000):
    """Truncate ``local_text`` to at most ``max_len`` tokens.

    Text that already fits is returned unchanged; otherwise the first
    ``max_len`` tokens are decoded back into a string.
    """
    tokens = tiktoken_encoder.encode(local_text)
    if len(tokens) <= max_len:
        return local_text
    return tiktoken_encoder.decode(tokens[:max_len])
def save_page(chat_id, context, dir="ask"):
    """Persist a chat ``context`` as ``<chat_id>.json`` in the history folder.

    A copy is always written under ``ask``; when ``dir`` is ``"review"`` a
    second copy is written under ``review`` as well. Missing folders are
    created.
    """

    def write_copy(folder):
        target_dir = Path(f"{HISTORY}/{folder}")
        if not os.path.exists(target_dir):
            os.makedirs(target_dir)
        target_file = Path(f"{HISTORY}/{folder}/{chat_id}.json")
        with open(target_file, "w", encoding="utf-8") as out:
            json.dump(context, out, ensure_ascii=False, indent=4)

    write_copy("ask")
    if dir == "review":
        write_copy(dir)
def save_review_chunk(chat_id, chunks):
    """Write ``chunks`` as JSON to ``<chat_id>.chunk`` in the review history folder."""
    review_dir = Path(f"{HISTORY}/review")
    if not os.path.exists(review_dir):
        os.makedirs(review_dir)
    chunk_file = Path(f"{HISTORY}/review/{chat_id}.chunk")
    with open(chunk_file, "w", encoding="utf-8") as out:
        json.dump(chunks, out, ensure_ascii=False, indent=4)
def get_history_pages(dir="ask"):
    """List the ``.json`` file names in a history folder, newest first.

    The folder is created when it does not exist. Ordering is by file
    modification time, most recent first.
    """
    history_dir = Path(f"{HISTORY}/{dir}")
    if not os.path.exists(history_dir):
        os.makedirs(history_dir)

    def modified_at(file_name):
        return os.path.getmtime(os.path.join(history_dir, file_name))

    page_names = [
        name for name in os.listdir(history_dir) if name.endswith(".json")
    ]
    page_names.sort(key=modified_at, reverse=True)
    return page_names
def load_context(chat_id, dir="ask"):
    """Load the saved chat context ``<chat_id>.json`` from a history folder.

    Args:
        chat_id: Identifier used as the file stem.
        dir: History sub-folder to read from.

    Returns:
        The decoded JSON content, or an empty list when the file is missing,
        unreadable or not valid JSON.
    """
    try:
        with open(Path(f"{HISTORY}/{dir}/{chat_id}.json"), "r", encoding="utf-8") as f:
            context = json.load(f)
    # OSError: missing/unreadable file. ValueError: invalid JSON or bad
    # encoding. A bare except would also swallow KeyboardInterrupt.
    except (OSError, ValueError):
        context = []
    return context
def load_review_chunk(chat_id):
    """Load the saved review chunks ``<chat_id>.chunk`` from the review folder.

    Args:
        chat_id: Identifier used as the file stem.

    Returns:
        The decoded JSON content, or an empty list when the file is missing,
        unreadable or not valid JSON.
    """
    try:
        with open(
            Path(f"{HISTORY}/review/{chat_id}.chunk"), "r", encoding="utf-8"
        ) as f:
            chunks = json.load(f)
    # OSError: missing/unreadable file. ValueError: invalid JSON or bad
    # encoding. A bare except would also swallow KeyboardInterrupt.
    except (OSError, ValueError):
        chunks = []
    return chunks
def del_page(chat_id, dir="ask"):
    """Delete every stored file that belongs to ``chat_id``.

    Removes the ask page, the review page and the review chunk file when
    they exist.

    Args:
        chat_id: Identifier used as the file stem.
        dir: Accepted for interface compatibility; not used, since all
            locations are always checked.

    Returns:
        True if at least one file was removed, False otherwise.
    """
    candidates = (
        Path(f"{HISTORY}/ask/{chat_id}.json"),
        Path(f"{HISTORY}/review/{chat_id}.json"),
        Path(f"{HISTORY}/review/{chat_id}.chunk"),
    )
    removed = False
    for candidate in candidates:
        if os.path.exists(candidate):
            os.remove(candidate)
            # Report success for any deletion, not only for the last file.
            removed = True
    return removed
def parse_codeblock(text):
    """Turn fenced code blocks in ``text`` into ``<pre><code>`` HTML.

    The text is processed line by line:

    * a line containing a fence plus more characters opens a block; whatever
      follows the first three characters becomes the ``class`` attribute;
    * a line that is exactly a fence closes the block;
    * every other line except the first gets a ``<br/>`` prefix and has its
      angle brackets escaped so that it is shown literally.

    Args:
        text: Chat text.

    Returns:
        The converted lines concatenated into one string.
    """
    lines = text.split("\n")
    for i, line in enumerate(lines):
        if "```" in line:
            if line != "```":
                lines[i] = f'<pre><code class="{line[3:]}">'
            else:
                lines[i] = "</code></pre>"
        elif i > 0:
            # Use real entities; replacing "<" with "<" would escape nothing.
            lines[i] = "<br/>" + line.replace("<", "&lt;").replace(">", "&gt;")
    return "".join(lines)
def format_chat_text(text):
    """Apply ``parse_codeblock`` to a string or to every pair of a chat list.

    A string is converted and returned. A list is expected to hold mutable
    two-item entries; both items of each entry are formatted in place and
    the same list is returned. Any other input yields ``None``.
    """
    if isinstance(text, list):
        for entry in text:
            entry[0] = format_chat_text(entry[0])
            entry[1] = format_chat_text(entry[1])
        return text
    if isinstance(text, str):
        return parse_codeblock(text)
    return None
def cutoff_context(context, mygpt):
    """Keep the most recent question/answer pairs that fit the token budget.

    Pairs are visited from newest to oldest. Each one is cleaned (asklink
    anchors, ``#etag`` markers, ``<frontslot>`` and ``<rearslot>`` sections
    removed) and counted in tokens; collection stops at the first pair that
    would bring the running total to ``mygpt.opt["max_context"]`` or above.

    Returns:
        The kept ``(question, answer)`` tuples in chronological order.
    """
    kept_newest_first = []
    used_tokens = 0
    for question, answer in reversed(context):
        question = str(question)
        # Drop links to local reference files from the answer.
        answer = remove_asklink(str(answer))
        # Strip every known etag marker from the question.
        for etag in mygpt.all_etags["name"]:
            question = question.replace(f"#{etag}", "")
        # Remove the front and rear slot sections from the answer.
        answer = re.sub(r"<frontslot>.*?</frontslot>", "", answer)
        answer = re.sub(r"<rearslot>.*?</rearslot>", "", answer)

        pair_tokens = len(tiktoken_encoder.encode(question + answer))
        if not pair_tokens + used_tokens < mygpt.opt["max_context"]:
            break
        used_tokens += pair_tokens
        kept_newest_first.append((question, answer))

    kept_newest_first.reverse()
    return kept_newest_first
def create_links(mydocs, frontend, dir_name, mygpt):
    """Build the ``<rearslot>`` HTML listing reference links for matched documents.

    For every entry of ``mydocs`` whose score (``doc[1]``) is below
    ``mygpt.opt["max_l2"]``, the snippet text is written to
    ``reference-<n>.txt`` in the temp folder and a numbered snippet link is
    emitted. The first time a source file is seen it is also copied (HTML
    files via ``copy_html``, others into ``dir_name``) and a full-text link
    is appended; later snippets of the same file are inserted in front of
    that existing full-text link.

    NOTE(review): the source formatting lost its indentation; the reference
    counter is assumed to advance only for documents that pass the score
    filter - confirm against the original file.

    Returns:
        The links wrapped in ``<rearslot>...</rearslot>``, or ``""`` when no
        document qualified.
    """
    link_parts = []
    seen_paths = []
    ref_no = 1
    for doc in mydocs:
        score = doc[1]
        content = doc[0].page_content
        if not score < float(mygpt.opt["max_l2"]):
            continue

        file_path = Path(doc[0].metadata["file_path"])
        if not os.path.exists(TEMP):
            os.mkdir(TEMP)
        snippet_file = os.path.join(TEMP, f"reference-{ref_no}.txt")
        with open(snippet_file, "w", encoding="utf-8") as out:
            out.write(content)

        full_text_link = f'<a href="{HOST}/file/temp/{file_path.name}" class="asklink" title="Open full text">{file_path.stem}</a><br>'
        if file_path not in seen_paths:
            try:
                if file_path.suffix == ".html":
                    copy_html(file_path)
                else:
                    shutil.copy2(file_path, dir_name)
            except Exception as e:
                logger.error(e)
            link_parts.append(
                f'<a href="{HOST}/file/temp/reference-{ref_no}.txt" class="asklink" title="Open text snippet {score:.3}">[{ref_no}] </a> '
            )
            link_parts.append(full_text_link)
            seen_paths.append(file_path)
        else:
            # Group this snippet with the file's existing full-text link.
            link_parts.insert(
                link_parts.index(full_text_link),
                f'<a href="{HOST}/file/temp/reference-{ref_no}.txt" class="asklink" title="Open text snippet {score:.3}">[{ref_no}]</a> ',
            )
        ref_no += 1

    if not link_parts:
        return ""
    return f"<rearslot>{''.join(link_parts)}</rearslot>"
# Bring an existing config file up to date with a newer default version.
def update_config(dir_a, dir_b, filename="config.yaml"):
    """Merge keys that are new in ``dir_a/filename`` into ``dir_b/filename``.

    * If the source file does not exist, a message is printed and nothing
      else happens.
    * If the destination file does not exist, the source file is copied.
    * Otherwise every top-level key present in the source but absent from
      the destination is added with the source's value; keys that already
      exist in the destination keep their value.

    The destination is only rewritten when at least one key was added, so an
    up-to-date file keeps its original formatting.

    Args:
        dir_a: Directory holding the newer (default) config.
        dir_b: Directory holding the config to update.
        filename: Config file name inside both directories.
    """
    path_a = os.path.join(dir_a, filename)
    path_b = os.path.join(dir_b, filename)
    if not os.path.exists(path_a):
        print(f"{path_a} does not exist.")
        return
    if not os.path.exists(path_b):
        shutil.copy(path_a, path_b)
        return

    # An empty YAML document loads as None; treat it as an empty mapping.
    with open(path_a, "r", encoding='utf-8') as f:
        config_a = yaml.safe_load(f) or {}
    with open(path_b, "r", encoding='utf-8') as f:
        config_b = yaml.safe_load(f) or {}

    missing = {key: value for key, value in config_a.items() if key not in config_b}
    if not missing:
        return
    config_b.update(missing)
    with open(path_b, "w", encoding='utf-8') as f:
        yaml.safe_dump(config_b, f, allow_unicode=True)
# Copy the default etag files from the application root into the user directory.
def update_etag(src_dir, dst_dir):
    """Mirror the files under ``src_dir`` into ``dst_dir``.

    Every file whose name contains a dot is copied to the same relative
    location under ``dst_dir``, except that:

    * ``.DS_Store`` files and anything inside ``.ipynb_checkpoints`` or
      ``__pycache__`` are skipped;
    * ``config.yaml`` files are merged through ``update_config`` instead of
      being overwritten.

    Copy failures are logged and do not stop the remaining files.

    Args:
        src_dir: Source directory (``str`` or ``Path``).
        dst_dir: Destination directory (``str`` or ``Path``).
    """
    src_root = Path(src_dir)
    dst_root = Path(dst_dir)
    for file in src_root.rglob("*.*"):
        if (
            file.name == ".DS_Store"
            or ".ipynb_checkpoints" in str(file)
            or "__pycache__" in str(file)
        ):
            continue
        # The glob also matches directories whose name contains a dot.
        if not file.is_file():
            continue

        # Derive the destination from the path relative to the source root;
        # string replacement breaks for Path arguments and when the source
        # path text occurs more than once.
        rel_path = file.relative_to(src_root)
        if file.name == "config.yaml":
            os.makedirs(dst_root / rel_path.parent, exist_ok=True)
            update_config(src_root / rel_path.parent, dst_root / rel_path.parent)
            continue

        dst_file = dst_root / rel_path
        os.makedirs(dst_file.parent, exist_ok=True)
        try:
            shutil.copy(file, dst_file)
        except Exception as e:
            logger.error(e)
class TokenSplitter:
    """Split text into chunks bounded by a length measured with ``len_fn``.

    Args:
        chunk_size: Maximum measured length of a chunk before a new one is
            started.
        chunk_overlap: Minimum measured length a new chunk is padded towards
            by prepending the pieces that precede it; ``0`` disables overlap.
        len_fn: Callable returning the length of a string (typically a token
            count). It is used for every measurement so that the splitter
            stays consistent with the caller's tokenizer.
    """

    def __init__(self, chunk_size, chunk_overlap, len_fn):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.len_fn = len_fn

    def split_string(self, string, n):
        """Cut ``string`` into consecutive pieces of ``len(string) // n`` characters.

        If the division leaves one piece more than ``n``, the last piece is
        merged into the one before it. Strings shorter than ``n`` characters
        are cut into single characters (a piece size of zero would be an
        invalid range step). An empty string yields an empty list.
        """
        size = max(1, len(string) // n)
        pieces = [string[i : i + size] for i in range(0, len(string), size)]
        if len(pieces) > n:
            pieces[-2] += pieces[-1]
            pieces = pieces[:-1]
        return pieces

    def split_text(self, text):
        """Split ``text`` into chunks of at most roughly ``chunk_size`` length.

        The text is first cut into small pieces (about ten per expected
        chunk), which are then accumulated greedily. When adding a piece
        would exceed ``chunk_size``, the current chunk is emitted and a new
        one starts with that piece, optionally preceded by earlier pieces
        for overlap.

        Returns:
            The list of non-empty chunks; empty for empty text.
        """
        token_len = self.len_fn(text)
        n_chunk = int(token_len / self.chunk_size) + 1
        n_piece = n_chunk * 10
        pieces = self.split_string(text, n_piece)

        chunks = []
        current = ""
        for i, piece in enumerate(pieces):
            if self.len_fn(current + piece) > self.chunk_size:
                # Never emit an empty chunk (a single oversized first piece).
                if current:
                    chunks.append(current)
                current = piece
                if self.chunk_overlap > 0:
                    # Walk backwards over the preceding pieces only; stop at
                    # the start of the text instead of wrapping to its end.
                    j = i
                    while (
                        j > 0
                        and self.len_fn(pieces[j - 1] + current)
                        < self.chunk_overlap
                    ):
                        current = pieces[j - 1] + current
                        j -= 1
            else:
                current += piece
        if current:
            chunks.append(current)
        return chunks
# History search: keep only the saved "ask" pages whose content contains the query.
def histroy_filter(query):
    """Search the saved ask history for pages containing ``query``.

    Each page returned by ``get_history_pages`` is loaded and matched by
    substring against the string form of its JSON content. Pages that cannot
    be read or parsed are logged and skipped.

    Args:
        query: Text to look for.

    Returns:
        A DataFrame with the columns ``page`` (file name), ``title`` (first
        item of the first entry, or ``""`` when the page has none) and
        ``index`` (position of the page in the history listing).
    """
    rows = []
    # enumerate gives the listing position directly; file names are unique,
    # so this equals a per-match list search without the quadratic cost.
    for index, page in enumerate(get_history_pages()):
        json_path = os.path.join(HISTORY, 'ask', page)
        try:
            with open(json_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            logger.error(f"skip unreadable history page {page}: {e}")
            continue
        if query not in str(data):
            continue
        try:
            title = data[0][0]
        except (IndexError, KeyError, TypeError):
            # Empty or unexpectedly shaped page: list it without a title.
            title = ""
        rows.append((page, title, index))
    return pd.DataFrame(rows, columns=["page", "title", "index"])