From 7e36842b88e5e3916b4f4686f091beb94afcd07b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=B5=B5=E7=A6=B9?= Date: Thu, 7 May 2026 14:44:59 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E8=AE=B0=E5=BF=86=E8=AF=BB?= =?UTF-8?q?=E5=86=99=E6=B5=81=E7=A8=8B=E4=B8=AD=E7=9A=84=E5=A4=9A=E4=B8=AA?= =?UTF-8?q?=E7=BC=BA=E9=99=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 修复空QA批次导致跨批次连续性引用过期的问题 (updater.py) - 清理search_sessions中永久为空的关键词相似度死代码 (mid_term.py) - 修复热度触发仅处理单个session的问题,改为循环处理所有达到阈值的session (memoryos.py) - 修复热session无未分析页面时持续占据堆顶不释放的问题 (memoryos.py) - 合并多次save()调用为循环结束后单次保存,减少冗余I/O (memoryos.py) - 修正System Prompt中将conversation metadata误标为User's profile的命名混淆 (prompts.py) Co-Authored-By: Claude Opus 4.6 --- memoryos-pypi/memoryos.py | 192 ++++++++++++++++++++------------------ memoryos-pypi/mid_term.py | 29 ++---- memoryos-pypi/prompts.py | 2 +- memoryos-pypi/updater.py | 4 +- 4 files changed, 114 insertions(+), 113 deletions(-) diff --git a/memoryos-pypi/memoryos.py b/memoryos-pypi/memoryos.py index 24ebc4c..94810a2 100644 --- a/memoryos-pypi/memoryos.py +++ b/memoryos-pypi/memoryos.py @@ -125,103 +125,117 @@ def __init__(self, user_id: str, def _trigger_profile_and_knowledge_update_if_needed(self): """ - Checks mid-term memory for hot segments and triggers profile/knowledge update if threshold is met. - Adapted from main_memoybank.py's update_user_profile_from_top_segment. - Enhanced with parallel LLM processing for better performance. + Checks mid-term memory for hot segments and triggers profile/knowledge + update if threshold is met. Processes ALL sessions above the threshold + (not just the single hottest), so that a burst of activity on multiple + topics does not leave any session indefinitely waiting for analysis. """ if not self.mid_term_memory.heap: return - # Peek at the top of the heap (hottest segment) - # MidTermMemory heap stores (-H_segment, sid) - neg_heat, sid = self.mid_term_memory.heap[0] - current_heat = -neg_heat + processed_any = False + + # Loop while the hottest session is above threshold, so that every + # session that has crossed the line gets analysed in one pass. + while self.mid_term_memory.heap: + neg_heat, sid = self.mid_term_memory.heap[0] + current_heat = -neg_heat + + if current_heat < self.mid_term_heat_threshold: + break # nothing else is hot enough - if current_heat >= self.mid_term_heat_threshold: session = self.mid_term_memory.sessions.get(sid) if not session: - self.mid_term_memory.rebuild_heap() # Clean up if session is gone - return - - # Get unanalyzed pages from this hot session - # A page is a dict: {"user_input": ..., "agent_response": ..., "timestamp": ..., "analyzed": False, ...} - unanalyzed_pages = [p for p in session.get("details", []) if not p.get("analyzed", False)] - - if unanalyzed_pages: - print(f"Memoryos: Mid-term session {sid} heat ({current_heat:.2f}) exceeded threshold. Analyzing {len(unanalyzed_pages)} pages for profile/knowledge update.") - - # 并行执行两个LLM任务:用户画像分析(已包含更新)、知识提取 - def task_user_profile_analysis(): - print("Memoryos: Starting parallel user profile analysis and update...") - # 获取现有用户画像 - existing_profile = self.user_long_term_memory.get_raw_user_profile(self.user_id) - if not existing_profile or existing_profile.lower() == "none": - existing_profile = "No existing profile data." - - # 直接输出更新后的完整画像 - return gpt_user_profile_analysis(unanalyzed_pages, self.client, model=self.llm_model, existing_user_profile=existing_profile) - - def task_knowledge_extraction(): - print("Memoryos: Starting parallel knowledge extraction...") - return gpt_knowledge_extraction(unanalyzed_pages, self.client, model=self.llm_model) - - # 使用并行任务执行 - with ThreadPoolExecutor(max_workers=2) as executor: - # 提交两个主要任务 - future_profile = executor.submit(task_user_profile_analysis) - future_knowledge = executor.submit(task_knowledge_extraction) - - # 等待结果 - try: - updated_user_profile = future_profile.result() # 直接是更新后的完整画像 - knowledge_result = future_knowledge.result() - except Exception as e: - print(f"Error in parallel LLM processing: {e}") - return - - new_user_private_knowledge = knowledge_result.get("private") - new_assistant_knowledge = knowledge_result.get("assistant_knowledge") - - # 直接使用更新后的完整用户画像 - if (updated_user_profile and - updated_user_profile.lower() != "none" and - len(updated_user_profile.strip()) >= 30): - print("Memoryos: Updating user profile with integrated analysis...") - self.user_long_term_memory.update_user_profile(self.user_id, updated_user_profile, merge=False) # 直接替换为新的完整画像 - else: - print("Memoryos: Skipping user profile update due to insufficient content.") - - # Add User Private Knowledge to user's LTM - if new_user_private_knowledge and new_user_private_knowledge.lower() != "none": - for line in new_user_private_knowledge.split('\n'): - if line.strip() and line.strip().lower() not in ["none", "- none", "- none."]: - self.user_long_term_memory.add_user_knowledge(line.strip()) - - # Add Assistant Knowledge to assistant's LTM - if new_assistant_knowledge and new_assistant_knowledge.lower() != "none": - for line in new_assistant_knowledge.split('\n'): - if line.strip() and line.strip().lower() not in ["none", "- none", "- none."]: - self.assistant_long_term_memory.add_assistant_knowledge(line.strip()) # Save to dedicated assistant LTM - - # Mark pages as analyzed and reset session heat contributors - for p in session["details"]: - p["analyzed"] = True # Mark all pages in session, or just unanalyzed_pages? - # Original code marked all pages in session - - session["N_visit"] = 0 # Reset visits after analysis - session["L_interaction"] = 0 # Reset interaction length contribution - # session["R_recency"] = 1.0 # Recency will re-calculate naturally - session["H_segment"] = compute_segment_heat(session) # Recompute heat with reset factors - session["last_visit_time"] = get_timestamp() # Update last visit time - - self.mid_term_memory.rebuild_heap() # Heap needs rebuild due to H_segment change - self.mid_term_memory.save() - print(f"Memoryos: Profile/Knowledge update for session {sid} complete. Heat reset.") + self.mid_term_memory.rebuild_heap() + continue # stale heap entry – rebuild and re-check + + unanalyzed_pages = [p for p in session.get("details", []) + if not p.get("analyzed", False)] + + if not unanalyzed_pages: + # Session is hot but everything was already analysed. + # Reset its heat contributors so it does not keep occupying + # the top of the heap on every subsequent call. + print(f"Memoryos: Hot session {sid} has no unanalyzed pages. " + f"Resetting heat to allow other sessions to surface.") + session["N_visit"] = 0 + session["H_segment"] = compute_segment_heat(session) + self.mid_term_memory.rebuild_heap() + processed_any = True + continue + + print(f"Memoryos: Mid-term session {sid} heat ({current_heat:.2f}) " + f"exceeded threshold. Analyzing {len(unanalyzed_pages)} pages " + f"for profile/knowledge update.") + + # 并行执行两个LLM任务:用户画像分析(已包含更新)、知识提取 + def task_user_profile_analysis(): + print("Memoryos: Starting parallel user profile analysis and update...") + existing_profile = self.user_long_term_memory.get_raw_user_profile(self.user_id) + if not existing_profile or existing_profile.lower() == "none": + existing_profile = "No existing profile data." + return gpt_user_profile_analysis(unanalyzed_pages, self.client, + model=self.llm_model, + existing_user_profile=existing_profile) + + def task_knowledge_extraction(): + print("Memoryos: Starting parallel knowledge extraction...") + return gpt_knowledge_extraction(unanalyzed_pages, self.client, + model=self.llm_model) + + with ThreadPoolExecutor(max_workers=2) as executor: + future_profile = executor.submit(task_user_profile_analysis) + future_knowledge = executor.submit(task_knowledge_extraction) + + try: + updated_user_profile = future_profile.result() + knowledge_result = future_knowledge.result() + except Exception as e: + print(f"Error in parallel LLM processing: {e}") + break # do not process further sessions on error + + new_user_private_knowledge = knowledge_result.get("private") + new_assistant_knowledge = knowledge_result.get("assistant_knowledge") + + # 直接使用更新后的完整用户画像 + if (updated_user_profile + and updated_user_profile.lower() != "none" + and len(updated_user_profile.strip()) >= 30): + print("Memoryos: Updating user profile with integrated analysis...") + self.user_long_term_memory.update_user_profile( + self.user_id, updated_user_profile, merge=False) else: - print(f"Memoryos: Hot session {sid} has no unanalyzed pages. Skipping profile update.") - else: - # print(f"Memoryos: Top session {sid} heat ({current_heat:.2f}) below threshold. No profile update.") - pass # No action if below threshold + print("Memoryos: Skipping user profile update due to " + "insufficient content.") + + # Add User Private Knowledge to user's LTM + if new_user_private_knowledge and new_user_private_knowledge.lower() != "none": + for line in new_user_private_knowledge.split('\n'): + if line.strip() and line.strip().lower() not in ["none", "- none", "- none."]: + self.user_long_term_memory.add_user_knowledge(line.strip()) + + # Add Assistant Knowledge to assistant's LTM + if new_assistant_knowledge and new_assistant_knowledge.lower() != "none": + for line in new_assistant_knowledge.split('\n'): + if line.strip() and line.strip().lower() not in ["none", "- none", "- none."]: + self.assistant_long_term_memory.add_assistant_knowledge(line.strip()) + + # Mark pages as analyzed and reset session heat contributors + for p in session["details"]: + p["analyzed"] = True + + session["N_visit"] = 0 + session["L_interaction"] = 0 + session["H_segment"] = compute_segment_heat(session) + session["last_visit_time"] = get_timestamp() + + self.mid_term_memory.rebuild_heap() + processed_any = True + print(f"Memoryos: Profile/Knowledge update for session {sid} " + f"complete. Heat reset.") + + if processed_any: + self.mid_term_memory.save() def add_memory(self, user_input: str, agent_response: str, timestamp: str = None, meta_data: dict = None): """ diff --git a/memoryos-pypi/mid_term.py b/memoryos-pypi/mid_term.py index e686a19..a63512a 100644 --- a/memoryos-pypi/mid_term.py +++ b/memoryos-pypi/mid_term.py @@ -278,8 +278,8 @@ def insert_pages_into_session(self, summary_for_new_pages, keywords_for_new_page print(f"MidTermMemory: No suitable session to merge (best score {best_overall_score:.2f} < threshold {similarity_threshold}). Creating new session.") return self.add_session(summary_for_new_pages, pages_to_insert, keywords_for_new_pages) - def search_sessions(self, query_text, segment_similarity_threshold=0.1, page_similarity_threshold=0.1, - top_k_sessions=5, keyword_alpha=1.0, recency_tau_search=3600): + def search_sessions(self, query_text, segment_similarity_threshold=0.1, page_similarity_threshold=0.1, + top_k_sessions=5): if not self.sessions: return [] @@ -289,9 +289,7 @@ def search_sessions(self, query_text, segment_similarity_threshold=0.1, page_sim **self.embedding_model_kwargs ) query_vec = normalize_vector(query_vec) - query_keywords = set() # Keywords extraction removed, relying on semantic similarity - candidate_sessions = [] session_ids = list(self.sessions.keys()) if not session_ids: return [] @@ -301,7 +299,7 @@ def search_sessions(self, query_text, segment_similarity_threshold=0.1, page_sim dim = summary_embeddings_np.shape[1] index = faiss.IndexFlatIP(dim) # Inner product for similarity index.add(summary_embeddings_np) - + query_arr_np = np.array([query_vec], dtype=np.float32) distances, indices = index.search(query_arr_np, min(top_k_sessions, len(session_ids))) @@ -310,33 +308,20 @@ def search_sessions(self, query_text, segment_similarity_threshold=0.1, page_sim for i, idx in enumerate(indices[0]): if idx == -1: continue - + session_id = session_ids[idx] session = self.sessions[session_id] semantic_sim_score = float(distances[0][i]) # This is the dot product - # Keyword similarity for session summary - session_keywords = set(session.get("summary_keywords", [])) - s_topic_keywords = 0 - if query_keywords and session_keywords: - intersection = len(query_keywords.intersection(session_keywords)) - union = len(query_keywords.union(session_keywords)) - if union > 0: s_topic_keywords = intersection / union - - # Time decay for session recency in search scoring - # time_decay_factor = compute_time_decay(session["timestamp"], current_time_str, tau_hours=recency_tau_search) - - # Combined score for session relevance - session_relevance_score = (semantic_sim_score + keyword_alpha * s_topic_keywords) + # Session relevance is based purely on semantic similarity (dot product of + # normalized summary embeddings = cosine similarity). + session_relevance_score = semantic_sim_score if session_relevance_score >= segment_similarity_threshold: matched_pages_in_session = [] for page in session.get("details", []): page_embedding = np.array(page["page_embedding"], dtype=np.float32) - # page_keywords = set(page.get("page_keywords", [])) - page_sim_score = float(np.dot(page_embedding, query_vec)) - # Can also add keyword sim for pages if needed, but keeping it simpler for now if page_sim_score >= page_similarity_threshold: matched_pages_in_session.append({"page_data": page, "score": page_sim_score}) diff --git a/memoryos-pypi/prompts.py b/memoryos-pypi/prompts.py index 48fbbd6..650228a 100644 --- a/memoryos-pypi/prompts.py +++ b/memoryos-pypi/prompts.py @@ -6,7 +6,7 @@ GENERATE_SYSTEM_RESPONSE_SYSTEM_PROMPT = ( "As a communication expert with outstanding communication habits, you embody the role of {relationship} throughout the following dialogues.\n" "Here are some of your distinctive personal traits and knowledge:\n{assistant_knowledge_text}\n" - "User's profile:\n" + "Current conversation metadata:\n" "{meta_data_text}\n" "Your task is to generate responses that align with these traits and maintain the tone.\n" ) diff --git a/memoryos-pypi/updater.py b/memoryos-pypi/updater.py index 831fe8a..1045263 100644 --- a/memoryos-pypi/updater.py +++ b/memoryos-pypi/updater.py @@ -105,7 +105,9 @@ def process_short_term_to_mid_term(self): evicted_qas.append(qa) if not evicted_qas: - print("Updater: No QAs evicted from short-term memory.") + print("Updater: No valid QAs evicted from short-term memory. " + "Resetting cross-batch continuity tracker to prevent stale linking.") + self.last_evicted_page_for_continuity = None return print(f"Updater: Processing {len(evicted_qas)} QAs from short-term to mid-term.")