diff --git a/README.md b/README.md index 4c0183c..bd49798 100644 --- a/README.md +++ b/README.md @@ -45,9 +45,37 @@ python3 export_messages.py -s "关键词" # 导出所有会话 python3 export_messages.py --all + +# 按日期范围导出(支持个人聊天 + 群聊,可组合使用) +python3 export_messages.py --all --since 2026-03-02 +python3 export_messages.py --all --since 2026-03-01 --until 2026-03-31 + +# 只导出个人聊天和群聊(过滤公众号) +python3 export_messages.py --all --personal + +# 组合:导出 3 月 2 日至今的个人聊天,仅保留有消息的会话 +python3 export_messages.py --all --personal --since 2026-03-02 -o exported_personal +``` + +> **注意**:微信将个人消息存储在 `message_N.db`,公众号消息存储在 `biz_message_N.db`。首次运行 `find_key_memscan.py` 时,只有已在微信中打开过的对话对应的数据库密钥会被提取到内存中。若发现导出的个人聊天记录不完整,请在微信中打开更多历史对话后重新运行密钥提取脚本,再重新解密。 + +### 5. 合并并清理聊天记录 + +将导出目录中的所有 txt 文件合并为单个文件,同时自动清理图片、语音、视频等消息后附带的二进制乱码: + +```bash +# 合并到默认输出文件(<目录名>_merged.txt) +python3 merge_and_clean.py exported_personal + +# 指定输出文件名 +python3 merge_and_clean.py exported_personal output.txt + +# 合并后自动删除原导出目录 +python3 merge_and_clean.py exported_personal --clean +python3 merge_and_clean.py exported_personal output.txt --clean ``` -### 5. MCP Server(让 AI 直接查询) +### 6. 
def get_all_msg_dbs(decrypted_dir):
    """Collect the decrypted message shard databases.

    Looks inside ``<decrypted_dir>/message`` for files named
    ``message_N.db`` or ``biz_message_N.db`` (N being one or more digits).

    Args:
        decrypted_dir: Directory holding the decrypted database tree.

    Returns:
        Full paths of the matching files, ordered by sorted file name;
        an empty list when the ``message`` sub-directory does not exist.
    """
    import re

    shard_dir = os.path.join(decrypted_dir, "message")
    if not os.path.isdir(shard_dir):
        return []

    shard_name = re.compile(r"^(biz_)?message_\d+\.db$")
    return [
        os.path.join(shard_dir, entry)
        for entry in sorted(os.listdir(shard_dir))
        if shard_name.match(entry)
    ]
f"SELECT local_id, local_type, create_time, real_sender_id, " + f"message_content, source FROM [{table}] {where_sql} ORDER BY create_time ASC" + ) + if limit: + query += f" LIMIT {limit}" + rows = conn.execute(query, params).fetchall() - rows = conn.execute(query).fetchall() lines = [format_message(r, is_group, contacts) for r in rows] display_name = contacts.get(username, username) @@ -297,12 +311,15 @@ def safe_filename(display_name, username): return name -def export_to_file(msg_dbs, username, output_dir, contacts, limit=None): +def export_to_file(msg_dbs, username, output_dir, contacts, limit=None, since_ts=None, until_ts=None): """Export messages to a text file named by display name.""" - lines, info = export_chat(msg_dbs, username, contacts, limit) + lines, info = export_chat(msg_dbs, username, contacts, limit, since_ts, until_ts) if lines is None: return False, info + if not lines: + return False, f"skipped (no messages in range) | {info}" + os.makedirs(output_dir, exist_ok=True) display_name = contacts.get(username, "") @@ -340,8 +357,31 @@ def main(): parser.add_argument( "-s", "--search", help="Search keyword across all conversations", ) + parser.add_argument( + "--since", help="Start date (inclusive), format: YYYY-MM-DD", + ) + parser.add_argument( + "--until", help="End date (inclusive), format: YYYY-MM-DD (default: today)", + ) + parser.add_argument( + "--personal", action="store_true", + help="Only export personal chats and groups (exclude public accounts starting with gh_)", + ) args = parser.parse_args() + # Parse date range into Unix timestamps + since_ts = None + until_ts = None + if args.since: + since_ts = int(datetime.strptime(args.since, "%Y-%m-%d").timestamp()) + if args.until: + from datetime import timedelta + until_ts = int((datetime.strptime(args.until, "%Y-%m-%d") + timedelta(days=1)).timestamp()) - 1 + elif args.since: + # if --since given but no --until, default until = end of today + from datetime import timedelta + until_ts = 
int((datetime.now().replace(hour=23, minute=59, second=59, microsecond=0)).timestamp()) + # Load databases msg_dbs = get_all_msg_dbs(args.dir) if not msg_dbs: @@ -371,11 +411,20 @@ def main(): ).fetchone()[0] if not exists: continue + extra_where = [] + extra_params = [f"%{args.search}%"] + if since_ts is not None: + extra_where.append("create_time >= ?") + extra_params.append(since_ts) + if until_ts is not None: + extra_where.append("create_time <= ?") + extra_params.append(until_ts) + date_sql = (" AND " + " AND ".join(extra_where)) if extra_where else "" rows = conn.execute( f"SELECT local_id, local_type, create_time, real_sender_id, " f"message_content, source FROM [{table}] " - f"WHERE message_content LIKE ? ORDER BY create_time DESC LIMIT 10", - (f"%{args.search}%",), + f"WHERE message_content LIKE ?{date_sql} ORDER BY create_time DESC LIMIT 10", + extra_params, ).fetchall() if rows: display = contacts.get(username, username) @@ -400,7 +449,7 @@ def main(): display = contacts.get(username, username) print(f"[*] Matched '{args.chat}' -> {display} ({username})") - lines, info = export_chat(msg_dbs, username, contacts, args.limit) + lines, info = export_chat(msg_dbs, username, contacts, args.limit, since_ts, until_ts) if lines is None: print(f"[-] {info}") sys.exit(1) @@ -409,7 +458,7 @@ def main(): for line in lines: print(line) - success, result_info = export_to_file(msg_dbs, username, args.output, contacts, args.limit) + success, result_info = export_to_file(msg_dbs, username, args.output, contacts, args.limit, since_ts, until_ts) print(f"\n[*] Saved: {result_info}") elif args.all: @@ -417,15 +466,21 @@ def main(): convos = list_conversations(msg_dbs, session_db, contacts) os.makedirs(args.output, exist_ok=True) exported = 0 + skipped = 0 for c in convos: if not c["has_msgs"]: continue + if args.personal and c["username"].startswith("gh_"): + skipped += 1 + continue success, info = export_to_file( - msg_dbs, c["username"], args.output, contacts, args.limit, + 
#!/usr/bin/env python3
"""
合并导出的聊天记录 txt 文件并清理乱码。

用法:
    python3 merge_and_clean.py <导出目录> [输出文件] [--clean]

示例:
    python3 merge_and_clean.py exported_personal
    python3 merge_and_clean.py exported_personal merged.txt
    python3 merge_and_clean.py exported_personal --clean
    python3 merge_and_clean.py exported_personal merged.txt --clean

选项:
    --clean    合并完成后删除原导出目录
"""
# NOTE: the module docstring above is printed verbatim as the usage message
# by main(), so it is user-facing runtime text and is kept in its original
# language.

import os
import re
import shutil
import sys

# "[YYYY-MM-DD HH:MM:SS]" at the start of a line opens a new message.
TIMESTAMP_RE = re.compile(r'^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\]')

# "[tag] (garbage)" -- a bracketed tag that is not a timestamp, followed by a
# parenthesised remainder.  The tag must sit on a single line; without the
# "\n" exclusion a stray "[" could pair with a "]" several lines further down.
_TAG_PAREN_RE = re.compile(rb'\[(?!\d{4}-\d{2}-\d{2})([^\]\n]+)\] \([^\n]*\)')

# "[image]garbage-to-end-of-line" -- the un-parenthesised form.
_IMAGE_TAIL_RE = re.compile(rb'\[image\][^\n]+')

# Written after every merged file: newline, 80 "=" characters, blank line.
_FILE_SEPARATOR = b'\n' + b'=' * 80 + b'\n\n'

# U+FFFD, produced when undecodable bytes are read with errors='replace'.
_REPLACEMENT_CHAR = '\ufffd'


def _is_separator(line):
    """Return True for a line that consists only of '=' characters."""
    return line.startswith('=') and all(c == '=' for c in line.strip())


def _starts_new_section(line):
    """Return True if *line* ends the message block that precedes it."""
    return bool(
        TIMESTAMP_RE.match(line)
        or line.startswith('# ')
        or _is_separator(line)
    )


def _is_inside(path, directory):
    """Return True if *path* resolves to a location within *directory*."""
    real_path = os.path.realpath(path)
    real_dir = os.path.realpath(directory)
    try:
        return os.path.commonpath([real_path, real_dir]) == real_dir
    except ValueError:
        # commonpath() raises when the paths cannot be compared
        # (e.g. different drives on Windows) -- then it is not inside.
        return False


def merge(src_dir, out_file):
    """Concatenate every ``*.txt`` file of *src_dir* into *out_file*.

    Files are merged in sorted-name order as raw bytes, each followed by a
    separator line of 80 '=' characters.

    Args:
        src_dir: Directory containing the exported ``.txt`` files.
        out_file: Path of the merged file to (over)write.

    Exits the process with status 1 when *src_dir* has no file to merge.
    """
    out_real = os.path.realpath(out_file)
    files = sorted(
        name for name in os.listdir(src_dir)
        if name.endswith('.txt')
        and os.path.isfile(os.path.join(src_dir, name))
        # The output may live inside src_dir (e.g. left over from an earlier
        # run); reading it while it is being written would merge it into
        # itself, so it is never treated as an input.
        and os.path.realpath(os.path.join(src_dir, name)) != out_real
    )
    if not files:
        print(f"[-] 目录 {src_dir} 中没有找到 txt 文件")
        sys.exit(1)

    print(f"[*] 合并 {len(files)} 个文件...")
    with open(out_file, 'wb') as out:
        for fname in files:
            with open(os.path.join(src_dir, fname), 'rb') as f:
                # Stream instead of loading the whole export into memory.
                shutil.copyfileobj(f, out)
            out.write(_FILE_SEPARATOR)

    print(f"[*] 已合并到 {out_file}")


def clean_image_garbage(content: bytes) -> tuple[bytes, int]:
    """Strip the binary residue that follows media tags.

    Two forms are handled, in this order:

    1. ``[tag] (garbage)`` -- any bracketed tag that does not start with a
       date; the parenthesised remainder up to the last ``)`` on the line is
       dropped and the tag itself is kept.
    2. ``[image]garbage`` -- everything after ``[image]`` up to the end of
       the line is dropped.

    Args:
        content: Raw bytes of the merged export.

    Returns:
        ``(cleaned_bytes, number_of_replacements)``.
    """
    cleaned, n_paren = _TAG_PAREN_RE.subn(rb'[\1]', content)
    cleaned, n_tail = _IMAGE_TAIL_RE.subn(b'[image]', cleaned)
    return cleaned, n_paren + n_tail


def clean_garbage_lines(content: str) -> tuple[str, int]:
    """Drop every message block that contains U+FFFD.

    A message block starts at a timestamped line and runs until the next
    timestamped line, ``# `` comment line or '=' separator line (blank lines
    in between belong to the block).  Comment lines, separator lines and
    blank lines outside a block are always kept.  Any other stray line is
    judged on its own.

    Args:
        content: Decoded text of the merged export.

    Returns:
        ``(cleaned_text, number_of_removed_blocks_or_lines)``.
    """
    lines = content.split('\n')
    total = len(lines)
    kept = []
    removed = 0
    i = 0

    while i < total:
        line = lines[i]

        # Structural lines are preserved unconditionally.
        if line.startswith('# ') or _is_separator(line) or not line.strip():
            kept.append(line)
            i += 1
            continue

        # A timestamped line pulls its continuation lines into one chunk;
        # anything else is a single-line chunk.
        end = i + 1
        if TIMESTAMP_RE.match(line):
            while end < total and not _starts_new_section(lines[end]):
                end += 1

        chunk = '\n'.join(lines[i:end])
        if _REPLACEMENT_CHAR in chunk:
            removed += 1
        else:
            kept.append(chunk)
        i = end

    return '\n'.join(kept), removed


def main():
    """Command-line entry point: merge, clean tags, clean garbage blocks.

    Reads ``sys.argv``: the first positional argument is the export
    directory, the optional second one the output file, and ``--clean``
    (accepted at any position) removes the export directory afterwards.
    """
    argv = sys.argv[1:]
    # Separate flags from positionals first so that "--clean" may appear
    # before the directory without being mistaken for it.
    positional = [a for a in argv if not a.startswith('--')]
    if not positional:
        print(__doc__)
        sys.exit(1)

    src_dir = positional[0]
    if not os.path.isdir(src_dir):
        print(f"[-] 目录不存在:{src_dir}")
        sys.exit(1)

    do_clean = '--clean' in argv
    if len(positional) > 1:
        out_file = positional[1]
    else:
        # normpath drops trailing separators (including the platform one).
        out_file = os.path.normpath(src_dir) + '_merged.txt'

    # Step 1: merge.
    merge(src_dir, out_file)

    # Step 2: strip binary residue after media tags (byte level).
    with open(out_file, 'rb') as f:
        content = f.read()

    cleaned_bytes, n_tag = clean_image_garbage(content)

    with open(out_file, 'wb') as f:
        f.write(cleaned_bytes)

    print(f"[*] 清理标签乱码:{n_tag} 处")

    # Step 3: drop message blocks that still contain undecodable bytes.
    with open(out_file, 'r', encoding='utf-8', errors='replace') as f:
        text = f.read()

    cleaned_text, n_msg = clean_garbage_lines(text)

    with open(out_file, 'w', encoding='utf-8') as f:
        f.write(cleaned_text)

    print(f"[*] 清理乱码消息块:{n_msg} 条")

    if do_clean:
        if _is_inside(out_file, src_dir):
            # Removing the directory would delete the merged result too.
            print(f"[!] 输出文件位于 {src_dir} 内,已跳过 --clean 以免删除合并结果")
        else:
            shutil.rmtree(src_dir)
            print(f"[*] 已删除原目录:{src_dir}")

    print(f"[✓] 完成:{out_file}")


if __name__ == '__main__':
    main()