Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions api/app/core/memory/agent/utils/write_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from app.core.logging_config import get_agent_logger
from app.core.memory.agent.utils.get_dialogs import get_chunked_dialogs
from app.core.memory.storage_services.extraction_engine.deduplication.deduped_and_disamb import _USER_PLACEHOLDER_NAMES
from app.core.memory.storage_services.extraction_engine.extraction_orchestrator import ExtractionOrchestrator
from app.core.memory.storage_services.extraction_engine.knowledge_extraction.memory_summary import \
memory_summary_generation
Expand Down Expand Up @@ -191,31 +192,51 @@ async def write(
if success:
logger.info("Successfully saved all data to Neo4j")

# 使用 Celery 异步任务触发聚类(不阻塞主流程)
if all_entity_nodes:
end_user_id = all_entity_nodes[0].end_user_id

# Neo4j 写入完成后,用 PgSQL 权威 aliases 覆盖 Neo4j 用户实体
try:
from app.repositories.end_user_info_repository import EndUserInfoRepository
if end_user_id:
with get_db_context() as db_session:
info = EndUserInfoRepository(db_session).get_by_end_user_id(uuid.UUID(end_user_id))
pg_aliases = info.aliases if info and info.aliases else []
if info is not None:
# 将 Python 侧占位名集合作为参数传入,避免 Cypher 硬编码
placeholder_names = list(_USER_PLACEHOLDER_NAMES)
await neo4j_connector.execute_query(
"""
MATCH (e:ExtractedEntity)
WHERE e.end_user_id = $end_user_id AND toLower(e.name) IN $placeholder_names
SET e.aliases = $aliases
""",
end_user_id=end_user_id, aliases=pg_aliases,
placeholder_names=placeholder_names,
)
logger.info(f"[AliasSync] Neo4j 用户实体 aliases 已用 PgSQL 权威源覆盖: {pg_aliases}")
except Exception as sync_err:
logger.warning(f"[AliasSync] PgSQL→Neo4j aliases 同步失败(不影响主流程): {sync_err}")

# 使用 Celery 异步任务触发聚类(不阻塞主流程)
try:
from app.tasks import run_incremental_clustering

end_user_id = all_entity_nodes[0].end_user_id
new_entity_ids = [e.id for e in all_entity_nodes]

# 异步提交 Celery 任务
task = run_incremental_clustering.apply_async(
kwargs={
"end_user_id": end_user_id,
"new_entity_ids": new_entity_ids,
"llm_model_id": str(memory_config.llm_model_id) if memory_config.llm_model_id else None,
"embedding_model_id": str(memory_config.embedding_model_id) if memory_config.embedding_model_id else None,
},
# 设置任务优先级(低优先级,不影响主业务)
priority=3,
)
logger.info(
f"[Clustering] 增量聚类任务已提交到 Celery - "
f"task_id={task.id}, end_user_id={end_user_id}, entity_count={len(new_entity_ids)}"
)
except Exception as e:
# 聚类任务提交失败不影响主流程
logger.error(f"[Clustering] 提交聚类任务失败(不影响主流程): {e}", exc_info=True)

break
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,51 +82,38 @@ def _merge_attribute(canonical: ExtractedEntityNode, ent: ExtractedEntityNode):
canonical.connect_strength = next(iter(pair))

# 别名合并(去重保序,使用标准化工具)
# 用户实体的 aliases 由 PgSQL end_user_info 作为唯一权威源,去重合并时不修改
try:
canonical_name = (getattr(canonical, "name", "") or "").strip()
incoming_name = (getattr(ent, "name", "") or "").strip()

# 收集所有需要合并的别名
all_aliases = []

# 1. 添加canonical现有的别名
existing = getattr(canonical, "aliases", []) or []
all_aliases.extend(existing)

# 2. 添加incoming实体的名称(如果不同于canonical的名称)
if incoming_name and incoming_name != canonical_name:
all_aliases.append(incoming_name)

# 3. 添加incoming实体的所有别名
incoming = getattr(ent, "aliases", []) or []
all_aliases.extend(incoming)

# 4. 标准化并去重(优先使用alias_utils工具函数)
try:
from app.core.memory.utils.alias_utils import normalize_aliases
canonical.aliases = normalize_aliases(canonical_name, all_aliases)
except Exception:
# 如果导入失败,使用增强的去重逻辑
seen_normalized = set()
unique_aliases = []
if canonical_name.lower() not in _USER_PLACEHOLDER_NAMES:
incoming_name = (getattr(ent, "name", "") or "").strip()

for alias in all_aliases:
if not alias:
continue

alias_stripped = str(alias).strip()
if not alias_stripped or alias_stripped == canonical_name:
continue

# 标准化:转小写用于去重判断
alias_normalized = alias_stripped.lower()

if alias_normalized not in seen_normalized:
seen_normalized.add(alias_normalized)
unique_aliases.append(alias_stripped)
# 收集所有需要合并的别名,过滤掉用户占位名避免污染非用户实体
all_aliases = list(getattr(canonical, "aliases", []) or [])
if incoming_name and incoming_name != canonical_name and incoming_name.lower() not in _USER_PLACEHOLDER_NAMES:
all_aliases.append(incoming_name)
all_aliases.extend(
a for a in (getattr(ent, "aliases", []) or [])
if a and a.strip().lower() not in _USER_PLACEHOLDER_NAMES
)

# 排序并赋值
canonical.aliases = sorted(unique_aliases)
try:
from app.core.memory.utils.alias_utils import normalize_aliases
canonical.aliases = normalize_aliases(canonical_name, all_aliases)
except Exception:
seen_normalized = set()
unique_aliases = []
for alias in all_aliases:
if not alias:
continue
alias_stripped = str(alias).strip()
if not alias_stripped or alias_stripped == canonical_name:
continue
alias_normalized = alias_stripped.lower()
if alias_normalized not in seen_normalized:
seen_normalized.add(alias_normalized)
unique_aliases.append(alias_stripped)
canonical.aliases = sorted(unique_aliases)
except Exception:
pass

Expand Down Expand Up @@ -733,66 +720,37 @@ def _type_similarity(t1: str, t2: str) -> float:


def _merge_entities_with_aliases(canonical: ExtractedEntityNode, losing: ExtractedEntityNode):
    """Fold the name and aliases of ``losing`` into ``canonical.aliases``.

    Used when two entities are merged during fuzzy matching.

    Merge strategy:
        1. The primary name of ``canonical`` is kept as-is.
        2. The primary name of ``losing`` becomes an alias when it differs
           from the canonical name.
        3. The aliases of both entities are combined.
        4. The combined list is handed to ``alias_utils.normalize_aliases``;
           if that import or call raises, a local fallback de-duplicates
           case-insensitively and sorts the result.

    Args:
        canonical: Entity that survives the merge. Only its ``aliases``
            attribute is reassigned.
        losing: Entity being merged away. It is only read.

    Note:
        Aliases of the user entity are owned by PgSQL ``end_user_info`` (the
        single source of truth), so when the canonical name is a user
        placeholder the function returns without touching anything.
    """
    canonical_name = (getattr(canonical, "name", "") or "").strip()
    if canonical_name.lower() in _USER_PLACEHOLDER_NAMES:
        return

    def _is_placeholder(value) -> bool:
        # Same test as the guard above: strip, lower-case, then look up.
        return str(value).strip().lower() in _USER_PLACEHOLDER_NAMES

    losing_name = (getattr(losing, "name", "") or "").strip()

    # Start from the canonical aliases, then add what the losing entity
    # contributes. User placeholder names are dropped so that merging a user
    # entity into a non-user one cannot pollute the latter's aliases (the same
    # filtering _merge_attribute applies to incoming names and aliases).
    all_aliases = list(getattr(canonical, "aliases", []) or [])
    if losing_name and losing_name != canonical_name and not _is_placeholder(losing_name):
        all_aliases.append(losing_name)
    all_aliases.extend(
        alias for alias in (getattr(losing, "aliases", []) or [])
        if alias and not _is_placeholder(alias)
    )

    try:
        from app.core.memory.utils.alias_utils import normalize_aliases
        canonical.aliases = normalize_aliases(canonical_name, all_aliases)
    except Exception:
        # Best-effort fallback when the helper cannot be imported or fails:
        # de-duplicate on the lower-cased form, keep the first spelling seen.
        seen_normalized = set()
        unique_aliases = []
        for alias in all_aliases:
            if not alias:
                continue
            alias_stripped = str(alias).strip()
            if not alias_stripped or alias_stripped == canonical_name:
                continue
            alias_normalized = alias_stripped.lower()
            if alias_normalized not in seen_normalized:
                seen_normalized.add(alias_normalized)
                unique_aliases.append(alias_stripped)
        canonical.aliases = sorted(unique_aliases)

# ========== 主循环:遍历所有实体对进行模糊匹配 ==========
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1391,18 +1391,18 @@ async def _update_end_user_other_name(
"""
将本轮提取的用户别名同步到 end_user 和 end_user_info 表。

注意:此方法在 Neo4j 写入之前调用,因此不能依赖 Neo4j 作为别名的权威数据源。
改为直接使用内存中去重后的 entity_nodes 的 aliases,与 PgSQL 已有的 aliases 合并。
PgSQL end_user_info.aliases 是用户别名的唯一权威源。
此方法仅将本轮 LLM 从对话中新提取的别名增量追加到 PgSQL,
不再从 Neo4j 二层去重合并历史别名,避免脏数据反向污染 PgSQL。

策略:
1. 从内存中的 entity_nodes 提取本轮用户别名(current_aliases)
2. 从去重后的 entity_nodes 中提取完整别名(含 Neo4j 二层去重合并的历史别名)
3. 从 PgSQL end_user_info 读取已有的 aliases(db_aliases)
4. 合并 db_aliases + deduped_aliases + current_aliases,去重保序
5. 写回 PgSQL
1. 从本轮对话原始发言中提取用户别名(current_aliases)
2. 从 PgSQL end_user_info 读取已有的 aliases(db_aliases)
3. 合并 db_aliases + current_aliases,去重保序
4. 写回 PgSQL

Args:
entity_nodes: 去重后的实体节点列表(内存中,含二层去重合并结果
entity_nodes: 去重后的实体节点列表(内存中)
dialog_data_list: 对话数据列表
"""
try:
Expand All @@ -1418,11 +1418,6 @@ async def _update_end_user_other_name(
# 1. 提取本轮对话的用户别名(保持 LLM 提取的原始顺序,不排序)
current_aliases = self._extract_current_aliases(entity_nodes, dialog_data_list)

# 1.5 从去重后的 entity_nodes 中提取完整别名
# 二层去重会将 Neo4j 中已有的历史别名合并到 entity_nodes 中,
# 这里提取出来确保 PgSQL 与 Neo4j 的别名保持同步
deduped_aliases = self._extract_deduped_entity_aliases(entity_nodes)

# 1.6 从 Neo4j 查询已有的 AI 助手别名,作为额外的排除源
# (防止 LLM 未提取出 AI 助手实体时,AI 别名泄漏到用户别名中)
neo4j_assistant_aliases = await self._fetch_neo4j_assistant_aliases(end_user_id)
Expand All @@ -1434,19 +1429,12 @@ async def _update_end_user_other_name(
]
if len(current_aliases) < before_count:
logger.info(f"通过 Neo4j AI 助手别名排除了 {before_count - len(current_aliases)} 个误归属别名")
# 同样过滤 deduped_aliases
deduped_aliases = [
a for a in deduped_aliases
if a.strip().lower() not in neo4j_assistant_aliases
]

if not current_aliases and not deduped_aliases:
if not current_aliases:
logger.debug(f"本轮未提取到用户别名,跳过同步: end_user_id={end_user_id}")
return

logger.info(f"本轮对话提取的 aliases: {current_aliases}")
if deduped_aliases:
logger.info(f"去重后实体的完整 aliases(含历史): {deduped_aliases}")

# 2. 同步到数据库
end_user_uuid = uuid.UUID(end_user_id)
Expand All @@ -1457,21 +1445,15 @@ async def _update_end_user_other_name(
logger.warning(f"未找到 end_user_id={end_user_id} 的用户记录")
return

# 3. 从 PgSQL 读取已有 aliases 并与本轮合并
# 3. 从 PgSQL 读取已有 aliases 并与本轮新增合并
info = EndUserInfoRepository(db).get_by_end_user_id(end_user_uuid)
db_aliases = (info.aliases if info and info.aliases else [])
# 过滤掉占位名称
db_aliases = [a for a in db_aliases if a.strip().lower() not in self.USER_PLACEHOLDER_NAMES]

# 合并:已有 + 去重后完整别名 + 本轮新增,去重保序
# 合并:PgSQL 已有 + 本轮新增,去重保序(不再合并 Neo4j 历史别名)
merged_aliases = list(db_aliases)
seen_lower = {a.strip().lower() for a in merged_aliases}
# 先合并去重后实体的完整别名(含 Neo4j 历史别名)
for alias in deduped_aliases:
if alias.strip().lower() not in seen_lower:
merged_aliases.append(alias)
seen_lower.add(alias.strip().lower())
# 再合并本轮新提取的别名
for alias in current_aliases:
if alias.strip().lower() not in seen_lower:
merged_aliases.append(alias)
Expand Down Expand Up @@ -1505,9 +1487,7 @@ async def _update_end_user_other_name(
info.aliases = merged_aliases
logger.info(f"同步合并后 aliases 到 end_user_info: {merged_aliases}")
else:
first_alias = current_aliases[0].strip() if current_aliases else (
deduped_aliases[0].strip() if deduped_aliases else ""
)
first_alias = current_aliases[0].strip() if current_aliases else ""
# 确保 first_alias 不是占位名称
if first_alias and first_alias.lower() not in self.USER_PLACEHOLDER_NAMES:
db.add(EndUserInfo(
Expand Down
2 changes: 2 additions & 0 deletions api/app/repositories/neo4j/cypher_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@
END,
e.statement_id = CASE WHEN entity.statement_id IS NOT NULL AND entity.statement_id <> '' THEN entity.statement_id ELSE e.statement_id END,
e.aliases = CASE
// 用户实体的 aliases 由 PgSQL end_user_info 作为唯一权威源,知识抽取完全不写入
WHEN entity.name IN ['用户', '我', 'User', 'I'] THEN e.aliases
WHEN entity.aliases IS NOT NULL AND size(entity.aliases) > 0
THEN CASE
WHEN e.aliases IS NULL THEN entity.aliases
Expand Down
4 changes: 2 additions & 2 deletions api/app/services/emotion_analytics_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,9 +679,9 @@ async def _get_simple_user_profile(self, end_user_id: str) -> Dict[str, Any]:

# 查询用户的实体和标签
query = """
MATCH (e:Entity)
MATCH (e:ExtractedEntity)
WHERE e.end_user_id = $end_user_id
RETURN e.name as name, e.type as type
RETURN e.name as name, e.entity_type as type
ORDER BY e.created_at DESC
LIMIT 20
"""
Expand Down
3 changes: 2 additions & 1 deletion api/app/services/user_memory_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from sqlalchemy.orm import Session

from app.core.logging_config import get_logger
from app.core.memory.storage_services.extraction_engine.deduplication.deduped_and_disamb import _USER_PLACEHOLDER_NAMES
from app.core.memory.utils.llm.llm_utils import MemoryClientFactory
from app.db import get_db_context
from app.repositories.conversation_repository import ConversationRepository
Expand Down Expand Up @@ -473,7 +474,7 @@ def update_end_user_info(
allowed_fields = {'other_name', 'aliases', 'meta_data'}

# 用户占位名称黑名单,不允许作为 other_name 或出现在 aliases 中
_user_placeholder_names = {'用户', '我', 'User', 'I'}
_user_placeholder_names = _USER_PLACEHOLDER_NAMES

# 过滤 other_name:不允许设置为占位名称
if 'other_name' in update_data and update_data['other_name'] and update_data['other_name'].strip() in _user_placeholder_names:
Expand Down