Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions openviking/storage/queuefs/semantic_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: AGPL-3.0
"""Helpers for directory-level semantic summary caches."""

import json
from typing import Dict, List

SUMMARY_CACHE_FILENAME = ".summary_cache.json"
MANAGED_HIDDEN_SEMANTIC_FILES = frozenset(
{
".abstract.md",
".overview.md",
SUMMARY_CACHE_FILENAME,
}
)


def build_summary_cache(file_summaries: List[Dict[str, str]]) -> Dict[str, str]:
    """Build a filename -> summary map from generated file summaries.

    Tolerance rules mirror ``parse_summary_cache``: entries whose ``name``
    is missing, not a string, or blank after stripping are skipped, and a
    non-string ``summary`` is coerced with ``str``.

    Args:
        file_summaries: Items shaped like ``{"name": ..., "summary": ...}``.

    Returns:
        Mapping of stripped file name to summary text.
    """
    cache: Dict[str, str] = {}
    for item in file_summaries:
        raw_name = item.get("name", "")
        # Skip non-string names instead of crashing: the previous code
        # called .strip() directly, so e.g. {"name": None} raised
        # AttributeError.
        if not isinstance(raw_name, str):
            continue
        name = raw_name.strip()
        if not name:
            continue
        summary = item.get("summary", "")
        cache[name] = summary if isinstance(summary, str) else str(summary)
    return cache


def serialize_summary_cache(file_summaries: List[Dict[str, str]]) -> str:
    """Serialize summary cache content for storage on disk.

    Keys are sorted and non-ASCII characters are emitted verbatim so the
    on-disk representation is deterministic across runs.
    """
    cache = build_summary_cache(file_summaries)
    return json.dumps(cache, ensure_ascii=False, sort_keys=True)


def parse_summary_cache(content: str) -> Dict[str, str]:
    """Parse a stored summary cache.

    Args:
        content: Raw text of the ``.summary_cache.json`` file (may be
            ``None``, empty, or malformed).

    Returns:
        Mapping of stripped file name to summary text; an empty mapping
        for missing or invalid content. ``None`` summary values become
        ``""`` and other non-string values are coerced with ``str``.
    """
    if not content or not content.strip():
        return {}

    try:
        payload = json.loads(content)
    # json.JSONDecodeError subclasses ValueError, so listing it separately
    # was redundant; TypeError covers non-string input such as bytes-like
    # oddities.
    except (TypeError, ValueError):
        return {}

    if not isinstance(payload, dict):
        return {}

    cache: Dict[str, str] = {}
    for raw_name, raw_summary in payload.items():
        # Defensive: JSON object keys are always strings, but guard anyway
        # in case the payload came from a different producer.
        if not isinstance(raw_name, str):
            continue
        name = raw_name.strip()
        if not name:
            continue
        if isinstance(raw_summary, str):
            cache[name] = raw_summary
        elif raw_summary is None:
            cache[name] = ""
        else:
            cache[name] = str(raw_summary)
    return cache
47 changes: 38 additions & 9 deletions openviking/storage/queuefs/semantic_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
from typing import Awaitable, Callable, Dict, List, Optional

from openviking.server.identity import RequestContext
from openviking.storage.queuefs.semantic_cache import (
SUMMARY_CACHE_FILENAME,
parse_summary_cache,
serialize_summary_cache,
)
from openviking.storage.viking_fs import get_viking_fs
from openviking.telemetry.request_wait_tracker import get_request_wait_tracker
from openviking_cli.utils import VikingURI
Expand Down Expand Up @@ -360,16 +365,27 @@ async def _read_existing_summary(self, file_path: str) -> Optional[Dict[str, str
pass
async with self._overview_cache_lock:
if parent_uri not in self._overview_cache:
overview_path = f"{parent_uri}/.overview.md"
overview_content = await self._viking_fs.read_file(
overview_path, ctx=self._ctx
)
if overview_content:
self._overview_cache[parent_uri] = self._processor._parse_overview_md(
overview_content
summary_cache: Dict[str, str] = {}
try:
cache_content = await self._viking_fs.read_file(
f"{parent_uri}/{SUMMARY_CACHE_FILENAME}",
ctx=self._ctx,
)
summary_cache = parse_summary_cache(cache_content)
except Exception as e:
logger.debug(
f"Failed to read {SUMMARY_CACHE_FILENAME} for {parent_uri}: {e}"
)

if not summary_cache:
overview_path = f"{parent_uri}/.overview.md"
overview_content = await self._viking_fs.read_file(
overview_path, ctx=self._ctx
)
else:
self._overview_cache[parent_uri] = {}
if overview_content:
summary_cache = self._processor._parse_overview_md(overview_content)

self._overview_cache[parent_uri] = summary_cache
else:
try:
from openviking.metrics.datasources.cache import CacheEventDataSource
Expand Down Expand Up @@ -579,6 +595,19 @@ async def _overview_task(self, dir_uri: str) -> None:
except Exception:
logger.info(f"[SemanticDag] {dir_uri} write failed, skipping")

try:
async with node.lock:
file_summaries = self._finalize_file_summaries(node)
await self._viking_fs.write_file(
f"{dir_uri}/{SUMMARY_CACHE_FILENAME}",
serialize_summary_cache(file_summaries),
ctx=self._ctx,
)
except Exception as e:
logger.info(
f"[SemanticDag] Failed to write {SUMMARY_CACHE_FILENAME} for {dir_uri}: {e}"
)

try:
if need_vectorize:
task = VectorizeTask(
Expand Down
55 changes: 45 additions & 10 deletions openviking/storage/queuefs/semantic_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
from openviking.prompts import render_prompt
from openviking.server.identity import RequestContext, Role
from openviking.storage.queuefs.named_queue import DequeueHandlerBase
from openviking.storage.queuefs.semantic_cache import (
MANAGED_HIDDEN_SEMANTIC_FILES,
SUMMARY_CACHE_FILENAME,
parse_summary_cache,
serialize_summary_cache,
)
from openviking.storage.queuefs.semantic_dag import DagStats, SemanticDagExecutor
from openviking.storage.queuefs.semantic_msg import SemanticMsg
from openviking.storage.viking_fs import get_viking_fs
Expand Down Expand Up @@ -218,6 +224,24 @@ async def _check_file_content_changed(
except Exception:
return True

async def _read_directory_summary_cache(
    self, dir_uri: str, ctx: Optional[RequestContext] = None
) -> Dict[str, str]:
    """Read cached file summaries for a directory.

    Reads ``SUMMARY_CACHE_FILENAME`` from *dir_uri* via the Viking FS and
    parses it with ``parse_summary_cache``.

    Args:
        dir_uri: Directory URI whose summary cache should be loaded.
        ctx: Optional request context forwarded to the filesystem read.

    Returns:
        Filename -> summary mapping; empty when the cache file is absent,
        unreadable, or malformed.
    """
    viking_fs = get_viking_fs()
    try:
        cache_content = await viking_fs.read_file(
            f"{dir_uri}/{SUMMARY_CACHE_FILENAME}", ctx=ctx
        )
    # Best-effort read: any failure (missing file, permission, transport)
    # is treated as "no cache" rather than an error.
    except Exception as e:
        logger.debug(f"No summary cache found for {dir_uri}: {e}")
        return {}

    cache = parse_summary_cache(cache_content)
    if cache:
        logger.info(f"Loaded {len(cache)} cached summaries from {SUMMARY_CACHE_FILENAME}")
    return cache

async def _reenqueue_semantic_msg(self, msg: SemanticMsg) -> None:
"""Re-enqueue a semantic message for later processing.

Expand Down Expand Up @@ -475,15 +499,17 @@ def _mark_failed(message: str) -> None:
existing_summaries: Dict[str, str] = {}

if msg.changes:
try:
old_overview = await viking_fs.read_file(f"{dir_uri}/.overview.md", ctx=ctx)
if old_overview:
existing_summaries = self._parse_overview_md(old_overview)
logger.info(
f"Parsed {len(existing_summaries)} existing summaries from overview.md"
)
except Exception as e:
logger.debug(f"No existing overview.md found for {dir_uri}: {e}")
existing_summaries = await self._read_directory_summary_cache(dir_uri, ctx=ctx)
if not existing_summaries:
try:
old_overview = await viking_fs.read_file(f"{dir_uri}/.overview.md", ctx=ctx)
if old_overview:
existing_summaries = self._parse_overview_md(old_overview)
logger.info(
f"Parsed {len(existing_summaries)} existing summaries from overview.md"
)
except Exception as e:
logger.debug(f"No existing overview.md found for {dir_uri}: {e}")

changed_files: Set[str] = set()
if msg.changes:
Expand Down Expand Up @@ -573,6 +599,15 @@ async def _gen(idx: int, file_path: str) -> None:
await self._release_memory_lifecycle_lock(msg.lifecycle_lock_handle_id)
return

try:
await viking_fs.write_file(
f"{dir_uri}/{SUMMARY_CACHE_FILENAME}",
serialize_summary_cache(file_summaries),
ctx=ctx,
)
except Exception as e:
logger.warning(f"Failed to write {SUMMARY_CACHE_FILENAME} for {dir_uri}: {e}")

try:
if msg.telemetry_id and msg.id:
from openviking.storage.queuefs.embedding_tracker import EmbeddingTaskTracker
Expand Down Expand Up @@ -640,7 +675,7 @@ async def list_children(dir_uri: str) -> Tuple[Dict[str, str], Dict[str, str]]:
name = entry.get("name", "")
if not name or name in [".", ".."]:
continue
if name.startswith(".") and name not in [".abstract.md", ".overview.md"]:
if name.startswith(".") and name not in MANAGED_HIDDEN_SEMANTIC_FILES:
continue
item_uri = VikingURI(dir_uri).join(name).uri
if entry.get("isDir", False):
Expand Down
42 changes: 41 additions & 1 deletion tests/storage/test_semantic_dag_incremental_missing_summary.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: AGPL-3.0

import json
import re
from unittest.mock import AsyncMock, MagicMock

Expand Down Expand Up @@ -163,6 +164,45 @@ async def test_incremental_missing_summary_triggers_overview_regen(monkeypatch):
assert len(processor.summarized_files) == first_run_calls


@pytest.mark.asyncio
async def test_incremental_uses_summary_cache_when_overview_titles_are_descriptive(monkeypatch):
    """Incremental run should reuse .summary_cache.json and re-summarize nothing.

    The target directory has an overview whose headings are descriptive
    titles (not file names) plus a summary cache keyed by file name; the
    executor should prefer the cache, so no file summaries are regenerated.
    """
    _mock_transaction_layer(monkeypatch)

    root_uri = "viking://resources/root"
    target_uri = "viking://resources/target"
    # Both root and target contain the same single file.
    tree = {
        root_uri: [{"name": "a.txt", "isDir": False}],
        target_uri: [{"name": "a.txt", "isDir": False}],
    }

    fake_fs = _FakeVikingFS(
        tree=tree,
        file_contents={
            f"{root_uri}/a.txt": "hello",
            f"{target_uri}/a.txt": "hello",
            # Overview headings are descriptive titles, so parsing it would
            # not map back to "a.txt" — the summary cache must be used.
            f"{target_uri}/.overview.md": "# root\n\n## Detailed Description\n### Session Context Management\nCached summary",
            f"{target_uri}/.abstract.md": "old-abstract",
            f"{target_uri}/.summary_cache.json": json.dumps({"a.txt": "Cached summary"}),
        },
    )
    monkeypatch.setattr("openviking.storage.queuefs.semantic_dag.get_viking_fs", lambda: fake_fs)

    processor = _FakeProcessor(fake_fs)
    ctx = RequestContext(user=UserIdentifier("acc1", "user1", "agent1"), role=Role.USER)

    executor = SemanticDagExecutor(
        processor=processor,
        context_type="resource",
        max_concurrent_llm=2,
        ctx=ctx,
        incremental_update=True,
        target_uri=target_uri,
    )
    # Vectorization is out of scope for this test.
    monkeypatch.setattr(executor, "_add_vectorize_task", AsyncMock())
    await executor.run(root_uri)

    # Cached summary was reused, so no file was sent for summarization.
    assert processor.summarized_files == []


if __name__ == "__main__":
pytest.main([__file__])

76 changes: 76 additions & 0 deletions tests/storage/test_semantic_processor_summary_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: AGPL-3.0

import json
from unittest.mock import AsyncMock

import pytest

from openviking.storage.queuefs.semantic_msg import SemanticMsg
from openviking.storage.queuefs.semantic_processor import SemanticProcessor


class _FakeVikingFS:
def __init__(self):
self.entries = {
"viking://user/default/memories/preferences": [
{"name": "a.txt", "isDir": False},
],
}
self.files = {
"viking://user/default/memories/preferences/.overview.md": "# preferences\n\n## Detailed Description\n### Session Context Management\nCached summary",
"viking://user/default/memories/preferences/.summary_cache.json": json.dumps(
{"a.txt": "Cached summary"}
),
"viking://user/default/memories/preferences/a.txt": "hello",
}
self.writes = []

async def ls(self, uri, ctx=None):
return self.entries.get(uri, [])

async def read_file(self, path, ctx=None):
return self.files.get(path, "")

async def write_file(self, path, content, ctx=None):
self.files[path] = content
self.writes.append((path, content))

async def stat(self, path, ctx=None):
content = self.files.get(path, "")
return {"size": len(content)}


@pytest.mark.asyncio
async def test_process_memory_directory_reuses_summary_cache(monkeypatch):
    """Processing a memory directory with a valid summary cache skips per-file LLM calls.

    The fake FS already holds ``.summary_cache.json`` for ``a.txt``, so
    ``_generate_single_file_summary`` must never run; the cached summary is
    passed straight to ``_generate_overview`` and the cache is re-written.
    """
    fake_fs = _FakeVikingFS()
    monkeypatch.setattr(
        "openviking.storage.queuefs.semantic_processor.get_viking_fs",
        lambda: fake_fs,
    )

    processor = SemanticProcessor(max_concurrent_llm=1)
    # Stub out all LLM/vectorization side effects.
    generate_summary = AsyncMock()
    generate_overview = AsyncMock(return_value="# preferences\n\nCached overview")
    vectorize_directory = AsyncMock()
    monkeypatch.setattr(processor, "_generate_single_file_summary", generate_summary)
    monkeypatch.setattr(processor, "_generate_overview", generate_overview)
    monkeypatch.setattr(processor, "_vectorize_directory", vectorize_directory)
    # Size limiting is irrelevant here; pass content through unchanged.
    monkeypatch.setattr(
        processor, "_enforce_size_limits", lambda overview, abstract: (overview, abstract)
    )

    msg = SemanticMsg(
        uri="viking://user/default/memories/preferences",
        context_type="memory",
        recursive=False,
        changes={"added": [], "modified": [], "deleted": []},
    )

    await processor._process_memory_directory(msg)

    # No per-file summarization happened; the cached summary flowed into the
    # overview generation call as-is.
    generate_summary.assert_not_called()
    generate_overview.assert_awaited_once()
    file_summaries = generate_overview.await_args.args[1]
    assert file_summaries == [{"name": "a.txt", "summary": "Cached summary"}]
    # The summary cache file was persisted again after processing.
    assert any(path.endswith("/.summary_cache.json") for path, _ in fake_fs.writes)
Loading