Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions examples/openclaw-plugin/text-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ export function sanitizeUserTextForCapture(text: string): string {
// 格式: "System: [时间] Compacted ... Context ... [时间] 实际内容"
if (COMPACTED_SYSTEM_MSG_RE.test(text)) {
// 提取最后一个 ] 之后的内容(即实际用户输入)
const match = text.match(/\]\s*(.+)$/);
if (match && match[1]) {
return match[1].replace(/\s+/g, " ").trim();
const lastBracketIndex = text.lastIndexOf("]");
if (lastBracketIndex !== -1) {
const content = text.slice(lastBracketIndex + 1);
return content.replace(/\s+/g, " ").trim();
}
return "";
}
Expand Down
72 changes: 5 additions & 67 deletions openviking/session/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,24 +182,6 @@ def __init__(

logger.info(f"Session created: {self.session_id} for user {self.user}")

async def _get_latest_archive_last_msg_time(self) -> Optional[float]:
"""获取上一次归档的最后一条消息的时间戳(毫秒),用于过滤需要提取记忆的消息。"""
# 使用 compression_index - 1 获取上一次归档的索引
if self._compression.compression_index <= 1:
return None
# 获取上一次归档的目录
archive_uri = (
f"{self._session_uri}/history/archive_{self._compression.compression_index - 1:03d}"
)
messages = await self._read_archive_messages(archive_uri)
if messages and messages[-1].created_at:
# 解析 ISO 时间戳为毫秒
from dateutil import parser

dt = parser.parse(messages[-1].created_at)
return dt.timestamp() * 1000
return None

async def load(self):
"""Load session data from storage."""
if self._loaded:
Expand Down Expand Up @@ -468,11 +450,6 @@ async def commit_async(self) -> Dict[str, Any]:
await self._save_meta()

self._compression.original_count += len(messages_to_archive)
# 从最新归档中获取上一次归档的时间戳(用于过滤需要提取记忆的消息)
previous_archive_time = await self._get_latest_archive_last_msg_time()
logger.info(
f"commit_async: previous_archive_time from archive={previous_archive_time}, compression_index={self._compression.compression_index}"
)
logger.info(
f"Archived: {len(messages_to_archive)} messages → "
f"history/archive_{self._compression.compression_index:03d}/"
Expand All @@ -490,38 +467,14 @@ async def commit_async(self) -> Dict[str, Any]:
owner_user_id=self.ctx.user.user_id,
)

# 只传递上一次归档之后的新消息给 memory extraction
# 使用 previous_archive_time 过滤(在设置新时间之前)
messages_for_extraction = messages_to_archive
logger.info(
f"Memory extraction filter: previous_archive_time={previous_archive_time}, "
f"messages_count={len(messages_to_archive)}"
)
if previous_archive_time:
archive_time_str = datetime.fromtimestamp(
previous_archive_time / 1000, timezone.utc
).isoformat()
# 打印前几条消息的 created_at 用于调试
if messages_to_archive:
logger.info(
f"Debug: first msg created_at={messages_to_archive[0].created_at}, "
f"archive_time_str={archive_time_str}"
)
messages_for_extraction = [
m for m in messages_to_archive if m.created_at and m.created_at > archive_time_str
]
logger.info(
f"Memory extraction: {len(messages_to_archive)} total -> {len(messages_for_extraction)} new (after {previous_archive_time})"
)

asyncio.create_task(
self._run_memory_extraction(
task_id=task.task_id,
archive_uri=archive_uri,
messages=messages_for_extraction,
messages=messages_to_archive,
usage_records=usage_snapshot,
first_message_id=messages_for_extraction[0].id if messages_for_extraction else "",
last_message_id=messages_for_extraction[-1].id if messages_for_extraction else "",
first_message_id=messages_to_archive[0].id if messages_to_archive else "",
last_message_id=messages_to_archive[-1].id if messages_to_archive else "",
)
)

Expand Down Expand Up @@ -790,21 +743,6 @@ async def get_session_context(self, token_budget: int = 128_000) -> Dict[str, An
context = await self._collect_session_context_components()
merged_messages = context["messages"]

# 过滤:只返回上一次归档之后的最新消息(从最新归档获取时间戳)
previous_time = await self._get_latest_archive_last_msg_time()
if previous_time:
original_count = len(merged_messages)
archive_time_str = datetime.fromtimestamp(
previous_time / 1000, timezone.utc
).isoformat()
merged_messages = [
m for m in merged_messages if m.created_at and m.created_at > archive_time_str
]
logger.info(
f"[get_session_context] filtered messages: {original_count} -> {len(merged_messages)}, "
f"after archive time={previous_time}"
)

message_tokens = sum(m.estimated_tokens for m in merged_messages)

# 精简日志:只打印关键信息
Expand All @@ -823,7 +761,7 @@ async def get_session_context(self, token_budget: int = 128_000) -> Dict[str, An
if include_latest_overview:
remaining_budget -= latest_archive_tokens

# 不再返回 pre_archive_abstracts(只保留 latest_archive_overview)
# pre_archive_abstracts: 保留字段返回空数组,保持 API 向下兼容
included_pre_archive_abstracts: List[Dict[str, str]] = []
pre_archive_tokens = 0

Expand All @@ -837,7 +775,7 @@ async def get_session_context(self, token_budget: int = 128_000) -> Dict[str, An
"latest_archive_overview": (
latest_archive["overview"] if include_latest_overview else ""
),
# 不再返回 pre_archive_abstracts
"pre_archive_abstracts": [], # 保持 API 向后兼容,返回空数组
"messages": [m.to_dict() for m in merged_messages],
"estimatedTokens": message_tokens + archive_tokens,
"stats": {
Expand Down
Loading