# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: AGPL-3.0
"""Helpers for directory-level semantic summary caches."""

import json
from typing import Dict, List

# File name of the per-directory summary cache stored next to the content.
SUMMARY_CACHE_FILENAME = ".summary_cache.json"

# Hidden files owned by the semantic pipeline. Directory listings treat these
# specially instead of skipping them like other dot-files.
MANAGED_HIDDEN_SEMANTIC_FILES = frozenset(
    {
        ".abstract.md",
        ".overview.md",
        SUMMARY_CACHE_FILENAME,
    }
)


def build_summary_cache(file_summaries: List[Dict[str, str]]) -> Dict[str, str]:
    """Build a filename -> summary map from generated file summaries.

    Entries whose ``name`` is missing or blank (after stripping) are dropped.
    Non-string summaries are coerced with ``str`` so the cache always maps
    ``str -> str``; a missing ``summary`` becomes ``""``.
    """
    cache: Dict[str, str] = {}
    for item in file_summaries:
        name = item.get("name", "").strip()
        if not name:
            continue
        summary = item.get("summary", "")
        cache[name] = summary if isinstance(summary, str) else str(summary)
    return cache


def serialize_summary_cache(file_summaries: List[Dict[str, str]]) -> str:
    """Serialize summary cache content for storage on disk.

    ``sort_keys=True`` keeps the output deterministic across runs;
    ``ensure_ascii=False`` preserves non-ASCII file names and summaries.
    """
    return json.dumps(build_summary_cache(file_summaries), ensure_ascii=False, sort_keys=True)


def parse_summary_cache(content: str) -> Dict[str, str]:
    """Parse a stored summary cache.

    Returns an empty mapping for missing or invalid content: blank/None
    input, malformed JSON, or a JSON payload that is not an object. Keys are
    stripped and blank keys dropped; ``null`` summaries become ``""`` and any
    other non-string summary is coerced with ``str``.
    """
    if not content or not content.strip():
        return {}

    try:
        payload = json.loads(content)
    # json.JSONDecodeError is a subclass of ValueError, so (TypeError,
    # ValueError) already covers malformed JSON and non-str input.
    except (TypeError, ValueError):
        return {}

    if not isinstance(payload, dict):
        return {}

    cache: Dict[str, str] = {}
    for raw_name, raw_summary in payload.items():
        # Defensive: json.loads object keys are always str, but guard anyway.
        if not isinstance(raw_name, str):
            continue
        name = raw_name.strip()
        if not name:
            continue
        if isinstance(raw_summary, str):
            cache[name] = raw_summary
        elif raw_summary is None:
            cache[name] = ""
        else:
            cache[name] = str(raw_summary)
    return cache
{e}" + ) + + if not summary_cache: + overview_path = f"{parent_uri}/.overview.md" + overview_content = await self._viking_fs.read_file( + overview_path, ctx=self._ctx ) - else: - self._overview_cache[parent_uri] = {} + if overview_content: + summary_cache = self._processor._parse_overview_md(overview_content) + + self._overview_cache[parent_uri] = summary_cache else: try: from openviking.metrics.datasources.cache import CacheEventDataSource @@ -579,6 +595,19 @@ async def _overview_task(self, dir_uri: str) -> None: except Exception: logger.info(f"[SemanticDag] {dir_uri} write failed, skipping") + try: + async with node.lock: + file_summaries = self._finalize_file_summaries(node) + await self._viking_fs.write_file( + f"{dir_uri}/{SUMMARY_CACHE_FILENAME}", + serialize_summary_cache(file_summaries), + ctx=self._ctx, + ) + except Exception as e: + logger.info( + f"[SemanticDag] Failed to write {SUMMARY_CACHE_FILENAME} for {dir_uri}: {e}" + ) + try: if need_vectorize: task = VectorizeTask( diff --git a/openviking/storage/queuefs/semantic_processor.py b/openviking/storage/queuefs/semantic_processor.py index 32031b536..9989e565e 100644 --- a/openviking/storage/queuefs/semantic_processor.py +++ b/openviking/storage/queuefs/semantic_processor.py @@ -28,6 +28,12 @@ from openviking.prompts import render_prompt from openviking.server.identity import RequestContext, Role from openviking.storage.queuefs.named_queue import DequeueHandlerBase +from openviking.storage.queuefs.semantic_cache import ( + MANAGED_HIDDEN_SEMANTIC_FILES, + SUMMARY_CACHE_FILENAME, + parse_summary_cache, + serialize_summary_cache, +) from openviking.storage.queuefs.semantic_dag import DagStats, SemanticDagExecutor from openviking.storage.queuefs.semantic_msg import SemanticMsg from openviking.storage.viking_fs import get_viking_fs @@ -218,6 +224,24 @@ async def _check_file_content_changed( except Exception: return True + async def _read_directory_summary_cache( + self, dir_uri: str, ctx: 
Optional[RequestContext] = None + ) -> Dict[str, str]: + """Read cached file summaries for a directory.""" + viking_fs = get_viking_fs() + try: + cache_content = await viking_fs.read_file( + f"{dir_uri}/{SUMMARY_CACHE_FILENAME}", ctx=ctx + ) + except Exception as e: + logger.debug(f"No summary cache found for {dir_uri}: {e}") + return {} + + cache = parse_summary_cache(cache_content) + if cache: + logger.info(f"Loaded {len(cache)} cached summaries from {SUMMARY_CACHE_FILENAME}") + return cache + async def _reenqueue_semantic_msg(self, msg: SemanticMsg) -> None: """Re-enqueue a semantic message for later processing. @@ -475,15 +499,17 @@ def _mark_failed(message: str) -> None: existing_summaries: Dict[str, str] = {} if msg.changes: - try: - old_overview = await viking_fs.read_file(f"{dir_uri}/.overview.md", ctx=ctx) - if old_overview: - existing_summaries = self._parse_overview_md(old_overview) - logger.info( - f"Parsed {len(existing_summaries)} existing summaries from overview.md" - ) - except Exception as e: - logger.debug(f"No existing overview.md found for {dir_uri}: {e}") + existing_summaries = await self._read_directory_summary_cache(dir_uri, ctx=ctx) + if not existing_summaries: + try: + old_overview = await viking_fs.read_file(f"{dir_uri}/.overview.md", ctx=ctx) + if old_overview: + existing_summaries = self._parse_overview_md(old_overview) + logger.info( + f"Parsed {len(existing_summaries)} existing summaries from overview.md" + ) + except Exception as e: + logger.debug(f"No existing overview.md found for {dir_uri}: {e}") changed_files: Set[str] = set() if msg.changes: @@ -573,6 +599,15 @@ async def _gen(idx: int, file_path: str) -> None: await self._release_memory_lifecycle_lock(msg.lifecycle_lock_handle_id) return + try: + await viking_fs.write_file( + f"{dir_uri}/{SUMMARY_CACHE_FILENAME}", + serialize_summary_cache(file_summaries), + ctx=ctx, + ) + except Exception as e: + logger.warning(f"Failed to write {SUMMARY_CACHE_FILENAME} for {dir_uri}: {e}") + 
@pytest.mark.asyncio
async def test_incremental_uses_summary_cache_when_overview_titles_are_descriptive(monkeypatch):
    """Incremental runs should trust .summary_cache.json and skip re-summarizing files."""
    _mock_transaction_layer(monkeypatch)

    root_uri = "viking://resources/root"
    target_uri = "viking://resources/target"
    directory_tree = {
        root_uri: [{"name": "a.txt", "isDir": False}],
        target_uri: [{"name": "a.txt", "isDir": False}],
    }
    contents = {
        f"{root_uri}/a.txt": "hello",
        f"{target_uri}/a.txt": "hello",
        f"{target_uri}/.overview.md": "# root\n\n## Detailed Description\n### Session Context Management\nCached summary",
        f"{target_uri}/.abstract.md": "old-abstract",
        f"{target_uri}/.summary_cache.json": json.dumps({"a.txt": "Cached summary"}),
    }
    fake_fs = _FakeVikingFS(tree=directory_tree, file_contents=contents)
    monkeypatch.setattr(
        "openviking.storage.queuefs.semantic_dag.get_viking_fs", lambda: fake_fs
    )

    processor = _FakeProcessor(fake_fs)
    ctx = RequestContext(user=UserIdentifier("acc1", "user1", "agent1"), role=Role.USER)
    executor = SemanticDagExecutor(
        processor=processor,
        context_type="resource",
        max_concurrent_llm=2,
        ctx=ctx,
        incremental_update=True,
        target_uri=target_uri,
    )
    monkeypatch.setattr(executor, "_add_vectorize_task", AsyncMock())

    await executor.run(root_uri)

    # Cached summaries were reused, so no file was re-summarized.
    assert processor.summarized_files == []
@pytest.mark.asyncio
async def test_process_memory_directory_reuses_summary_cache(monkeypatch):
    """A warm .summary_cache.json should short-circuit per-file summarization."""
    fake_fs = _FakeVikingFS()
    monkeypatch.setattr(
        "openviking.storage.queuefs.semantic_processor.get_viking_fs",
        lambda: fake_fs,
    )

    processor = SemanticProcessor(max_concurrent_llm=1)
    generate_summary = AsyncMock()
    generate_overview = AsyncMock(return_value="# preferences\n\nCached overview")
    vectorize_directory = AsyncMock()
    monkeypatch.setattr(processor, "_generate_single_file_summary", generate_summary)
    monkeypatch.setattr(processor, "_generate_overview", generate_overview)
    monkeypatch.setattr(processor, "_vectorize_directory", vectorize_directory)
    monkeypatch.setattr(
        processor, "_enforce_size_limits", lambda overview, abstract: (overview, abstract)
    )

    msg = SemanticMsg(
        uri="viking://user/default/memories/preferences",
        context_type="memory",
        recursive=False,
        changes={"added": [], "modified": [], "deleted": []},
    )

    await processor._process_memory_directory(msg)

    # No per-file LLM calls; the overview is built from the cached summaries.
    generate_summary.assert_not_called()
    generate_overview.assert_awaited_once()
    cached_summaries = generate_overview.await_args.args[1]
    assert cached_summaries == [{"name": "a.txt", "summary": "Cached summary"}]
    # The refreshed cache is persisted back to disk.
    assert any(path.endswith("/.summary_cache.json") for path, _ in fake_fs.writes)