refactor(memory): consolidate search services and unify model client initialization (#916)
Open
- Replace storage_services/search with new read_services/memory_search structure
- Implement content_search and perceptual_search strategies
- Add query_preprocessor for search optimization
- Create memory_service as unified interface
- Update celery_app and graph_search for new architecture
- Add enums for memory operations
- Implement base_pipeline and memory_read pipeline patterns
Contributor
Reviewer's Guide

Refactors memory search across Neo4j to use enum-based node types and centralized query mappings, adds a new Neo4j-based search/read pipeline and prompt infrastructure, and unifies model client initialization while replacing old search strategy classes.

Sequence diagram for the quick Neo4j memory search read path

sequenceDiagram
actor Caller
participant MemoryService
participant ReadPipeLine
participant QueryPreprocessor
participant Neo4jSearchService
participant Embedder as RedBearEmbeddings
participant GraphSearch
participant Neo4jConnector
participant Neo4jDB
Caller->>MemoryService: read(query, search_switch=QUICK, limit)
MemoryService->>ReadPipeLine: run(query, search_switch, limit, includes)
ReadPipeLine->>QueryPreprocessor: process(query)
QueryPreprocessor-->>ReadPipeLine: cleaned_query
alt storage_type == RAG
ReadPipeLine->>RAGSearchService: search() async
RAGSearchService-->>ReadPipeLine: MemorySearchResult
ReadPipeLine-->>MemoryService: MemorySearchResult
MemoryService-->>Caller: MemorySearchResult
else storage_type == NEO4J and search_switch == QUICK
ReadPipeLine->>ModelClientMixin: get_embedding_client(db, embedding_model_id)
ModelClientMixin-->>ReadPipeLine: embedder_client
ReadPipeLine->>Neo4jSearchService: Neo4jSearchService(ctx, embedder_client, includes)
ReadPipeLine->>Neo4jSearchService: search(cleaned_query, limit)
par keyword search
Neo4jSearchService->>Neo4jConnector: __aenter__()
Neo4jConnector-->>Neo4jSearchService: connector
Neo4jSearchService->>GraphSearch: search_graph(connector, query, end_user_id, limit, includes)
loop for node_type in includes
GraphSearch->>Neo4jConnector: execute_query(cypher=FULLTEXT_QUERY_CYPHER_MAPPING[node_type], json_format=True, end_user_id, query, limit)
Neo4jConnector->>Neo4jDB: execute_query(cypher, params)
Neo4jDB-->>Neo4jConnector: records
Neo4jConnector-->>GraphSearch: formatted_records
end
GraphSearch-->>Neo4jSearchService: keyword_results per node_type
and embedding search
Neo4jSearchService->>Embedder: embed_documents([cleaned_query])
Embedder-->>Neo4jSearchService: query_embedding
Neo4jSearchService->>GraphSearch: search_graph_by_embedding(connector, embedder_client, query_text, end_user_id, limit, includes)
loop for node_type in includes
GraphSearch->>Neo4jConnector: execute_query(cypher=USER_ID_QUERY_CYPHER_MAPPING[node_type], end_user_id)
Neo4jConnector->>Neo4jDB: execute_query(cypher, params)
Neo4jDB-->>Neo4jConnector: embedding_records
Neo4jConnector-->>GraphSearch: embedding_records
GraphSearch->>GraphSearch: cosine_similarity_search(query_embedding, vectors, limit)
GraphSearch->>Neo4jConnector: execute_query(cypher=NODE_ID_QUERY_CYPHER_MAPPING[node_type], ids, json_format=True)
Neo4jConnector->>Neo4jDB: execute_query(cypher, params)
Neo4jDB-->>Neo4jConnector: node_records
Neo4jConnector-->>GraphSearch: node_records with scores
end
GraphSearch-->>Neo4jSearchService: embedding_results per node_type
end
Neo4jSearchService->>Neo4jConnector: __aexit__()
loop for node_type in includes
Neo4jSearchService->>Neo4jSearchService: _rerank(keyword_results[node_type], embedding_results[node_type], limit)
Neo4jSearchService->>data_builder_factory: data_builder_factory(node_type, record)
data_builder_factory-->>Neo4jSearchService: builder
Neo4jSearchService->>Neo4jSearchService: build Memory objects
end
Neo4jSearchService-->>ReadPipeLine: MemorySearchResult
ReadPipeLine-->>MemoryService: MemorySearchResult
MemoryService-->>Caller: MemorySearchResult
end
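The `par` block above, where keyword and embedding searches run concurrently before merging, can be sketched with `asyncio.gather`. All names below are illustrative stand-ins for the services in the diagram, not the project's actual implementations:

```python
import asyncio

async def keyword_search(query: str, node_types: list[str]) -> dict[str, list[dict]]:
    # Stand-in for GraphSearch.search_graph: one fulltext query per node type.
    return {nt: [{"id": f"{nt}-kw", "score": 0.8}] for nt in node_types}

async def embedding_search(query: str, node_types: list[str]) -> dict[str, list[dict]]:
    # Stand-in for GraphSearch.search_graph_by_embedding.
    return {nt: [{"id": f"{nt}-emb", "score": 0.9}] for nt in node_types}

async def quick_search(query: str, node_types: list[str]) -> dict[str, list[dict]]:
    # Run both strategies concurrently, mirroring the "par" block above.
    kw, emb = await asyncio.gather(
        keyword_search(query, node_types),
        embedding_search(query, node_types),
    )
    # Merge per node type; the real service reranks here instead of concatenating.
    return {nt: kw.get(nt, []) + emb.get(nt, []) for nt in node_types}

results = asyncio.run(quick_search("what did I say?", ["Statement", "Chunk"]))
```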
Class diagram for the new memory read pipeline and search services

classDiagram
class MemoryContext {
+str end_user_id
+MemoryConfig memory_config
+StorageType storage_type
+str user_rag_memory_id
+str language
}
class Memory {
+Neo4jNodeType source
+float score
+str content
+dict data
+str query
+serialize_source(v)
}
class MemorySearchResult {
+list~Memory~ memories
+str content
+int count
}
class StorageType {
<<enumeration>>
NEO4J
RAG
}
class SearchStrategy {
<<enumeration>>
DEEP
NORMAL
QUICK
}
class Neo4jNodeType {
<<enumeration>>
CHUNK
COMMUNITY
DIALOGUE
EXTRACTEDENTITY
MEMORYSUMMARY
PERCEPTUAL
STATEMENT
}
class MemoryService {
-MemoryContext ctx
+MemoryService(db, config_id, end_user_id, workspace_id, storage_type, user_rag_memory_id, language)
+write(messages) async
+read(query, search_switch, limit) async MemorySearchResult
+forget(max_batch, min_days) async dict
+reflect() async dict
+cluster(new_entity_ids) async None
}
class BasePipeline {
+MemoryContext ctx
+run(args, kwargs) async
}
class DBRequiredPipeline {
+Session db
}
class ModelClientMixin {
+get_llm_client(db, model_id) RedBearLLM
+get_embedding_client(db, model_id) RedBearEmbeddings
}
class ReadPipeLine {
+run(query, search_switch, limit, includes) async MemorySearchResult
+_rag_read(query, limit) async MemorySearchResult
+_deep_read(query, limit, includes) async MemorySearchResult
+_normal_read(query, limit, includes) async MemorySearchResult
+_quick_read(query, limit, includes) async MemorySearchResult
}
class Neo4jSearchService {
-MemoryContext ctx
-RedBearEmbeddings embedder
-Neo4jConnector connector
-list~Neo4jNodeType~ includes
-float alpha
-float fulltext_score_threshold
-float cosine_score_threshold
-float content_score_threshold
+Neo4jSearchService(ctx, embedder, includes, alpha, fulltext_score_threshold, cosine_score_threshold, content_score_threshold)
+search(query, limit) async MemorySearchResult
-_keyword_search(query, limit) async
-_embedding_search(query, limit) async
-_rerank(keyword_results, embedding_results, limit) list~dict~
-_normalize_kw_scores(items) list~dict~
}
class RAGSearchService {
+RAGSearchService(ctx)
+search() async MemorySearchResult
}
class QueryPreprocessor {
+process(query) str
+split(query, llm_client) async
+extension(query, llm_client) async
}
class BaseBuilder {
+dict record
+data dict
+content str
+score float
}
class ChunkBuilder {
+data dict
+content str
}
class StatementBuiler {
+data dict
+content str
}
class EntityBuilder {
+data dict
+content str
}
class SummaryBuilder {
+data dict
+content str
}
class PerceptualBuilder {
+data dict
+content str
}
class CommunityBuilder {
+data dict
+content str
}
class data_builder_factory {
+data_builder_factory(node_type, data) BaseBuilder
}
class PromptManager {
+get(name) str
+render(name, kwargs) str
+list_templates() list~str~
}
class QueryPreprocessorDependencies {
RedBearLLM
AgentMemoryDataset
}
MemoryService --> MemoryContext
MemoryService ..> MemorySearchResult
MemoryService ..> ReadPipeLine
BasePipeline <|-- DBRequiredPipeline
BasePipeline <|-- ReadPipeLine
ModelClientMixin <|-- ReadPipeLine
ReadPipeLine --> MemoryContext
ReadPipeLine --> Neo4jSearchService
ReadPipeLine --> RAGSearchService
ReadPipeLine ..> QueryPreprocessor
Neo4jSearchService --> MemoryContext
Neo4jSearchService --> Neo4jNodeType
Neo4jSearchService ..> MemorySearchResult
RAGSearchService --> MemoryContext
MemoryContext --> MemoryConfig
MemoryContext --> StorageType
MemorySearchResult --> Memory
BaseBuilder <|-- ChunkBuilder
BaseBuilder <|-- StatementBuiler
BaseBuilder <|-- EntityBuilder
BaseBuilder <|-- SummaryBuilder
BaseBuilder <|-- PerceptualBuilder
BaseBuilder <|-- CommunityBuilder
data_builder_factory ..> BaseBuilder
data_builder_factory ..> Neo4jNodeType
QueryPreprocessor ..> PromptManager
QueryPreprocessor ..> QueryPreprocessorDependencies
StructResponse ..> RedBearLLM
class StructResponse {
+mode
+model
+StructResponse(mode, model)
+__ror__(other)
}
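Per the class diagram, `Neo4jSearchService._rerank` combines the keyword and embedding result lists under an `alpha` weight after `_normalize_kw_scores`. The exact formula is not shown in the diff; the following is only a plausible min-max-normalize-then-blend sketch:

```python
def normalize_kw_scores(items: list[dict]) -> list[dict]:
    # Min-max normalize fulltext scores into [0, 1] so they are comparable
    # with cosine similarities.
    if not items:
        return []
    scores = [item["score"] for item in items]
    lo, hi = min(scores), max(scores)
    span = (hi - lo) or 1.0
    return [{**item, "score": (item["score"] - lo) / span} for item in items]

def rerank(keyword_items: list[dict], embedding_items: list[dict],
           limit: int, alpha: float = 0.5) -> list[dict]:
    # Blend the two score sources per node id: alpha weights the keyword side,
    # (1 - alpha) the embedding side; items found by both get the sum.
    merged: dict[str, dict] = {}
    for item in normalize_kw_scores(keyword_items):
        merged[item["id"]] = {**item, "score": alpha * item["score"]}
    for item in embedding_items:
        emb_score = (1 - alpha) * item["score"]
        if item["id"] in merged:
            merged[item["id"]]["score"] += emb_score
        else:
            merged[item["id"]] = {**item, "score": emb_score}
    return sorted(merged.values(), key=lambda m: m["score"], reverse=True)[:limit]

hybrid = rerank(
    [{"id": "a", "score": 2.0}, {"id": "b", "score": 1.0}],  # keyword hits
    [{"id": "a", "score": 0.9}, {"id": "c", "score": 0.8}],  # embedding hits
    limit=2,
)
```

A document found by both strategies ("a" above) accumulates both weighted scores, which is the usual motivation for hybrid fusion.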
Class diagram for refactored Neo4j graph search utilities

classDiagram
class Neo4jConnector {
+Neo4jConnector()
+__aenter__() async Neo4jConnector
+__aexit__(exc_type, exc_val, exc_tb) async
+close() async
+execute_query(cypher, json_format, kwargs) async list~dict~
}
class GraphSearchModule {
+cosine_similarity_search(query, vectors, limit) dict~int,float~
+search_perceptual_by_fulltext(connector, query, end_user_id, limit) async dict
+search_perceptual_by_embedding(connector, embedder_client, query_text, end_user_id, limit) async dict
+search_by_fulltext(connector, node_type, end_user_id, query, limit) async list~dict~
+search_by_embedding(connector, node_type, end_user_id, query_embedding, limit) async list~dict~
+search_graph(connector, query, end_user_id, limit, include) async dict
+search_graph_by_embedding(connector, embedder_client, query_text, end_user_id, limit, include) async dict
+search_graph_community_expand(args) async dict
}
class Neo4jNodeType {
<<enumeration>>
CHUNK
COMMUNITY
DIALOGUE
EXTRACTEDENTITY
MEMORYSUMMARY
PERCEPTUAL
STATEMENT
}
class FULLTEXT_QUERY_CYPHER_MAPPING {
+STATEMENT
+EXTRACTEDENTITY
+CHUNK
+MEMORYSUMMARY
+COMMUNITY
+PERCEPTUAL
}
class USER_ID_QUERY_CYPHER_MAPPING {
+STATEMENT
+EXTRACTEDENTITY
+CHUNK
+MEMORYSUMMARY
+COMMUNITY
+PERCEPTUAL
}
class NODE_ID_QUERY_CYPHER_MAPPING {
+STATEMENT
+EXTRACTEDENTITY
+CHUNK
+MEMORYSUMMARY
+COMMUNITY
+PERCEPTUAL
}
class OpenAIEmbedderClient {
+response(texts) async list~list~float~~
}
class RedBearEmbeddings {
+embed_documents(texts) list~list~float~~
}
class SearchDeduplication {
+deduplicate_results(items) list~dict~
}
GraphSearchModule --> Neo4jConnector
GraphSearchModule --> Neo4jNodeType
GraphSearchModule --> FULLTEXT_QUERY_CYPHER_MAPPING
GraphSearchModule --> USER_ID_QUERY_CYPHER_MAPPING
GraphSearchModule --> NODE_ID_QUERY_CYPHER_MAPPING
GraphSearchModule ..> OpenAIEmbedderClient
GraphSearchModule ..> RedBearEmbeddings
GraphSearchModule ..> SearchDeduplication
SearchDeduplication <.. Neo4jSearchService
FULLTEXT_QUERY_CYPHER_MAPPING --> Neo4jNodeType
USER_ID_QUERY_CYPHER_MAPPING --> Neo4jNodeType
NODE_ID_QUERY_CYPHER_MAPPING --> Neo4jNodeType
Neo4jConnector ..> Neo4jDB
class Neo4jDB {
<<external>>
}
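`Neo4jConnector`'s `__aenter__`/`__aexit__` pair in the diagram implies the standard async context-manager protocol, which guarantees the connection is closed even if a query raises. A stand-in sketch (the real class wraps a Neo4j driver; this one only records lifecycle events):

```python
import asyncio

class FakeConnector:
    """Illustrative stand-in for Neo4jConnector's lifecycle protocol."""

    def __init__(self):
        self.events: list[str] = []

    async def __aenter__(self):
        self.events.append("open")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Runs on both normal exit and exceptions, so close() is guaranteed.
        await self.close()

    async def close(self):
        self.events.append("close")

    async def execute_query(self, cypher: str, **params) -> list[dict]:
        self.events.append("query")
        return [{"cypher": cypher, **params}]

async def main():
    conn = FakeConnector()
    async with conn:  # __aenter__ opens, __aexit__ guarantees close()
        rows = await conn.execute_query("MATCH (n) RETURN n LIMIT $limit", limit=1)
    return conn.events, rows

events, rows = asyncio.run(main())
```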
Contributor
Hey - I've found 8 issues, and left some high-level feedback:
- The migration from string-based type keys to Neo4jNodeType enums is inconsistent in several places (e.g. search_graph.search_graph, search_graph_by_embedding, execute_hybrid_search, rerank_with_activation): results dicts and Cypher mappings still use string keys (node_type.value) while include/loop variables use enums, so membership checks like `key in include` and `category in answer` will now always fail and skip activation updates / result aggregation. Align the keys (all enums or all strings) or normalize when reading/writing.
- In SearchService.extract_content_from_result you changed `if 'statement' in result` to `if Neo4jNodeType.STATEMENT in result`, but the result dicts still have a 'statement' string key from Cypher; this condition will never be true and statement content will be dropped. Keep the string key or map the dict keys to an enum-based structure first.
- cosine_similarity_search does not guard against zero-norm vectors (either query or stored embeddings), which will cause division by zero and NaNs for degenerate or uninitialized embeddings; consider filtering out zero vectors or adding an epsilon when normalizing.
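Since the mismatch comes from Cypher results keyed by `node_type.value` while `include` holds enum members, one low-friction fix, if the project's enum does not already do this, is a `str` mixin, which makes members hash and compare equal to their values. The member values below are assumptions based on the class diagrams:

```python
from enum import Enum

class Neo4jNodeType(str, Enum):
    # Hypothetical subset of the real enum; values assumed, not confirmed.
    STATEMENT = "Statement"
    CHUNK = "Chunk"

# With the str mixin, string-keyed dicts from Cypher and enum-based
# membership checks agree without any conversion:
results = {"Statement": [{"id": 1}]}
assert Neo4jNodeType.STATEMENT in results
assert results[Neo4jNodeType.STATEMENT] == [{"id": 1}]

# Without the mixin, keys must be normalized explicitly at the boundary:
normalized = {Neo4jNodeType(key): value for key, value in results.items()}
assert Neo4jNodeType.STATEMENT in normalized
```

Either convention works; the important part is choosing one and converting at a single boundary rather than in every consumer.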
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The migration from string-based type keys to Neo4jNodeType enums is inconsistent in several places (e.g. search_graph.search_graph, search_graph_by_embedding, execute_hybrid_search, rerank_with_activation): results dicts and Cypher mappings still use string keys (node_type.value) while include/loop variables use enums, so membership checks like `key in include` and `category in answer` will now always fail and skip activation updates / result aggregation—align keys (all enums or all strings) or normalize when reading/writing.
- In SearchService.extract_content_from_result you changed `if 'statement' in result` to `if Neo4jNodeType.STATEMENT in result`, but the result dicts still have a 'statement' string key from Cypher; this condition will never be true and statement content will be dropped—keep the string key or map the dict keys to enum-based structure first.
- cosine_similarity_search does not guard against zero-norm vectors (either query or stored embeddings), which will cause division-by-zero and NaNs for degenerate or uninitialized embeddings; consider filtering out zero vectors or adding an epsilon when normalizing.
## Individual Comments
### Comment 1
<location path="api/app/repositories/neo4j/graph_search.py" line_range="38-47" />
<code_context>
logger = logging.getLogger(__name__)
+def cosine_similarity_search(
+ query: list[float],
+ vectors: list[list[float]],
+ limit: int
+) -> dict[int, float]:
+ if not vectors:
+ return {}
+ vectors: np.ndarray = np.array(vectors, dtype=np.float32)
+ vectors_norm = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
+ query: np.ndarray = np.array(query, dtype=np.float32)
+ query_norm = query / np.linalg.norm(query)
+
+ similarities = vectors_norm @ query_norm
+ similarities = np.clip(similarities, 0, 1)
+ top_k = min(limit, similarities.shape[0])
</code_context>
<issue_to_address>
**issue:** Guard against zero‑norm vectors in cosine_similarity_search to avoid division by zero and NaNs.
In `cosine_similarity_search`, both `vectors_norm = vectors / np.linalg.norm(...` and `query_norm = query / np.linalg.norm(query)` will divide by zero for all‑zero vectors or query, producing NaNs and invalid rankings. Please handle zero‑norm cases explicitly, e.g. by filtering out zero‑norm rows or applying an epsilon floor (`norm = np.where(norm == 0, 1, norm)`) and early‑returning when the query norm is 0.
</issue_to_address>
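A sketch of the guarded function, combining the norm floor with an early return for a zero-norm query; the argsort-based top-k tail is an assumption about the elided remainder of the diff:

```python
import numpy as np


def cosine_similarity_search(
    query: list[float],
    vectors: list[list[float]],
    limit: int,
) -> dict[int, float]:
    if not vectors:
        return {}
    mat = np.array(vectors, dtype=np.float32)
    q = np.array(query, dtype=np.float32)
    q_norm = np.linalg.norm(q)
    if q_norm == 0:  # degenerate query: no meaningful ranking possible
        return {}
    norms = np.linalg.norm(mat, axis=1, keepdims=True)
    norms = np.where(norms == 0, 1, norms)  # floor zero norms; zero rows score 0
    similarities = np.clip((mat / norms) @ (q / q_norm), 0, 1)
    top_k = min(limit, similarities.shape[0])
    top_idx = np.argsort(similarities)[::-1][:top_k]
    return {int(i): float(similarities[i]) for i in top_idx}
```

Zero-norm stored vectors simply score 0 rather than producing NaNs, so they sink to the bottom of the ranking instead of corrupting it.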
### Comment 2
<location path="api/app/repositories/neo4j/graph_search.py" line_range="358-362" />
<code_context>
+ limit: int = 10,
+) -> list[dict[str, Any]]:
+ try:
+ records = await connector.execute_query(
+ USER_ID_QUERY_CYPHER_MAPPING[node_type],
+ end_user_id=end_user_id,
+ )
+ records = [record for record in records if record if record["embedding"] is not None]
+ ids = [item['id'] for item in records]
+ vectors = [item['embedding'] for item in records]
</code_context>
<issue_to_address>
**issue (bug_risk):** Fix list comprehension filtering to avoid crashing when records contain None or missing 'embedding'.
The current comprehension `records = [record for record in records if record if record["embedding"] is not None]` has two issues:
- `if record if record[...]` is syntactically odd and still evaluates `record["embedding"]` even when `record` is falsy, which will fail if `record` is `None`.
- Accessing `record["embedding"]` directly will raise `KeyError` when the key is missing.
Consider instead:
```python
records = [
r for r in records
if r is not None and r.get("embedding") is not None
]
```
</issue_to_address>
### Comment 3
<location path="api/app/repositories/neo4j/graph_search.py" line_range="169-175" />
<code_context>
knowledge_node_types = {
'statements': 'Statement',
'entities': 'ExtractedEntity',
- 'summaries': 'MemorySummary'
+ 'summaries': 'MemorySummary',
+ Neo4jNodeType.STATEMENT: Neo4jNodeType.STATEMENT.value,
+ Neo4jNodeType.EXTRACTEDENTITY: Neo4jNodeType.EXTRACTEDENTITY.value,
+ Neo4jNodeType.MEMORYSUMMARY: Neo4jNodeType.MEMORYSUMMARY.value,
}
</code_context>
<issue_to_address>
**issue (bug_risk):** Mixing string keys and Neo4jNodeType keys in knowledge_node_types causes activation updates to never trigger.
In `_update_search_results_activation`, `knowledge_node_types` now mixes string keys (e.g. `'statements'`) and enum keys (e.g. `Neo4jNodeType.STATEMENT`). But `needs_activation_update` checks:
```python
needs_activation_update = any(
key in include and key in results and results[key]
for key in ['statements', 'entities', 'chunks']
)
```
`include` is a `List[Neo4jNodeType]` and `results` is keyed by `node_type.value` (e.g. `'Statement'`), so these string literals never match and the activation update path is effectively dead. This needs to be fixed by using a single key scheme (e.g. `Neo4jNodeType` everywhere) and consistently indexing `results` via `node_type.value`.
</issue_to_address>
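Under a single enum-based scheme, the check could iterate Neo4jNodeType members and index `results` via `.value`. A sketch with assumed enum values; in the real code this logic sits inside `_update_search_results_activation`:

```python
from enum import Enum


class Neo4jNodeType(Enum):
    # Values assumed to mirror the node labels used in the diff.
    STATEMENT = "Statement"
    EXTRACTEDENTITY = "ExtractedEntity"
    CHUNK = "Chunk"


def needs_activation_update(include: list, results: dict) -> bool:
    """One key scheme: membership against enum-based `include`,
    indexing string-keyed `results` via node_type.value."""
    return any(
        node_type in include and results.get(node_type.value)
        for node_type in (
            Neo4jNodeType.STATEMENT,
            Neo4jNodeType.EXTRACTEDENTITY,
            Neo4jNodeType.CHUNK,
        )
    )
```

This removes the dead string literals ('statements', 'entities', 'chunks') and keeps the activation path alive as long as results stay keyed by `.value`.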
### Comment 4
<location path="api/app/repositories/neo4j/graph_search.py" line_range="431-433" />
<code_context>
- limit=limit,
- ))
- task_keys.append("communities")
+ for node_type in include:
+ tasks.append(search_by_fulltext(connector, node_type, end_user_id, escaped_query, limit))
+ task_keys.append(node_type.value)
# Execute all queries in parallel
</code_context>
<issue_to_address>
**issue (bug_risk):** Results dictionaries are keyed by strings while later activation logic still expects enum keys.
Since `task_keys` uses `node_type.value`, `results` ends up with string keys, but later logic checks membership using `Neo4jNodeType` values. This mismatch means conditions like `key in include and key in results` never pass, so the activation update block is skipped. Use either enums consistently (key `results` by `Neo4jNodeType`) or strings consistently (convert `include` to `node_type.value`).
</issue_to_address>
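The enum-consistent variant of the fan-out would key results by the Neo4jNodeType members themselves. A runnable sketch with a stub in place of the real `search_by_fulltext` (signatures simplified, enum values assumed):

```python
import asyncio
from enum import Enum


class Neo4jNodeType(Enum):
    STATEMENT = "Statement"
    CHUNK = "Chunk"


async def search_by_fulltext(node_type, query, limit):
    # Stub standing in for the real Cypher-backed fulltext search.
    return [{"label": node_type.value, "query": query}][:limit]


async def gather_results(include, query, limit):
    tasks = [search_by_fulltext(t, query, limit) for t in include]
    # Key results by the enum itself so later `key in include and key in results`
    # checks operate on a single key scheme.
    return dict(zip(include, await asyncio.gather(*tasks)))
```

The alternative, keying `task_keys` by `node_type.value`, is equally valid as long as the activation logic converts `include` to strings the same way.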
### Comment 5
<location path="api/app/core/memory/agent/services/search_service.py" line_range="114-115" />
<code_context>
content_parts = []
# Statements: extract statement field
- if 'statement' in result and result['statement']:
- content_parts.append(result['statement'])
+ if Neo4jNodeType.STATEMENT in result and result[Neo4jNodeType.STATEMENT]:
+ content_parts.append(result[Neo4jNodeType.STATEMENT])
</code_context>
<issue_to_address>
**issue (bug_risk):** Accessing result fields with Neo4jNodeType enum keys will fail; underlying result dictionaries still use string field names.
`result` is a plain dict with string keys (e.g. `'statement'`), so using `Neo4jNodeType.STATEMENT` as a key will not match and will either skip the content or raise a `KeyError`. Keep string-based access here, e.g.:
```python
if result.get("statement"):
content_parts.append(result["statement"])
```
</issue_to_address>
### Comment 6
<location path="api/app/core/memory/agent/services/search_service.py" line_range="232" />
<code_context>
reranked_results = answer.get('reranked_results', {})
# Priority order: summaries first (most contextual), then communities, statements, chunks, entities
</code_context>
<issue_to_address>
**issue (bug_risk):** Priority / include handling now mixes enums and string keys, which will cause categories to be skipped.
In `execute_hybrid_search`, `include` is now `List[Neo4jNodeType]`, but `reranked_results` and `answer` are still keyed by strings like `'summaries'` and `'communities'`. As a result, the loop:
```python
for category in priority_order:
if category in include and category in reranked_results:
...
```
will never match, since `category` is an enum and the dict keys are strings. Either standardize everything on strings (including `include` and `priority_order`) or use `category.value` consistently when building and indexing `reranked_results`/`answer`.
</issue_to_address>
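A string-consistent version of the priority walk could convert `include` to keys once and then use `.value` throughout. A sketch (helper name hypothetical; it assumes `reranked_results` has been rebuilt to use the same `.value` strings):

```python
from enum import Enum


class Neo4jNodeType(Enum):
    # Values assumed for illustration.
    MEMORYSUMMARY = "MemorySummary"
    COMMUNITY = "Community"
    STATEMENT = "Statement"


def iter_priority_categories(priority_order, include, reranked_results):
    """Convert enum-based `include` to string keys once, then walk the
    priority order via .value so the two key schemes cannot diverge."""
    include_keys = {node_type.value for node_type in include}
    for category in priority_order:
        key = category.value
        if key in include_keys and key in reranked_results:
            yield key, reranked_results[key]
```

Standardizing everything on enums instead would work just as well; the point is to pick one scheme and apply it to `include`, `priority_order`, and the dict keys together.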
### Comment 7
<location path="api/app/core/memory/src/search.py" line_range="242-244" />
<code_context>
reranked: Dict[str, List[Dict[str, Any]]] = {}
- for category in ["statements", "chunks", "entities", "summaries", "communities"]:
+ for category in [Neo4jNodeType.STATEMENT, Neo4jNodeType.CHUNK, Neo4jNodeType.EXTRACTEDENTITY, Neo4jNodeType.MEMORYSUMMARY, Neo4jNodeType.COMMUNITY]:
keyword_items = keyword_results.get(category, [])
embedding_items = embedding_results.get(category, [])
</code_context>
<issue_to_address>
**issue (bug_risk):** Using Neo4jNodeType as dict keys where the inputs are keyed by strings will break reranking.
`keyword_results` and `embedding_results` are still keyed by string category names (e.g. `'statements'`, `'chunks'`), but the loop now uses `Neo4jNodeType` values. This means:
```python
keyword_items = keyword_results.get(category, [])
embedding_items = embedding_results.get(category, [])
```
will always return `[]`. Either index with `category.value` or keep iterating over string keys and convert to enums earlier in the pipeline.
</issue_to_address>
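If the input dicts stay string-keyed, the smallest fix is indexing them with `category.value`. A sketch, with enum values assumed to match the input keys and the real rerank logic replaced by simple concatenation as a placeholder:

```python
from enum import Enum
from typing import Any, Dict, List


class Neo4jNodeType(Enum):
    # Values assumed for illustration to match the string-keyed inputs.
    STATEMENT = "statements"
    CHUNK = "chunks"


def merge_by_category(
    keyword_results: Dict[str, List[Dict[str, Any]]],
    embedding_results: Dict[str, List[Dict[str, Any]]],
) -> Dict[str, List[Dict[str, Any]]]:
    """Index string-keyed inputs via category.value so the loop
    actually sees the items (rerank replaced by concatenation here)."""
    reranked: Dict[str, List[Dict[str, Any]]] = {}
    for category in [Neo4jNodeType.STATEMENT, Neo4jNodeType.CHUNK]:
        keyword_items = keyword_results.get(category.value, [])
        embedding_items = embedding_results.get(category.value, [])
        reranked[category.value] = keyword_items + embedding_items
    return reranked
```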
### Comment 8
<location path="api/app/core/memory/read_services/content_search.py" line_range="155-157" />
<code_context>
+ logger.warning(f"[MemorySearch] embedding search error: {emb_results}")
+ emb_results = {}
+
+ memories = []
+ for node_type in self.includes:
+ reranked = self._rerank(
+ kw_results.get(node_type, []),
+ emb_results.get(node_type, []),
</code_context>
<issue_to_address>
**issue (bug_risk):** Neo4jSearchService mixes enum keys with string-keyed result dicts when reranking.
In `search`, `self.includes` contains `Neo4jNodeType` enums, but `search_graph` / `search_graph_by_embedding` return dicts keyed by `node_type.value` (strings). That means `kw_results.get(node_type, [])` and `emb_results.get(node_type, [])` will always miss, so nothing is reranked or returned. Use the enum value as the key instead:
```python
for node_type in self.includes:
key = node_type.value
reranked = self._rerank(
kw_results.get(key, []),
emb_results.get(key, []),
limit,
)
```
so the results are actually used.
</issue_to_address>
… client handling
- Consolidate memory search services by removing separate content_search.py and perceptual_search.py
- Update model client handling in base_pipeline.py to use ModelApiKeyService for LLM client initialization
- Add new prompt files and modify existing services to support consolidated search architecture
- Refactor memory read pipeline and related services to use updated model client approach
5777f7b to a01525e
3f05549 to 33bfe88
Summary by Sourcery
Refactor the memory search stack to use unified enums and model clients, centralize Neo4j fulltext and embedding search logic (including perceptual nodes), and introduce a new MemoryService read pipeline with query preprocessing and result builders.
New Features:
Bug Fixes:
…invalid_at and embeddings) to avoid runtime errors.
Enhancements:
Adjust execute_query parameter naming for clarity. …deduplicate_results helper function.
Documentation:
The prompt and desc fields must be strings.