Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,37 @@ python3 export_messages.py -s "关键词"

# 导出所有会话
python3 export_messages.py --all

# 按日期范围导出(支持个人聊天 + 群聊,可组合使用)
python3 export_messages.py --all --since 2026-03-02
python3 export_messages.py --all --since 2026-03-01 --until 2026-03-31

# 只导出个人聊天和群聊(过滤公众号)
python3 export_messages.py --all --personal

# 组合:导出 3 月 2 日至今的个人聊天,仅保留有消息的会话
python3 export_messages.py --all --personal --since 2026-03-02 -o exported_personal
```

> **注意**:微信将个人消息存储在 `message_N.db`,公众号消息存储在 `biz_message_N.db`。首次运行 `find_key_memscan.py` 时,只有已在微信中打开过的对话对应的数据库密钥会被提取到内存中。若发现导出的个人聊天记录不完整,请在微信中打开更多历史对话后重新运行密钥提取脚本,再重新解密。

### 5. 合并并清理聊天记录

将导出目录中的所有 txt 文件合并为单个文件,同时自动清理图片、语音、视频等消息后附带的二进制乱码:

```bash
# 合并到默认输出文件(<目录名>_merged.txt)
python3 merge_and_clean.py exported_personal

# 指定输出文件名
python3 merge_and_clean.py exported_personal output.txt

# 合并后自动删除原导出目录
python3 merge_and_clean.py exported_personal --clean
python3 merge_and_clean.py exported_personal output.txt --clean
```

### 5. MCP Server(让 AI 直接查询)
### 6. MCP Server(让 AI 直接查询)

安装依赖并注册到 Claude Code:

Expand Down
87 changes: 71 additions & 16 deletions export_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,14 @@ def resolve_username(chat_name, contacts):


def get_all_msg_dbs(decrypted_dir):
"""Find all message_N.db files (N = 0, 1, 2, ...)."""
"""Find all message_N.db or biz_message_N.db files (N = 0, 1, 2, ...)."""
import re
msg_dir = os.path.join(decrypted_dir, "message")
if not os.path.isdir(msg_dir):
return []
dbs = []
for f in sorted(os.listdir(msg_dir)):
if re.match(r"^message_\d+\.db$", f):
if re.match(r"^(biz_)?message_\d+\.db$", f):
dbs.append(os.path.join(msg_dir, f))
Comment on lines 106 to 109
Copy link

Copilot AI Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message DB filename matching logic is duplicated elsewhere (e.g., mcp_server.py:get_msg_dbs still only matches ^message\d+.db$). This change fixes export_messages.py but leaves other entry points potentially broken on newer WeChat macOS versions; consider updating the other helper(s) as well or centralizing the pattern to avoid future divergence.

Copilot uses AI. Check for mistakes.
return dbs
Comment on lines 102 to 110

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This function can be refactored for better readability and to follow Python best practices:

  • The import re statement is redundant because re is already imported at the top of the file. Per PEP 8, imports should be at the module level.
  • The loop that builds the dbs list can be replaced with a more concise list comprehension.

Here's a suggested implementation that applies these improvements.

Suggested change
import re
msg_dir = os.path.join(decrypted_dir, "message")
if not os.path.isdir(msg_dir):
return []
dbs = []
for f in sorted(os.listdir(msg_dir)):
if re.match(r"^message_\d+\.db$", f):
if re.match(r"^(biz_)?message_\d+\.db$", f):
dbs.append(os.path.join(msg_dir, f))
return dbs
msg_dir = os.path.join(decrypted_dir, "message")
if not os.path.isdir(msg_dir):
return []
dbs = [
os.path.join(msg_dir, f)
for f in sorted(os.listdir(msg_dir))
if re.match(r"^(biz_)?message_\d+\.db$", f)
]
return dbs
References
  1. PEP 8 style guide suggests that imports should be placed at the top of the module, not inside functions, to improve clarity and avoid issues with module loading. (link)


Expand Down Expand Up @@ -249,7 +249,7 @@ def list_conversations(msg_dbs, session_db_path, contacts):
return results


def export_chat(msg_dbs, username, contacts, limit=None):
def export_chat(msg_dbs, username, contacts, limit=None, since_ts=None, until_ts=None):
"""Export messages for a specific conversation from all message DBs."""
table = username_to_table(username)
is_group = "@chatroom" in username
Expand All @@ -263,18 +263,32 @@ def export_chat(msg_dbs, username, contacts, limit=None):
try:
total = conn.execute(f"SELECT count(*) FROM [{table}]").fetchone()[0]

query = (
f"SELECT local_id, local_type, create_time, real_sender_id, "
f"message_content, source FROM [{table}] ORDER BY create_time ASC"
)
if limit:
where_clauses = []
params = []
if since_ts is not None:
where_clauses.append("create_time >= ?")
params.append(since_ts)
if until_ts is not None:
where_clauses.append("create_time <= ?")
params.append(until_ts)
where_sql = ("WHERE " + " AND ".join(where_clauses)) if where_clauses else ""

if limit and not where_clauses:
query = (
f"SELECT * FROM (SELECT local_id, local_type, create_time, "
f"real_sender_id, message_content, source FROM [{table}] "
f"ORDER BY create_time DESC LIMIT {limit}) ORDER BY create_time ASC"
)
rows = conn.execute(query).fetchall()
else:
query = (
f"SELECT local_id, local_type, create_time, real_sender_id, "
f"message_content, source FROM [{table}] {where_sql} ORDER BY create_time ASC"
)
if limit:
query += f" LIMIT {limit}"
rows = conn.execute(query, params).fetchall()

rows = conn.execute(query).fetchall()
lines = [format_message(r, is_group, contacts) for r in rows]

display_name = contacts.get(username, username)
Expand All @@ -297,12 +311,15 @@ def safe_filename(display_name, username):
return name


def export_to_file(msg_dbs, username, output_dir, contacts, limit=None):
def export_to_file(msg_dbs, username, output_dir, contacts, limit=None, since_ts=None, until_ts=None):
"""Export messages to a text file named by display name."""
lines, info = export_chat(msg_dbs, username, contacts, limit)
lines, info = export_chat(msg_dbs, username, contacts, limit, since_ts, until_ts)
if lines is None:
return False, info

if not lines:
return False, f"skipped (no messages in range) | {info}"

os.makedirs(output_dir, exist_ok=True)

display_name = contacts.get(username, "")
Expand Down Expand Up @@ -340,8 +357,31 @@ def main():
parser.add_argument(
"-s", "--search", help="Search keyword across all conversations",
)
parser.add_argument(
"--since", help="Start date (inclusive), format: YYYY-MM-DD",
)
parser.add_argument(
"--until", help="End date (inclusive), format: YYYY-MM-DD (default: today)",
)
parser.add_argument(
"--personal", action="store_true",
help="Only export personal chats and groups (exclude public accounts starting with gh_)",
)
args = parser.parse_args()

# Parse date range into Unix timestamps
since_ts = None
until_ts = None
if args.since:
since_ts = int(datetime.strptime(args.since, "%Y-%m-%d").timestamp())
if args.until:
from datetime import timedelta
until_ts = int((datetime.strptime(args.until, "%Y-%m-%d") + timedelta(days=1)).timestamp()) - 1
elif args.since:
# if --since given but no --until, default until = end of today
from datetime import timedelta
until_ts = int((datetime.now().replace(hour=23, minute=59, second=59, microsecond=0)).timestamp())

# Load databases
msg_dbs = get_all_msg_dbs(args.dir)
if not msg_dbs:
Expand Down Expand Up @@ -371,11 +411,20 @@ def main():
).fetchone()[0]
if not exists:
continue
extra_where = []
extra_params = [f"%{args.search}%"]
if since_ts is not None:
extra_where.append("create_time >= ?")
extra_params.append(since_ts)
if until_ts is not None:
extra_where.append("create_time <= ?")
extra_params.append(until_ts)
date_sql = (" AND " + " AND ".join(extra_where)) if extra_where else ""
rows = conn.execute(
f"SELECT local_id, local_type, create_time, real_sender_id, "
f"message_content, source FROM [{table}] "
f"WHERE message_content LIKE ? ORDER BY create_time DESC LIMIT 10",
(f"%{args.search}%",),
f"WHERE message_content LIKE ?{date_sql} ORDER BY create_time DESC LIMIT 10",
extra_params,
).fetchall()
if rows:
display = contacts.get(username, username)
Expand All @@ -400,7 +449,7 @@ def main():
display = contacts.get(username, username)
print(f"[*] Matched '{args.chat}' -> {display} ({username})")

lines, info = export_chat(msg_dbs, username, contacts, args.limit)
lines, info = export_chat(msg_dbs, username, contacts, args.limit, since_ts, until_ts)
if lines is None:
print(f"[-] {info}")
sys.exit(1)
Expand All @@ -409,23 +458,29 @@ def main():
for line in lines:
print(line)

success, result_info = export_to_file(msg_dbs, username, args.output, contacts, args.limit)
success, result_info = export_to_file(msg_dbs, username, args.output, contacts, args.limit, since_ts, until_ts)
print(f"\n[*] Saved: {result_info}")

elif args.all:
# Export all conversations
convos = list_conversations(msg_dbs, session_db, contacts)
os.makedirs(args.output, exist_ok=True)
exported = 0
skipped = 0
for c in convos:
if not c["has_msgs"]:
continue
if args.personal and c["username"].startswith("gh_"):
skipped += 1
continue
success, info = export_to_file(
msg_dbs, c["username"], args.output, contacts, args.limit,
msg_dbs, c["username"], args.output, contacts, args.limit, since_ts, until_ts,
)
if success:
print(f" ✅ {info}")
exported += 1
if skipped:
print(f"[*] Skipped {skipped} public accounts (gh_*)")
print(f"\n[*] Exported {exported} conversations to {args.output}/")

else:
Expand Down
148 changes: 148 additions & 0 deletions merge_and_clean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""
合并导出的聊天记录 txt 文件并清理乱码。

用法:
python3 merge_and_clean.py <导出目录> [输出文件] [--clean]

示例:
python3 merge_and_clean.py exported_personal
python3 merge_and_clean.py exported_personal merged.txt
python3 merge_and_clean.py exported_personal --clean
python3 merge_and_clean.py exported_personal merged.txt --clean

选项:
--clean 合并完成后删除原导出目录
"""

import os
import re
import shutil
import sys

TIMESTAMP_RE = re.compile(r'^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\]')


def merge(src_dir, out_file):
    """Concatenate every ``*.txt`` file in *src_dir* into *out_file*.

    Files are processed in sorted (lexicographic) filename order, each
    followed by a separator line of 80 ``=`` characters.  Exits the
    process with status 1 when the directory contains no txt files.

    Args:
        src_dir: Directory containing the exported per-chat txt files.
        out_file: Path of the merged output file (overwritten if present).
    """
    files = sorted(f for f in os.listdir(src_dir) if f.endswith('.txt'))
    if not files:
        print(f"[-] 目录 {src_dir} 中没有找到 txt 文件")
        sys.exit(1)

    print(f"[*] 合并 {len(files)} 个文件...")
    with open(out_file, 'wb') as out:
        for fname in files:
            # Stream each file in chunks instead of loading it fully into
            # memory, so very large chat exports do not inflate peak RSS.
            with open(os.path.join(src_dir, fname), 'rb') as f:
                shutil.copyfileobj(f, out)
            out.write(b'\n' + b'=' * 80 + b'\n\n')

    print(f"[*] 已合并到 {out_file}")


def clean_image_garbage(content: bytes) -> tuple[bytes, int]:
    """Strip binary garbage trailing [image]/[voice]/[video]/[emoji]/... tags.

    Returns the cleaned bytes together with the total number of
    substitutions performed.
    """
    # Pass 1: "[tag] (garbage)" — parenthesised garbage after any tag.
    # The negative lookahead keeps timestamp brackets like
    # [2024-01-02 ...] untouched.
    tagged = re.compile(rb'\[(?!\d{4}-\d{2}-\d{2})([^\]]+)\] \([^\n]*\)')
    # Pass 2: "[image]<garbage to end of line>" — unparenthesised form.
    bare_image = re.compile(rb'\[image\][^\n]+')

    cleaned, hits_tagged = tagged.subn(rb'[\1]', content)
    cleaned, hits_image = bare_image.subn(b'[image]', cleaned)
    return cleaned, hits_tagged + hits_image


def clean_garbage_lines(content: str) -> tuple[str, int]:
    """Drop whole message blocks containing U+FFFD replacement characters.

    The text is scanned line by line.  Comment lines (``# `` prefix),
    ``=``-only separator lines and blank lines are always preserved.
    A line starting with a ``[YYYY-MM-DD hh:mm:ss]`` timestamp opens a
    message block that extends until the next timestamp, comment or
    separator line; the entire block is removed when any of its lines
    contains U+FFFD.  Orphan lines (no timestamp) are dropped
    individually on the same condition.

    Returns the cleaned text and the number of removed blocks/lines.
    """
    ts_pat = re.compile(r'^\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\]')

    def _is_separator(s: str) -> bool:
        # A run of '=' characters (possibly padded with whitespace).
        return s.startswith('=') and all(ch == '=' for ch in s.strip())

    def _is_boundary(s: str) -> bool:
        # A line that terminates the current message block.
        return bool(ts_pat.match(s)) or s.startswith('# ') or _is_separator(s)

    rows = content.split('\n')
    total = len(rows)
    kept = []
    dropped = 0
    idx = 0

    while idx < total:
        current = rows[idx]

        # Structural lines are always preserved verbatim.
        if current.startswith('# ') or _is_separator(current) or not current.strip():
            kept.append(current)
            idx += 1
            continue

        if ts_pat.match(current):
            # Gather the full message block up to the next boundary line.
            end = idx + 1
            while end < total and not _is_boundary(rows[end]):
                end += 1
            chunk = '\n'.join(rows[idx:end])
            if '\ufffd' in chunk:
                dropped += 1
            else:
                kept.append(chunk)
            idx = end
            continue

        # Orphan line without a timestamp: keep unless it is garbled.
        if '\ufffd' in current:
            dropped += 1
        else:
            kept.append(current)
        idx += 1

    return '\n'.join(kept), dropped


def main():
    """CLI entry point: merge the exported txt files, then scrub garbage."""
    argv = sys.argv[1:]
    if not argv:
        print(__doc__)
        sys.exit(1)

    src_dir = argv[0]
    if not os.path.isdir(src_dir):
        print(f"[-] 目录不存在:{src_dir}")
        sys.exit(1)

    do_clean = '--clean' in argv
    # Remaining non-flag arguments; the first (if any) is the output path.
    extras = [a for a in argv[1:] if not a.startswith('--')]
    out_file = extras[0] if extras else src_dir.rstrip('/') + '_merged.txt'

    # Step 1: concatenate every txt file in the export directory.
    merge(src_dir, out_file)

    # Step 2: strip the binary garbage that trails media tags.
    with open(out_file, 'rb') as fh:
        raw = fh.read()
    scrubbed, tag_hits = clean_image_garbage(raw)
    with open(out_file, 'wb') as fh:
        fh.write(scrubbed)
    print(f"[*] 清理标签乱码:{tag_hits} 处")

    # Step 3: drop message blocks containing U+FFFD replacement chars.
    with open(out_file, 'r', encoding='utf-8', errors='replace') as fh:
        text = fh.read()
    text, block_hits = clean_garbage_lines(text)
    with open(out_file, 'w', encoding='utf-8') as fh:
        fh.write(text)
    print(f"[*] 清理乱码消息块:{block_hits} 条")

    if do_clean:
        shutil.rmtree(src_dir)
        print(f"[*] 已删除原目录:{src_dir}")

    print(f"[✓] 完成:{out_file}")


if __name__ == '__main__':
    main()