diff --git a/examples/openclaw-plugin/text-utils.ts b/examples/openclaw-plugin/text-utils.ts index 8e11c6214..5a4ec90fe 100644 --- a/examples/openclaw-plugin/text-utils.ts +++ b/examples/openclaw-plugin/text-utils.ts @@ -58,9 +58,10 @@ export function sanitizeUserTextForCapture(text: string): string { // 格式: "System: [时间] Compacted ... Context ... [时间] 实际内容" if (COMPACTED_SYSTEM_MSG_RE.test(text)) { // 提取最后一个 ] 之后的内容(即实际用户输入) - const match = text.match(/\]\s*(.+)$/); - if (match && match[1]) { - return match[1].replace(/\s+/g, " ").trim(); + const lastBracketIndex = text.lastIndexOf("]"); + if (lastBracketIndex !== -1) { + const content = text.slice(lastBracketIndex + 1); + return content.replace(/\s+/g, " ").trim(); } return ""; } diff --git a/openviking/session/session.py b/openviking/session/session.py index 3bcf276b4..b6ec14965 100644 --- a/openviking/session/session.py +++ b/openviking/session/session.py @@ -182,24 +182,6 @@ def __init__( logger.info(f"Session created: {self.session_id} for user {self.user}") - async def _get_latest_archive_last_msg_time(self) -> Optional[float]: - """获取上一次归档的最后一条消息的时间戳(毫秒),用于过滤需要提取记忆的消息。""" - # 使用 compression_index - 1 获取上一次归档的索引 - if self._compression.compression_index <= 1: - return None - # 获取上一次归档的目录 - archive_uri = ( - f"{self._session_uri}/history/archive_{self._compression.compression_index - 1:03d}" - ) - messages = await self._read_archive_messages(archive_uri) - if messages and messages[-1].created_at: - # 解析 ISO 时间戳为毫秒 - from dateutil import parser - - dt = parser.parse(messages[-1].created_at) - return dt.timestamp() * 1000 - return None - async def load(self): """Load session data from storage.""" if self._loaded: @@ -468,11 +450,6 @@ async def commit_async(self) -> Dict[str, Any]: await self._save_meta() self._compression.original_count += len(messages_to_archive) - # 从最新归档中获取上一次归档的时间戳(用于过滤需要提取记忆的消息) - previous_archive_time = await self._get_latest_archive_last_msg_time() - logger.info( - f"commit_async: previous_archive_time from archive={previous_archive_time}, compression_index={self._compression.compression_index}" - ) logger.info( f"Archived: {len(messages_to_archive)} messages → " f"history/archive_{self._compression.compression_index:03d}/" @@ -490,38 +467,14 @@ async def commit_async(self) -> Dict[str, Any]: owner_user_id=self.ctx.user.user_id, ) - # 只传递上一次归档之后的新消息给 memory extraction - # 使用 previous_archive_time 过滤(在设置新时间之前) - messages_for_extraction = messages_to_archive - logger.info( - f"Memory extraction filter: previous_archive_time={previous_archive_time}, " - f"messages_count={len(messages_to_archive)}" - ) - if previous_archive_time: - archive_time_str = datetime.fromtimestamp( - previous_archive_time / 1000, timezone.utc - ).isoformat() - # 打印前几条消息的 created_at 用于调试 - if messages_to_archive: - logger.info( - f"Debug: first msg created_at={messages_to_archive[0].created_at}, " - f"archive_time_str={archive_time_str}" - ) - messages_for_extraction = [ - m for m in messages_to_archive if m.created_at and m.created_at > archive_time_str - ] - logger.info( - f"Memory extraction: {len(messages_to_archive)} total -> {len(messages_for_extraction)} new (after {previous_archive_time})" - ) - asyncio.create_task( self._run_memory_extraction( task_id=task.task_id, archive_uri=archive_uri, - messages=messages_for_extraction, + messages=messages_to_archive, usage_records=usage_snapshot, - first_message_id=messages_for_extraction[0].id if messages_for_extraction else "", - last_message_id=messages_for_extraction[-1].id if messages_for_extraction else "", + first_message_id=messages_to_archive[0].id if messages_to_archive else "", + last_message_id=messages_to_archive[-1].id if messages_to_archive else "", ) ) @@ -790,21 +743,6 @@ async def get_session_context(self, token_budget: int = 128_000) -> Dict[str, An context = await self._collect_session_context_components() merged_messages = context["messages"] - # 过滤:只返回上一次归档之后的最新消息(从最新归档获取时间戳) - previous_time = await self._get_latest_archive_last_msg_time() - if previous_time: - original_count = len(merged_messages) - archive_time_str = datetime.fromtimestamp( - previous_time / 1000, timezone.utc - ).isoformat() - merged_messages = [ - m for m in merged_messages if m.created_at and m.created_at > archive_time_str - ] - logger.info( - f"[get_session_context] filtered messages: {original_count} -> {len(merged_messages)}, " - f"after archive time={previous_time}" - ) - message_tokens = sum(m.estimated_tokens for m in merged_messages) # 精简日志:只打印关键信息 @@ -823,7 +761,7 @@ async def get_session_context(self, token_budget: int = 128_000) -> Dict[str, An if include_latest_overview: remaining_budget -= latest_archive_tokens - # 不再返回 pre_archive_abstracts(只保留 latest_archive_overview) + # pre_archive_abstracts: 保留字段返回空数组,保持 API 向下兼容 included_pre_archive_abstracts: List[Dict[str, str]] = [] pre_archive_tokens = 0 @@ -837,7 +775,7 @@ async def get_session_context(self, token_budget: int = 128_000) -> Dict[str, An "latest_archive_overview": ( latest_archive["overview"] if include_latest_overview else "" ), - # 不再返回 pre_archive_abstracts + "pre_archive_abstracts": [], # 保持 API 向后兼容,返回空数组 "messages": [m.to_dict() for m in merged_messages], "estimatedTokens": message_tokens + archive_tokens, "stats": {