From c8ef604f1d5b15a8a83459d1c77467f747d82a95 Mon Sep 17 00:00:00 2001 From: yeyitech Date: Tue, 14 Apr 2026 13:02:46 +0800 Subject: [PATCH 1/5] feat(migration): add OpenClaw memory and transcript migration tool --- examples/openclaw-migration/README.md | 46 ++ examples/openclaw-migration/migrate.py | 122 +++++ openviking/async_client.py | 20 + openviking/client/local.py | 28 +- openviking/migration/__init__.py | 23 + openviking/migration/openclaw.py | 515 ++++++++++++++++++ openviking/server/routers/content.py | 39 ++ openviking/service/fs_service.py | 21 + openviking/storage/content_write.py | 94 +++- openviking/sync_client.py | 21 + openviking_cli/client/base.py | 13 + openviking_cli/client/http.py | 26 + openviking_cli/client/sync_http.py | 21 + tests/client/test_filesystem.py | 33 ++ tests/client/test_http_client_local_upload.py | 23 + tests/migration/test_openclaw.py | 99 ++++ tests/server/test_api_content_write.py | 39 ++ tests/server/test_content_write_service.py | 55 ++ 18 files changed, 1233 insertions(+), 5 deletions(-) create mode 100644 examples/openclaw-migration/README.md create mode 100644 examples/openclaw-migration/migrate.py create mode 100644 openviking/migration/__init__.py create mode 100644 openviking/migration/openclaw.py create mode 100644 tests/migration/test_openclaw.py diff --git a/examples/openclaw-migration/README.md b/examples/openclaw-migration/README.md new file mode 100644 index 000000000..5ca7a2dcc --- /dev/null +++ b/examples/openclaw-migration/README.md @@ -0,0 +1,46 @@ +# OpenClaw Migration + +This tool imports existing OpenClaw data into OpenViking through two paths: + +- `memory`: import native OpenClaw memory markdown files directly into OpenViking memory URIs +- `transcript`: replay historical OpenClaw jsonl transcripts into OpenViking sessions and commit them +- `all`: run both paths in one pass + +By default it reads from `~/.openclaw` and connects to OpenViking over HTTP using the same config as `ovcli.conf`. 
You can also point it at an embedded local data path with `--ov-path`. + +## Examples + +```bash +python examples/openclaw-migration/migrate.py --mode memory --dry-run +``` + +```bash +python examples/openclaw-migration/migrate.py --mode all +``` + +```bash +python examples/openclaw-migration/migrate.py \ + --mode transcript \ + --agent main \ + --url http://127.0.0.1:1933 \ + --api-key "$OPENVIKING_API_KEY" +``` + +```bash +python examples/openclaw-migration/migrate.py \ + --mode memory \ + --ov-path ~/.openviking +``` + +## Mapping + +Native OpenClaw memory files map to deterministic OpenViking memory URIs: + +- `workspace/MEMORY.md` -> `viking://user/memories/entities/openclaw-memory.md` +- `workspace/memory/YYYY-MM-DD.md` -> `viking://user/memories/events/openclaw-YYYY-MM-DD.md` +- `workspace/memory/YYYY-MM-DD-*.md` -> `viking://agent/memories/cases/openclaw-YYYY-MM-DD-*.md` + +Deterministic URIs and session ids make reruns naturally resumable: + +- existing memory targets are skipped unless `--overwrite` is set +- replayed transcript sessions use a stable `openclaw-<agent_id>-<session_id>` target id diff --git a/examples/openclaw-migration/migrate.py b/examples/openclaw-migration/migrate.py new file mode 100644 index 000000000..77e16596c --- /dev/null +++ b/examples/openclaw-migration/migrate.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python3 +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. 
+# SPDX-License-Identifier: AGPL-3.0 +"""Migrate existing OpenClaw memory files and transcripts into OpenViking.""" + +from __future__ import annotations + +import argparse +import json +from pathlib import Path +from typing import Any + +import openviking as ov +from openviking.migration.openclaw import migrate_openclaw + + +def _build_client(args: argparse.Namespace) -> Any: + if args.ov_path: + client = ov.SyncOpenViking(path=args.ov_path) + client.initialize() + return client + + client = ov.SyncHTTPClient( + url=args.url, + api_key=args.api_key, + account=args.account, + user=args.user, + agent_id=args.agent_id, + timeout=args.http_timeout, + ) + client.initialize() + return client + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument( + "--openclaw-dir", + default=str(Path.home() / ".openclaw"), + help="OpenClaw state directory (default: ~/.openclaw)", + ) + parser.add_argument( + "--mode", + choices=("memory", "transcript", "all"), + default="memory", + help="What to migrate", + ) + parser.add_argument( + "--agent", + action="append", + dest="agent_ids", + help="Only replay transcripts for the given OpenClaw agent id (repeatable)", + ) + parser.add_argument( + "--category-override", + choices=("preferences", "entities", "events", "cases", "patterns", "tools", "skills"), + help="Override the inferred OpenViking memory category for native OpenClaw memory files", + ) + parser.add_argument("--dry-run", action="store_true", help="Preview planned imports only") + parser.add_argument("--overwrite", action="store_true", help="Overwrite existing OV targets") + parser.add_argument("--no-wait", action="store_true", help="Do not wait for async queue/task completion") + parser.add_argument( + "--timeout", + type=float, + default=300.0, + help="Queue/task wait timeout in seconds (default: 300)", + ) + parser.add_argument( + "--poll-interval", + type=float, + default=1.0, + help="Task poll interval in 
seconds (default: 1)", + ) + parser.add_argument( + "--url", + help="OpenViking server URL. If omitted, SyncHTTPClient falls back to ovcli.conf", + ) + parser.add_argument("--api-key", help="Optional OpenViking API key") + parser.add_argument("--account", help="Optional X-OpenViking-Account header") + parser.add_argument("--user", help="Optional X-OpenViking-User header") + parser.add_argument("--agent-id", help="Optional X-OpenViking-Agent header") + parser.add_argument( + "--http-timeout", + type=float, + default=60.0, + help="HTTP client timeout in seconds (default: 60)", + ) + parser.add_argument( + "--ov-path", + help="Use embedded SyncOpenViking against a local data path instead of HTTP mode", + ) + return parser.parse_args() + + +def main() -> int: + args = _parse_args() + if args.ov_path and any([args.url, args.api_key, args.account, args.user, args.agent_id]): + raise SystemExit("--ov-path cannot be combined with HTTP connection flags") + + client = _build_client(args) + try: + result = migrate_openclaw( + client, + args.openclaw_dir, + mode=args.mode, + dry_run=args.dry_run, + overwrite=args.overwrite, + wait=not args.no_wait, + timeout=args.timeout, + poll_interval=args.poll_interval, + agent_ids=args.agent_ids, + category_override=args.category_override, + ) + finally: + client.close() + + print(json.dumps(result, indent=2, ensure_ascii=False)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/openviking/async_client.py b/openviking/async_client.py index 7c564cb93..f929f441e 100644 --- a/openviking/async_client.py +++ b/openviking/async_client.py @@ -398,6 +398,26 @@ async def write( telemetry=telemetry, ) + async def import_memory( + self, + uri: str, + content: str, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + telemetry: TelemetryRequest = False, + ) -> Dict[str, Any]: + """Create or update a memory file and refresh semantics/vectors.""" + await self._ensure_initialized() + return 
await self._client.import_memory( + uri=uri, + content=content, + mode=mode, + wait=wait, + timeout=timeout, + telemetry=telemetry, + ) + async def ls(self, uri: str, **kwargs) -> List[Any]: """ List directory contents. diff --git a/openviking/client/local.py b/openviking/client/local.py index ab7ab6f51..c553194d9 100644 --- a/openviking/client/local.py +++ b/openviking/client/local.py @@ -254,6 +254,33 @@ async def write( execution.telemetry, ) + async def import_memory( + self, + uri: str, + content: str, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + telemetry: TelemetryRequest = False, + ) -> Dict[str, Any]: + """Create or update a memory file and refresh semantics/vectors.""" + execution = await run_with_telemetry( + operation="content.import_memory", + telemetry=telemetry, + fn=lambda: self._service.fs.import_memory( + uri=uri, + content=content, + ctx=self._ctx, + mode=mode, + wait=wait, + timeout=timeout, + ), + ) + return attach_telemetry_payload( + execution.result, + execution.telemetry, + ) + # ============= Search ============= async def find( @@ -443,7 +470,6 @@ async def add_message( If both content and parts are provided, parts takes precedence. """ - from openviking.message.part import Part, TextPart, part_from_dict session = self._service.sessions.session(self._ctx, session_id) diff --git a/openviking/migration/__init__.py b/openviking/migration/__init__.py new file mode 100644 index 000000000..d56afbee3 --- /dev/null +++ b/openviking/migration/__init__.py @@ -0,0 +1,23 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. 
+# SPDX-License-Identifier: AGPL-3.0 +"""Migration helpers for importing external data into OpenViking.""" + +from openviking.migration.openclaw import ( + OpenClawMemoryArtifact, + OpenClawTranscriptMessage, + OpenClawTranscriptSession, + discover_openclaw_memory_artifacts, + discover_openclaw_transcript_sessions, + migrate_openclaw, + parse_openclaw_transcript, +) + +__all__ = [ + "OpenClawMemoryArtifact", + "OpenClawTranscriptMessage", + "OpenClawTranscriptSession", + "discover_openclaw_memory_artifacts", + "discover_openclaw_transcript_sessions", + "migrate_openclaw", + "parse_openclaw_transcript", +] diff --git a/openviking/migration/openclaw.py b/openviking/migration/openclaw.py new file mode 100644 index 000000000..62f2fd788 --- /dev/null +++ b/openviking/migration/openclaw.py @@ -0,0 +1,515 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 +"""OpenClaw migration helpers. + +Phase 1 focuses on two import paths: + +1. Native OpenClaw memory markdown files -> direct OpenViking memory import +2. 
Historical OpenClaw session transcripts -> session replay + commit +""" + +from __future__ import annotations + +import hashlib +import json +import re +import time +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Iterable, Sequence + +from openviking_cli.exceptions import NotFoundError + +_DATE_ONLY_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$") +_DATE_PREFIX_RE = re.compile(r"^\d{4}-\d{2}-\d{2}-.+") +_SAFE_SEGMENT_RE = re.compile(r"[^a-zA-Z0-9._-]+") +_TEXT_TYPES = { + "text", + "input_text", + "output_text", + "markdown", + "output_markdown", + "input_markdown", +} + + +@dataclass(frozen=True) +class OpenClawMemoryArtifact: + """One OpenClaw memory file mapped onto a target OpenViking memory URI.""" + + source_path: Path + category: str + uri: str + kind: str + + +@dataclass(frozen=True) +class OpenClawTranscriptMessage: + """One replayable transcript message.""" + + role: str + content: str + created_at: str | None = None + + +@dataclass(frozen=True) +class OpenClawTranscriptSession: + """One OpenClaw transcript session discovered on disk.""" + + agent_id: str + session_id: str + session_key: str + transcript_path: Path + label: str = "" + updated_at: str = "" + channel: str = "" + + +def _sanitize_segment(value: str, *, fallback: str) -> str: + sanitized = _SAFE_SEGMENT_RE.sub("-", value.strip()).strip("._-") + return sanitized or fallback + + +def _dedupe_uri(uri: str, seen: dict[str, int]) -> str: + count = seen.get(uri, 0) + seen[uri] = count + 1 + if count == 0: + return uri + + path = Path(uri) + suffix = path.suffix or ".md" + return f"{uri[: -len(suffix)]}-{count + 1}{suffix}" + + +def _memory_uri_for_category(category: str, slug: str) -> str: + if category in {"preferences", "entities", "events"}: + return f"viking://user/memories/{category}/{slug}.md" + return f"viking://agent/memories/{category}/{slug}.md" + + +def discover_openclaw_memory_artifacts( + openclaw_dir: str | Path, + 
*, + category_override: str | None = None, +) -> list[OpenClawMemoryArtifact]: + """Discover OpenClaw native memory markdown files. + + OpenClaw stores durable memory under ``workspace/MEMORY.md`` plus + ``workspace/memory/*.md``. We map those files to deterministic OpenViking + memory URIs so reruns can skip or overwrite cleanly. + """ + + if category_override == "profile": + raise ValueError("category_override=profile is not supported for multi-file migration") + if category_override and category_override not in { + "preferences", + "entities", + "events", + "cases", + "patterns", + "tools", + "skills", + }: + raise ValueError(f"unsupported category_override: {category_override}") + + base = Path(openclaw_dir).expanduser() + workspace = base / "workspace" + memory_dir = workspace / "memory" + seen: dict[str, int] = {} + artifacts: list[OpenClawMemoryArtifact] = [] + + memory_md = workspace / "MEMORY.md" + if memory_md.is_file(): + category = category_override or "entities" + uri = _memory_uri_for_category(category, "openclaw-memory") + artifacts.append( + OpenClawMemoryArtifact( + source_path=memory_md, + category=category, + uri=_dedupe_uri(uri, seen), + kind="memory-md", + ) + ) + + if not memory_dir.is_dir(): + return artifacts + + for path in sorted(memory_dir.glob("*.md")): + stem = path.stem + if category_override: + category = category_override + elif _DATE_ONLY_RE.fullmatch(stem): + category = "events" + elif _DATE_PREFIX_RE.fullmatch(stem): + category = "cases" + else: + category = "entities" + + slug = _sanitize_segment(f"openclaw-{stem}", fallback="openclaw-memory") + uri = _memory_uri_for_category(category, slug) + kind = "daily-log" if _DATE_ONLY_RE.fullmatch(stem) else "session-summary" + artifacts.append( + OpenClawMemoryArtifact( + source_path=path, + category=category, + uri=_dedupe_uri(uri, seen), + kind=kind, + ) + ) + + return artifacts + + +def discover_openclaw_transcript_sessions( + openclaw_dir: str | Path, + *, + agent_ids: Sequence[str] 
| None = None, + include_orphans: bool = True, +) -> list[OpenClawTranscriptSession]: + """Discover transcript jsonl files from ``~/.openclaw/agents/*/sessions``.""" + + base = Path(openclaw_dir).expanduser() + agents_dir = base / "agents" + if not agents_dir.is_dir(): + return [] + + selected = set(agent_ids or []) + discovered: list[OpenClawTranscriptSession] = [] + + for agent_dir in sorted(p for p in agents_dir.iterdir() if p.is_dir()): + agent_id = agent_dir.name + if selected and agent_id not in selected: + continue + + sessions_dir = agent_dir / "sessions" + index_path = sessions_dir / "sessions.json" + seen_paths: set[Path] = set() + index_data: dict[str, Any] = {} + if index_path.is_file(): + try: + index_data = json.loads(index_path.read_text(encoding="utf-8")) + except json.JSONDecodeError: + index_data = {} + + for session_key, raw_meta in sorted(index_data.items()): + if not isinstance(raw_meta, dict): + continue + session_id = str(raw_meta.get("sessionId") or "").strip() + session_file = str(raw_meta.get("sessionFile") or "").strip() + if not session_id and session_file: + session_id = Path(session_file).stem + if not session_id: + continue + + transcript_path = Path(session_file) if session_file else sessions_dir / f"{session_id}.jsonl" + if not transcript_path.is_absolute(): + transcript_path = sessions_dir / transcript_path + if not transcript_path.is_file(): + continue + + resolved = transcript_path.resolve() + seen_paths.add(resolved) + discovered.append( + OpenClawTranscriptSession( + agent_id=agent_id, + session_id=session_id, + session_key=session_key, + transcript_path=resolved, + label=str(raw_meta.get("label") or ""), + updated_at=str(raw_meta.get("updatedAt") or ""), + channel=str(raw_meta.get("channel") or ""), + ) + ) + + if not include_orphans or not sessions_dir.is_dir(): + continue + + for transcript_path in sorted(sessions_dir.glob("*.jsonl")): + resolved = transcript_path.resolve() + if resolved in seen_paths: + continue + 
discovered.append( + OpenClawTranscriptSession( + agent_id=agent_id, + session_id=transcript_path.stem, + session_key=transcript_path.stem, + transcript_path=resolved, + ) + ) + + return discovered + + +def _normalize_timestamp(value: Any) -> str | None: + if value in (None, ""): + return None + if isinstance(value, str): + return value.strip() or None + if isinstance(value, (int, float)): + ts = float(value) + if ts > 1_000_000_000_000: + ts /= 1000.0 + return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat() + return None + + +def _extract_text_fragments(value: Any) -> list[str]: + if value is None: + return [] + if isinstance(value, str): + text = value.strip() + return [text] if text else [] + if isinstance(value, list): + fragments: list[str] = [] + for item in value: + fragments.extend(_extract_text_fragments(item)) + return fragments + if not isinstance(value, dict): + return [] + + fragments: list[str] = [] + node_type = str(value.get("type") or "") + if node_type in _TEXT_TYPES and isinstance(value.get("text"), str): + text = value["text"].strip() + if text: + fragments.append(text) + + for key in ("text", "content", "parts"): + child = value.get(key) + if key == "text" and node_type in _TEXT_TYPES: + continue + fragments.extend(_extract_text_fragments(child)) + return fragments + + +def parse_openclaw_transcript(path: str | Path) -> list[OpenClawTranscriptMessage]: + """Parse an OpenClaw jsonl transcript into replayable user/assistant messages.""" + + transcript_path = Path(path) + messages: list[OpenClawTranscriptMessage] = [] + for raw_line in transcript_path.read_text(encoding="utf-8").splitlines(): + line = raw_line.strip() + if not line: + continue + try: + record = json.loads(line) + except json.JSONDecodeError: + continue + + payload = record.get("message") if record.get("type") == "message" else record + if not isinstance(payload, dict): + continue + + role = str(payload.get("role") or record.get("role") or "").strip() + if role not in 
{"user", "assistant"}: + continue + + fragments = _extract_text_fragments( + payload.get("content", payload.get("parts", payload.get("text"))) + ) + text = "\n\n".join(fragment for fragment in fragments if fragment.strip()).strip() + if not text: + continue + + created_at = None + for key in ("created_at", "createdAt", "timestamp", "time"): + created_at = _normalize_timestamp(payload.get(key)) + if created_at: + break + created_at = _normalize_timestamp(record.get(key)) + if created_at: + break + + messages.append( + OpenClawTranscriptMessage(role=role, content=text, created_at=created_at) + ) + + return messages + + +def _uri_exists(client: Any, uri: str) -> bool: + try: + client.stat(uri) + except NotFoundError: + return False + return True + + +def _session_exists(client: Any, session_id: str) -> bool: + if hasattr(client, "session_exists"): + return bool(client.session_exists(session_id)) + try: + client.get_session(session_id, auto_create=False) + except NotFoundError: + return False + return True + + +def _stable_target_session_id(agent_id: str, session_id: str) -> str: + raw = f"openclaw-{agent_id}-{session_id}" + sanitized = _sanitize_segment(raw, fallback="openclaw-session") + if len(sanitized) <= 96: + return sanitized + digest = hashlib.sha1(raw.encode("utf-8")).hexdigest()[:12] + return f"{sanitized[:83]}-{digest}" + + +def _wait_for_task( + client: Any, + task_id: str, + *, + timeout: float, + poll_interval: float, +) -> dict[str, Any]: + deadline = time.monotonic() + timeout + while time.monotonic() <= deadline: + task = client.get_task(task_id) + if not task: + time.sleep(poll_interval) + continue + status = str(task.get("status") or "").lower() + if status == "completed": + return task + if status in {"failed", "cancelled"}: + raise RuntimeError(f"task {task_id} {status}: {task}") + time.sleep(poll_interval) + raise TimeoutError(f"task {task_id} did not complete within {timeout} seconds") + + +def _summarize_records(records: Iterable[dict[str, 
Any]]) -> dict[str, int]: + summary = {"planned": 0, "imported": 0, "skipped": 0, "failed": 0} + for record in records: + status = record.get("status") + if status == "planned": + summary["planned"] += 1 + elif status == "imported": + summary["imported"] += 1 + elif str(status).startswith("skipped"): + summary["skipped"] += 1 + else: + summary["failed"] += 1 + return summary + + +def migrate_openclaw( + client: Any, + openclaw_dir: str | Path, + *, + mode: str = "memory", + dry_run: bool = False, + overwrite: bool = False, + wait: bool = True, + timeout: float = 300.0, + poll_interval: float = 1.0, + agent_ids: Sequence[str] | None = None, + category_override: str | None = None, +) -> dict[str, Any]: + """Run an OpenClaw -> OpenViking migration. + + Returns a structured summary containing per-item records for both import + paths. The caller can print or persist the records as needed. + """ + + if mode not in {"memory", "transcript", "all"}: + raise ValueError(f"unsupported migration mode: {mode}") + + memory_records: list[dict[str, Any]] = [] + transcript_records: list[dict[str, Any]] = [] + + if mode in {"memory", "all"}: + for artifact in discover_openclaw_memory_artifacts( + openclaw_dir, category_override=category_override + ): + record = { + "kind": artifact.kind, + "source_path": str(artifact.source_path), + "category": artifact.category, + "uri": artifact.uri, + } + if dry_run: + record["status"] = "planned" + memory_records.append(record) + continue + + content = artifact.source_path.read_text(encoding="utf-8").strip() + if not content: + record["status"] = "skipped_empty" + memory_records.append(record) + continue + if not overwrite and _uri_exists(client, artifact.uri): + record["status"] = "skipped_exists" + memory_records.append(record) + continue + + result = client.import_memory( + artifact.uri, + content, + mode="replace", + wait=wait, + timeout=timeout, + telemetry=False, + ) + record["status"] = "imported" + record["result"] = result + 
memory_records.append(record) + + if mode in {"transcript", "all"}: + sessions = discover_openclaw_transcript_sessions(openclaw_dir, agent_ids=agent_ids) + for session in sessions: + target_session_id = _stable_target_session_id(session.agent_id, session.session_id) + record = { + "agent_id": session.agent_id, + "session_key": session.session_key, + "source_path": str(session.transcript_path), + "target_session_id": target_session_id, + } + messages = parse_openclaw_transcript(session.transcript_path) + record["message_count"] = len(messages) + if dry_run: + record["status"] = "planned" + transcript_records.append(record) + continue + if not messages: + record["status"] = "skipped_empty" + transcript_records.append(record) + continue + if _session_exists(client, target_session_id): + if not overwrite: + record["status"] = "skipped_exists" + transcript_records.append(record) + continue + client.delete_session(target_session_id) + + client.create_session(target_session_id) + for message in messages: + client.add_message( + target_session_id, + message.role, + content=message.content, + created_at=message.created_at, + ) + commit_result = client.commit_session(target_session_id, telemetry=False) + record["commit_result"] = commit_result + task_id = commit_result.get("task_id") + if wait and task_id: + record["task"] = _wait_for_task( + client, + task_id, + timeout=timeout, + poll_interval=poll_interval, + ) + record["status"] = "imported" + transcript_records.append(record) + + return { + "mode": mode, + "memory": { + "records": memory_records, + "summary": _summarize_records(memory_records), + }, + "transcript": { + "records": transcript_records, + "summary": _summarize_records(transcript_records), + }, + } diff --git a/openviking/server/routers/content.py b/openviking/server/routers/content.py index 7801546d0..c6433cb27 100644 --- a/openviking/server/routers/content.py +++ b/openviking/server/routers/content.py @@ -43,6 +43,19 @@ class 
WriteContentRequest(BaseModel): telemetry: TelemetryRequest = False +class ImportMemoryRequest(BaseModel): + """Request to create or update a memory file.""" + + model_config = ConfigDict(extra="forbid") + + uri: str + content: str + mode: str = "replace" + wait: bool = False + timeout: float | None = None + telemetry: TelemetryRequest = False + + router = APIRouter(prefix="/api/v1/content", tags=["content"]) @@ -146,6 +159,32 @@ async def write( ).model_dump(exclude_none=True) +@router.post("/import-memory") +async def import_memory( + request: ImportMemoryRequest = Body(...), + _ctx: RequestContext = Depends(get_request_context), +): + """Create or update a memory file and refresh semantics/vectors.""" + service = get_service() + execution = await run_operation( + operation="content.import_memory", + telemetry=request.telemetry, + fn=lambda: service.fs.import_memory( + uri=request.uri, + content=request.content, + ctx=_ctx, + mode=request.mode, + wait=request.wait, + timeout=request.timeout, + ), + ) + return Response( + status="ok", + result=execution.result, + telemetry=execution.telemetry, + ).model_dump(exclude_none=True) + + @router.post("/reindex") async def reindex( request: ReindexRequest = Body(...), diff --git a/openviking/service/fs_service.py b/openviking/service/fs_service.py index 026176c14..5cae21fc3 100644 --- a/openviking/service/fs_service.py +++ b/openviking/service/fs_service.py @@ -249,3 +249,24 @@ async def write( wait=wait, timeout=timeout, ) + + async def import_memory( + self, + uri: str, + content: str, + ctx: RequestContext, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + ) -> Dict[str, Any]: + """Create or update a memory file and refresh semantics/vectors.""" + viking_fs = self._ensure_initialized() + coordinator = ContentWriteCoordinator(viking_fs=viking_fs) + return await coordinator.import_memory( + uri=uri, + content=content, + ctx=ctx, + mode=mode, + wait=wait, + timeout=timeout, + ) diff --git 
a/openviking/storage/content_write.py b/openviking/storage/content_write.py index 9d5343620..a79dfd7f9 100644 --- a/openviking/storage/content_write.py +++ b/openviking/storage/content_write.py @@ -128,6 +128,40 @@ async def write( if wait and telemetry_id: get_request_wait_tracker().cleanup(telemetry_id) + async def import_memory( + self, + *, + uri: str, + content: str, + ctx: RequestContext, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + ) -> Dict[str, Any]: + """Create or update a memory file and refresh semantics/vectors.""" + normalized_uri = VikingURI.normalize(uri) + self._validate_mode(mode) + self._validate_memory_import_uri(normalized_uri) + + parent = VikingURI(normalized_uri).parent + if parent is None: + raise InvalidArgumentError(f"memory import target has no parent directory: {uri}") + + await self._viking_fs.mkdir(parent.uri, exist_ok=True, ctx=ctx) + written_bytes = len(content.encode("utf-8")) + telemetry_id = get_current_telemetry().telemetry_id + return await self._write_memory_with_refresh( + uri=normalized_uri, + root_uri=parent.uri, + content=content, + mode=mode, + wait=wait, + timeout=timeout, + ctx=ctx, + written_bytes=written_bytes, + telemetry_id=telemetry_id, + ) + def _validate_mode(self, mode: str) -> None: if mode not in {"replace", "append"}: raise InvalidArgumentError(f"unsupported write mode: {mode}") @@ -143,6 +177,49 @@ def _validate_target_uri(self, uri: str) -> None: if parsed.scope not in {"resources", "user", "agent"}: raise InvalidArgumentError(f"write is not supported for scope: {parsed.scope}") + def _validate_memory_import_uri(self, uri: str) -> None: + self._validate_target_uri(uri) + + parsed = VikingURI(uri) + if parsed.scope not in {"user", "agent"}: + raise InvalidArgumentError(f"memory import only supports user/agent memory URIs: {uri}") + + parts = [part for part in parsed.full_path.split("/") if part] + try: + memories_idx = parts.index("memories") + except ValueError as exc: 
+ raise InvalidArgumentError(f"memory import only supports memory URIs: {uri}") from exc + + name = parts[-1] + if not name.endswith(".md"): + raise InvalidArgumentError(f"memory import only supports markdown files: {uri}") + if name.startswith("."): + raise InvalidArgumentError(f"memory import cannot target hidden files: {uri}") + + if parsed.scope == "user": + if len(parts) == memories_idx + 2 and name == "profile.md": + return + if len(parts) <= memories_idx + 2: + raise InvalidArgumentError( + f"user memory import target must be profile.md or a file inside a category: {uri}" + ) + category = parts[memories_idx + 1] + if category not in {"preferences", "entities", "events"}: + raise InvalidArgumentError( + f"user memory import only supports profile/preferences/entities/events: {uri}" + ) + return + + if len(parts) <= memories_idx + 2: + raise InvalidArgumentError( + f"agent memory import target must be a file inside an agent memory category: {uri}" + ) + category = parts[memories_idx + 1] + if category not in {"cases", "patterns", "tools", "skills"}: + raise InvalidArgumentError( + f"agent memory import only supports cases/patterns/tools/skills: {uri}" + ) + async def _safe_stat(self, uri: str, *, ctx: RequestContext) -> Dict[str, Any]: try: return await self._viking_fs.stat(uri, ctx=ctx) @@ -160,7 +237,10 @@ async def _write_in_place( ctx: RequestContext, ) -> None: if mode == "replace" and self._context_type_for_uri(uri) == "memory": - existing_raw = await self._viking_fs.read_file(uri, ctx=ctx) + try: + existing_raw = await self._viking_fs.read_file(uri, ctx=ctx) + except NotFoundError: + existing_raw = "" _, metadata = deserialize_full(existing_raw) if metadata: content = serialize_with_metadata(content, metadata) @@ -168,7 +248,10 @@ async def _write_in_place( return if mode == "append": - existing_raw = await self._viking_fs.read_file(uri, ctx=ctx) + try: + existing_raw = await self._viking_fs.read_file(uri, ctx=ctx) + except NotFoundError: + existing_raw 
= "" existing_content, metadata = deserialize_full(existing_raw) updated_content = existing_content + content if metadata: @@ -384,9 +467,12 @@ async def _write_memory_with_refresh( root_uri=root_uri, modified_uri=uri, ctx=ctx, - lifecycle_lock_handle_id=handle.id, + lifecycle_lock_handle_id=handle.id if wait else "", ) - lock_transferred = True + if wait: + lock_transferred = True + else: + await lock_manager.release(handle) queue_status = ( await self._wait_for_request(telemetry_id=telemetry_id, timeout=timeout) if wait diff --git a/openviking/sync_client.py b/openviking/sync_client.py index 5d65d628c..712983fb6 100644 --- a/openviking/sync_client.py +++ b/openviking/sync_client.py @@ -227,6 +227,27 @@ def write( ) ) + def import_memory( + self, + uri: str, + content: str, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + telemetry: TelemetryRequest = False, + ) -> Dict[str, Any]: + """Create or update a memory file and refresh semantics/vectors.""" + return run_async( + self._async_client.import_memory( + uri=uri, + content=content, + mode=mode, + wait=wait, + timeout=timeout, + telemetry=telemetry, + ) + ) + def ls(self, uri: str, **kwargs) -> List[Any]: """ List directory contents. diff --git a/openviking_cli/client/base.py b/openviking_cli/client/base.py index 51abb0c8e..0cd4e4c17 100644 --- a/openviking_cli/client/base.py +++ b/openviking_cli/client/base.py @@ -146,6 +146,19 @@ async def write( """Write text content to an existing file and refresh semantics/vectors.""" ... + @abstractmethod + async def import_memory( + self, + uri: str, + content: str, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + telemetry: TelemetryRequest = False, + ) -> Dict[str, Any]: + """Create or update a memory file and refresh semantics/vectors.""" + ... 
+ # ============= Search ============= @abstractmethod diff --git a/openviking_cli/client/http.py b/openviking_cli/client/http.py index 198b50f2b..8793afca8 100644 --- a/openviking_cli/client/http.py +++ b/openviking_cli/client/http.py @@ -584,6 +584,32 @@ async def write( response_data = self._handle_response_data(response) return self._attach_telemetry(response_data.get("result") or {}, response_data) + async def import_memory( + self, + uri: str, + content: str, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + telemetry: TelemetryRequest = False, + ) -> Dict[str, Any]: + """Create or update a memory file and refresh semantics/vectors.""" + telemetry = self._validate_telemetry(telemetry) + uri = VikingURI.normalize(uri) + response = await self._http.post( + "/api/v1/content/import-memory", + json={ + "uri": uri, + "content": content, + "mode": mode, + "wait": wait, + "timeout": timeout, + "telemetry": telemetry, + }, + ) + response_data = self._handle_response_data(response) + return self._attach_telemetry(response_data.get("result") or {}, response_data) + # ============= Search ============= async def find( diff --git a/openviking_cli/client/sync_http.py b/openviking_cli/client/sync_http.py index 2da8c2d8b..16d4f9e17 100644 --- a/openviking_cli/client/sync_http.py +++ b/openviking_cli/client/sync_http.py @@ -195,6 +195,27 @@ def wait_processed(self, timeout: Optional[float] = None) -> Dict[str, Any]: """Wait for all processing to complete.""" return run_async(self._async_client.wait_processed(timeout)) + def import_memory( + self, + uri: str, + content: str, + mode: str = "replace", + wait: bool = False, + timeout: Optional[float] = None, + telemetry: TelemetryRequest = False, + ) -> Dict[str, Any]: + """Create or update a memory file and refresh semantics/vectors.""" + return run_async( + self._async_client.import_memory( + uri=uri, + content=content, + mode=mode, + wait=wait, + timeout=timeout, + telemetry=telemetry, + ) + ) 
+ # ============= Search ============= def search( diff --git a/tests/client/test_filesystem.py b/tests/client/test_filesystem.py index 966d87b76..9349c44b9 100644 --- a/tests/client/test_filesystem.py +++ b/tests/client/test_filesystem.py @@ -204,3 +204,36 @@ async def test_sync_openviking_write_updates_existing_file(test_data_dir, sample finally: client.close() await AsyncOpenViking.reset() + + +async def test_sync_openviking_import_memory_delegates_to_async_client(test_data_dir): + """Sync OpenViking exposes import_memory() and delegates to the async client.""" + await AsyncOpenViking.reset() + client = OpenViking(path=str(test_data_dir)) + + try: + client._async_client.import_memory = AsyncMock( + return_value={"uri": "viking://user/memories/profile.md"} + ) + + result = client.import_memory( + "viking://user/memories/profile.md", + "imported", + mode="replace", + wait=True, + timeout=5.0, + telemetry=False, + ) + + assert result == {"uri": "viking://user/memories/profile.md"} + client._async_client.import_memory.assert_awaited_once_with( + uri="viking://user/memories/profile.md", + content="imported", + mode="replace", + wait=True, + timeout=5.0, + telemetry=False, + ) + finally: + client.close() + await AsyncOpenViking.reset() diff --git a/tests/client/test_http_client_local_upload.py b/tests/client/test_http_client_local_upload.py index 006f8086b..971242511 100644 --- a/tests/client/test_http_client_local_upload.py +++ b/tests/client/test_http_client_local_upload.py @@ -49,6 +49,29 @@ async def test_write_omits_removed_semantic_flags_from_http_payload(tmp_path, mo } +@pytest.mark.asyncio +async def test_import_memory_posts_expected_payload(monkeypatch): + client = AsyncHTTPClient(url="http://localhost:1933") + fake_http = _FakeHTTPClient() + client._http = fake_http + client._handle_response_data = lambda _response: { + "result": {"uri": "viking://user/memories/profile.md"} + } + + await client.import_memory("viking://user/memories/profile.md", "imported", 
wait=True) + + call = fake_http.calls[-1] + assert call["path"] == "/api/v1/content/import-memory" + assert call["json"] == { + "uri": "viking://user/memories/profile.md", + "content": "imported", + "mode": "replace", + "wait": True, + "timeout": None, + "telemetry": False, + } + + @pytest.mark.asyncio async def test_add_skill_uploads_local_file_even_when_url_is_localhost(tmp_path): skill_file = tmp_path / "SKILL.md" diff --git a/tests/migration/test_openclaw.py b/tests/migration/test_openclaw.py new file mode 100644 index 000000000..10a0be6b0 --- /dev/null +++ b/tests/migration/test_openclaw.py @@ -0,0 +1,99 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 + +from pathlib import Path + +from openviking.migration.openclaw import ( + discover_openclaw_memory_artifacts, + discover_openclaw_transcript_sessions, + migrate_openclaw, + parse_openclaw_transcript, +) + + +def test_discover_openclaw_memory_artifacts(temp_dir: Path): + workspace = temp_dir / "workspace" + memory_dir = workspace / "memory" + memory_dir.mkdir(parents=True) + (workspace / "MEMORY.md").write_text("# Durable\n") + (memory_dir / "2026-04-01.md").write_text("daily log\n") + (memory_dir / "2026-04-01-project-alpha.md").write_text("case summary\n") + + artifacts = discover_openclaw_memory_artifacts(temp_dir) + + assert [artifact.category for artifact in artifacts] == ["entities", "events", "cases"] + assert artifacts[0].uri == "viking://user/memories/entities/openclaw-memory.md" + assert artifacts[1].uri == "viking://user/memories/events/openclaw-2026-04-01.md" + assert ( + artifacts[2].uri + == "viking://agent/memories/cases/openclaw-2026-04-01-project-alpha.md" + ) + + +def test_discover_openclaw_transcript_sessions_includes_index_and_orphans(temp_dir: Path): + sessions_dir = temp_dir / "agents" / "main" / "sessions" + sessions_dir.mkdir(parents=True) + (sessions_dir / "from-index.jsonl").write_text("") + (sessions_dir / 
"orphan.jsonl").write_text("") + (sessions_dir / "sessions.json").write_text( + '{"agent:main:test": {"sessionId": "from-index", "sessionFile": "from-index.jsonl", "label": "Indexed"}}' + ) + + sessions = discover_openclaw_transcript_sessions(temp_dir) + + assert [(session.session_id, session.label) for session in sessions] == [ + ("from-index", "Indexed"), + ("orphan", ""), + ] + + +def test_parse_openclaw_transcript_extracts_user_and_assistant_text(temp_dir: Path): + transcript = temp_dir / "session.jsonl" + transcript.write_text( + "\n".join( + [ + '{"type":"header","version":1}', + '{"type":"message","timestamp":1713072000000,"message":{"role":"user","content":[{"type":"text","text":"hello"}]}}', + '{"type":"message","message":{"role":"assistant","content":[{"type":"output_text","text":"hi there"}]}}', + '{"type":"message","message":{"role":"toolResult","content":"ignored"}}', + ] + ) + ) + + messages = parse_openclaw_transcript(transcript) + + assert [message.role for message in messages] == ["user", "assistant"] + assert messages[0].content == "hello" + assert messages[0].created_at is not None + assert messages[1].content == "hi there" + + +def test_migrate_openclaw_dry_run_reports_memory_and_transcript(temp_dir: Path): + workspace = temp_dir / "workspace" + memory_dir = workspace / "memory" + sessions_dir = temp_dir / "agents" / "main" / "sessions" + memory_dir.mkdir(parents=True) + sessions_dir.mkdir(parents=True) + + (workspace / "MEMORY.md").write_text("# Durable\n") + (sessions_dir / "s1.jsonl").write_text( + '{"type":"message","message":{"role":"user","content":[{"type":"text","text":"hello"}]}}' + ) + (sessions_dir / "sessions.json").write_text( + '{"agent:main:s1": {"sessionId": "s1", "sessionFile": "s1.jsonl"}}' + ) + + result = migrate_openclaw(object(), temp_dir, mode="all", dry_run=True) + + assert result["memory"]["summary"] == { + "planned": 1, + "imported": 0, + "skipped": 0, + "failed": 0, + } + assert result["transcript"]["summary"] == { + 
"planned": 1, + "imported": 0, + "skipped": 0, + "failed": 0, + } diff --git a/tests/server/test_api_content_write.py b/tests/server/test_api_content_write.py index cea70741f..5076271aa 100644 --- a/tests/server/test_api_content_write.py +++ b/tests/server/test_api_content_write.py @@ -22,6 +22,11 @@ async def test_write_endpoint_registered(client): assert resp.status_code == 405 +async def test_import_memory_endpoint_registered(client): + resp = await client.get("/api/v1/content/import-memory") + assert resp.status_code == 405 + + async def test_write_rejects_directory_uri(client_with_resource): client, uri = client_with_resource resp = await client.post( @@ -113,3 +118,37 @@ async def test_write_rejects_removed_semantic_flags(client_with_resource): ) assert resp.status_code == 422 + + +async def test_import_memory_creates_profile_file(client): + uri = "viking://user/memories/profile.md" + resp = await client.post( + "/api/v1/content/import-memory", + json={ + "uri": uri, + "content": "# Imported Profile\n\nOpenClaw durable memory.", + "wait": True, + }, + ) + + assert resp.status_code == 200 + body = resp.json() + assert body["status"] == "ok" + assert body["result"]["uri"] == uri + assert body["result"]["mode"] == "replace" + + read_resp = await client.get("/api/v1/content/read", params={"uri": uri}) + assert read_resp.status_code == 200 + assert read_resp.json()["result"] == "# Imported Profile\n\nOpenClaw durable memory." 
+ + +async def test_import_memory_rejects_resource_uri(client): + resp = await client.post( + "/api/v1/content/import-memory", + json={"uri": "viking://resources/demo.md", "content": "nope"}, + ) + + assert resp.status_code == 400 + body = resp.json() + assert body["status"] == "error" + assert body["error"]["code"] == "INVALID_ARGUMENT" diff --git a/tests/server/test_content_write_service.py b/tests/server/test_content_write_service.py index 6fb72c40a..18b089a12 100644 --- a/tests/server/test_content_write_service.py +++ b/tests/server/test_content_write_service.py @@ -34,6 +34,54 @@ async def test_write_updates_memory_file_and_parent_overview(service): assert await service.viking_fs.read_file(f"{memory_dir}/.abstract.md", ctx=ctx) +@pytest.mark.asyncio +async def test_import_memory_creates_missing_profile_file(service): + ctx = RequestContext(user=service.user, role=Role.USER) + profile_uri = f"viking://user/{ctx.user.user_space_name()}/memories/profile.md" + + result = await service.fs.import_memory( + profile_uri, + content="# Profile\n\nUser likes tea.", + ctx=ctx, + mode="replace", + wait=True, + ) + + assert result["context_type"] == "memory" + assert await service.viking_fs.read_file(profile_uri, ctx=ctx) == "# Profile\n\nUser likes tea." 
+ assert await service.viking_fs.read_file( + f"viking://user/{ctx.user.user_space_name()}/memories/.overview.md", + ctx=ctx, + ) + + +@pytest.mark.asyncio +async def test_import_memory_without_wait_allows_back_to_back_writes(service): + ctx = RequestContext(user=service.user, role=Role.USER) + first_uri = f"viking://user/{ctx.user.user_space_name()}/memories/entities/alice.md" + second_uri = f"viking://user/{ctx.user.user_space_name()}/memories/entities/bob.md" + + first = await service.fs.import_memory( + first_uri, + content="Alice likes tea.", + ctx=ctx, + mode="replace", + wait=False, + ) + second = await service.fs.import_memory( + second_uri, + content="Bob likes coffee.", + ctx=ctx, + mode="replace", + wait=False, + ) + + assert first["uri"] == first_uri + assert second["uri"] == second_uri + assert await service.viking_fs.read_file(first_uri, ctx=ctx) == "Alice likes tea." + assert await service.viking_fs.read_file(second_uri, ctx=ctx) == "Bob likes coffee." + + @pytest.mark.asyncio async def test_write_denies_foreign_user_memory_space(service): owner_ctx = RequestContext(user=service.user, role=Role.USER) @@ -54,6 +102,13 @@ async def test_write_denies_foreign_user_memory_space(service): ctx=foreign_ctx, ) + with pytest.raises(NotFoundError): + await service.fs.import_memory( + memory_uri, + content="Intruder update", + ctx=foreign_ctx, + ) + @pytest.mark.asyncio async def test_memory_replace_preserves_metadata(service): From 7774e27864771d4ba3dcc44acc3747dbee4b6d9e Mon Sep 17 00:00:00 2001 From: yeyitech Date: Tue, 14 Apr 2026 13:12:42 +0800 Subject: [PATCH 2/5] fix(migration): reject async clients in openclaw import --- openviking/migration/openclaw.py | 40 +++++++++++++++++++++++++------- tests/migration/test_openclaw.py | 20 ++++++++++++++++ 2 files changed, 51 insertions(+), 9 deletions(-) diff --git a/openviking/migration/openclaw.py b/openviking/migration/openclaw.py index 62f2fd788..ff8b2dd96 100644 --- a/openviking/migration/openclaw.py +++ 
b/openviking/migration/openclaw.py @@ -11,6 +11,7 @@ from __future__ import annotations import hashlib +import inspect import json import re import time @@ -328,9 +329,24 @@ def parse_openclaw_transcript(path: str | Path) -> list[OpenClawTranscriptMessag return messages +def _call_sync_client_method(client: Any, method_name: str, *args: Any, **kwargs: Any) -> Any: + """Call a sync client method and fail fast on async clients.""" + method = getattr(client, method_name) + result = method(*args, **kwargs) + if inspect.isawaitable(result): + close = getattr(result, "close", None) + if callable(close): + close() + raise TypeError( + "migrate_openclaw() requires a synchronous client such as " + "SyncOpenViking or SyncHTTPClient; async clients are not supported" + ) + return result + + def _uri_exists(client: Any, uri: str) -> bool: try: - client.stat(uri) + _call_sync_client_method(client, "stat", uri) except NotFoundError: return False return True @@ -338,9 +354,9 @@ def _uri_exists(client: Any, uri: str) -> bool: def _session_exists(client: Any, session_id: str) -> bool: if hasattr(client, "session_exists"): - return bool(client.session_exists(session_id)) + return bool(_call_sync_client_method(client, "session_exists", session_id)) try: - client.get_session(session_id, auto_create=False) + _call_sync_client_method(client, "get_session", session_id, auto_create=False) except NotFoundError: return False return True @@ -364,7 +380,7 @@ def _wait_for_task( ) -> dict[str, Any]: deadline = time.monotonic() + timeout while time.monotonic() <= deadline: - task = client.get_task(task_id) + task = _call_sync_client_method(client, "get_task", task_id) if not task: time.sleep(poll_interval) continue @@ -442,7 +458,9 @@ def migrate_openclaw( memory_records.append(record) continue - result = client.import_memory( + result = _call_sync_client_method( + client, + "import_memory", artifact.uri, content, mode="replace", @@ -479,17 +497,21 @@ def migrate_openclaw( record["status"] = 
"skipped_exists" transcript_records.append(record) continue - client.delete_session(target_session_id) + _call_sync_client_method(client, "delete_session", target_session_id) - client.create_session(target_session_id) + _call_sync_client_method(client, "create_session", target_session_id) for message in messages: - client.add_message( + _call_sync_client_method( + client, + "add_message", target_session_id, message.role, content=message.content, created_at=message.created_at, ) - commit_result = client.commit_session(target_session_id, telemetry=False) + commit_result = _call_sync_client_method( + client, "commit_session", target_session_id, telemetry=False + ) record["commit_result"] = commit_result task_id = commit_result.get("task_id") if wait and task_id: diff --git a/tests/migration/test_openclaw.py b/tests/migration/test_openclaw.py index 10a0be6b0..bbc9452e3 100644 --- a/tests/migration/test_openclaw.py +++ b/tests/migration/test_openclaw.py @@ -3,6 +3,8 @@ from pathlib import Path +import pytest + from openviking.migration.openclaw import ( discover_openclaw_memory_artifacts, discover_openclaw_transcript_sessions, @@ -97,3 +99,21 @@ def test_migrate_openclaw_dry_run_reports_memory_and_transcript(temp_dir: Path): "skipped": 0, "failed": 0, } + + +def test_migrate_openclaw_rejects_async_clients(temp_dir: Path): + workspace = temp_dir / "workspace" + workspace.mkdir(parents=True) + (workspace / "MEMORY.md").write_text("# Durable\n") + + class AsyncOnlyClient: + async def stat(self, uri: str): + del uri + return {} + + async def import_memory(self, uri: str, content: str, **kwargs): + del uri, content, kwargs + return {"ok": True} + + with pytest.raises(TypeError, match="synchronous client"): + migrate_openclaw(AsyncOnlyClient(), temp_dir, mode="memory", dry_run=False) From 90871c0146f4dd3997fe07999ef15c55482f3b52 Mon Sep 17 00:00:00 2001 From: yeyitech Date: Tue, 14 Apr 2026 13:25:31 +0800 Subject: [PATCH 3/5] style(migration): apply repo ruff formatting --- 
examples/openclaw-migration/migrate.py | 4 +++- openviking/migration/openclaw.py | 8 ++++---- tests/migration/test_openclaw.py | 5 +---- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/examples/openclaw-migration/migrate.py b/examples/openclaw-migration/migrate.py index 77e16596c..4c7593c03 100644 --- a/examples/openclaw-migration/migrate.py +++ b/examples/openclaw-migration/migrate.py @@ -58,7 +58,9 @@ def _parse_args() -> argparse.Namespace: ) parser.add_argument("--dry-run", action="store_true", help="Preview planned imports only") parser.add_argument("--overwrite", action="store_true", help="Overwrite existing OV targets") - parser.add_argument("--no-wait", action="store_true", help="Do not wait for async queue/task completion") + parser.add_argument( + "--no-wait", action="store_true", help="Do not wait for async queue/task completion" + ) parser.add_argument( "--timeout", type=float, diff --git a/openviking/migration/openclaw.py b/openviking/migration/openclaw.py index ff8b2dd96..28afa8fb9 100644 --- a/openviking/migration/openclaw.py +++ b/openviking/migration/openclaw.py @@ -203,7 +203,9 @@ def discover_openclaw_transcript_sessions( if not session_id: continue - transcript_path = Path(session_file) if session_file else sessions_dir / f"{session_id}.jsonl" + transcript_path = ( + Path(session_file) if session_file else sessions_dir / f"{session_id}.jsonl" + ) if not transcript_path.is_absolute(): transcript_path = sessions_dir / transcript_path if not transcript_path.is_file(): @@ -322,9 +324,7 @@ def parse_openclaw_transcript(path: str | Path) -> list[OpenClawTranscriptMessag if created_at: break - messages.append( - OpenClawTranscriptMessage(role=role, content=text, created_at=created_at) - ) + messages.append(OpenClawTranscriptMessage(role=role, content=text, created_at=created_at)) return messages diff --git a/tests/migration/test_openclaw.py b/tests/migration/test_openclaw.py index bbc9452e3..6ab661e50 100644 --- 
a/tests/migration/test_openclaw.py +++ b/tests/migration/test_openclaw.py @@ -26,10 +26,7 @@ def test_discover_openclaw_memory_artifacts(temp_dir: Path): assert [artifact.category for artifact in artifacts] == ["entities", "events", "cases"] assert artifacts[0].uri == "viking://user/memories/entities/openclaw-memory.md" assert artifacts[1].uri == "viking://user/memories/events/openclaw-2026-04-01.md" - assert ( - artifacts[2].uri - == "viking://agent/memories/cases/openclaw-2026-04-01-project-alpha.md" - ) + assert artifacts[2].uri == "viking://agent/memories/cases/openclaw-2026-04-01-project-alpha.md" def test_discover_openclaw_transcript_sessions_includes_index_and_orphans(temp_dir: Path): From 3cb63c02d6f39231356eeb8bb80d22b7733acbe4 Mon Sep 17 00:00:00 2001 From: yeyitech Date: Wed, 15 Apr 2026 14:48:48 +0800 Subject: [PATCH 4/5] docs(migration): clarify sync client requirement --- examples/openclaw-migration/README.md | 3 ++ openviking/migration/openclaw.py | 69 ++++++++++++++++++++++++--- 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/examples/openclaw-migration/README.md b/examples/openclaw-migration/README.md index 5ca7a2dcc..1ed4afb5c 100644 --- a/examples/openclaw-migration/README.md +++ b/examples/openclaw-migration/README.md @@ -8,6 +8,9 @@ This tool imports existing OpenClaw data into OpenViking through two paths: By default it reads from `~/.openclaw` and connects to OpenViking over HTTP using the same config as `ovcli.conf`. You can also point it at an embedded local data path with `--ov-path`. +The migration entrypoint is synchronous. Use `SyncHTTPClient` or `SyncOpenViking`; +async OpenViking clients are intentionally rejected to avoid silent coroutine misuse. 
+ ## Examples ```bash diff --git a/openviking/migration/openclaw.py b/openviking/migration/openclaw.py index 28afa8fb9..cab9bf372 100644 --- a/openviking/migration/openclaw.py +++ b/openviking/migration/openclaw.py @@ -18,7 +18,7 @@ from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path -from typing import Any, Iterable, Sequence +from typing import Any, Iterable, Protocol, Sequence from openviking_cli.exceptions import NotFoundError @@ -67,6 +67,53 @@ class OpenClawTranscriptSession: channel: str = "" +class OpenClawMigrationClient(Protocol): + """Synchronous client surface required by the OpenClaw migration helper.""" + + def stat(self, uri: str) -> dict[str, Any]: + ... + + def import_memory( + self, + uri: str, + content: str, + *, + mode: str = "replace", + wait: bool = True, + timeout: float | None = None, + telemetry: bool = False, + ) -> dict[str, Any]: + ... + + def session_exists(self, session_id: str) -> bool: + ... + + def get_session(self, session_id: str, auto_create: bool = True) -> dict[str, Any]: + ... + + def delete_session(self, session_id: str) -> Any: + ... + + def create_session(self, session_id: str) -> Any: + ... + + def add_message( + self, + session_id: str, + role: str, + *, + content: str, + created_at: str | None = None, + ) -> Any: + ... + + def commit_session(self, session_id: str, *, telemetry: bool = False) -> dict[str, Any]: + ... + + def get_task(self, task_id: str) -> dict[str, Any]: + ... 
+ + def _sanitize_segment(value: str, *, fallback: str) -> str: sanitized = _SAFE_SEGMENT_RE.sub("-", value.strip()).strip("._-") return sanitized or fallback @@ -329,7 +376,12 @@ def parse_openclaw_transcript(path: str | Path) -> list[OpenClawTranscriptMessag return messages -def _call_sync_client_method(client: Any, method_name: str, *args: Any, **kwargs: Any) -> Any: +def _call_sync_client_method( + client: OpenClawMigrationClient, + method_name: str, + *args: Any, + **kwargs: Any, +) -> Any: """Call a sync client method and fail fast on async clients.""" method = getattr(client, method_name) result = method(*args, **kwargs) @@ -344,7 +396,7 @@ def _call_sync_client_method(client: Any, method_name: str, *args: Any, **kwargs return result -def _uri_exists(client: Any, uri: str) -> bool: +def _uri_exists(client: OpenClawMigrationClient, uri: str) -> bool: try: _call_sync_client_method(client, "stat", uri) except NotFoundError: @@ -352,7 +404,7 @@ def _uri_exists(client: Any, uri: str) -> bool: return True -def _session_exists(client: Any, session_id: str) -> bool: +def _session_exists(client: OpenClawMigrationClient, session_id: str) -> bool: if hasattr(client, "session_exists"): return bool(_call_sync_client_method(client, "session_exists", session_id)) try: @@ -372,7 +424,7 @@ def _stable_target_session_id(agent_id: str, session_id: str) -> str: def _wait_for_task( - client: Any, + client: OpenClawMigrationClient, task_id: str, *, timeout: float, @@ -409,7 +461,7 @@ def _summarize_records(records: Iterable[dict[str, Any]]) -> dict[str, int]: def migrate_openclaw( - client: Any, + client: OpenClawMigrationClient, openclaw_dir: str | Path, *, mode: str = "memory", @@ -425,6 +477,11 @@ def migrate_openclaw( Returns a structured summary containing per-item records for both import paths. The caller can print or persist the records as needed. + + The helper expects a synchronous OpenViking client such as + ``SyncOpenViking`` or ``SyncHTTPClient``. 
If an async client is passed, + the first client call that returns an awaitable raises ``TypeError`` instead of + silently discarding the un-awaited result. + """ if mode not in {"memory", "transcript", "all"}: From ce65f68dd3bd6d05d218ed88db18cd53e9f1f194 Mon Sep 17 00:00:00 2001 From: yeyitech Date: Wed, 15 Apr 2026 15:15:06 +0800 Subject: [PATCH 5/5] style(migration): apply ruff formatting --- openviking/migration/openclaw.py | 27 +++++++++------------------ 1 file changed, 9 insertions(+), 18 deletions(-) diff --git a/openviking/migration/openclaw.py b/openviking/migration/openclaw.py index cab9bf372..19fe9c3f3 100644 --- a/openviking/migration/openclaw.py +++ b/openviking/migration/openclaw.py @@ -70,8 +70,7 @@ class OpenClawTranscriptSession: class OpenClawMigrationClient(Protocol): """Synchronous client surface required by the OpenClaw migration helper.""" - def stat(self, uri: str) -> dict[str, Any]: - ... + def stat(self, uri: str) -> dict[str, Any]: ... def import_memory( self, @@ -82,20 +81,15 @@ def import_memory( wait: bool = True, timeout: float | None = None, telemetry: bool = False, - ) -> dict[str, Any]: - ... + ) -> dict[str, Any]: ... - def session_exists(self, session_id: str) -> bool: - ... + def session_exists(self, session_id: str) -> bool: ... - def get_session(self, session_id: str, auto_create: bool = True) -> dict[str, Any]: - ... + def get_session(self, session_id: str, auto_create: bool = True) -> dict[str, Any]: ... - def delete_session(self, session_id: str) -> Any: - ... + def delete_session(self, session_id: str) -> Any: ... - def create_session(self, session_id: str) -> Any: - ... + def create_session(self, session_id: str) -> Any: ... def add_message( self, @@ -104,14 +98,11 @@ def add_message( *, content: str, created_at: str | None = None, - ) -> Any: - ... + ) -> Any: ... - def commit_session(self, session_id: str, *, telemetry: bool = False) -> dict[str, Any]: - ... + def commit_session(self, session_id: str, *, telemetry: bool = False) -> dict[str, Any]: ... 
- def get_task(self, task_id: str) -> dict[str, Any]: - ... + def get_task(self, task_id: str) -> dict[str, Any]: ... def _sanitize_segment(value: str, *, fallback: str) -> str: