diff --git a/CHANGELOG.md b/CHANGELOG.md index fc3732e..782ceb3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- **Chat-completions session resume via `X-Client-Session-Id`.** When the + client sends this header, amplifier-agent now uses a deterministic + ``http-<client_sid>`` as the amplifier session_id, auto-detects whether + this is the first turn or a continuation by checking if the session state + dir exists on disk, and passes ``is_resumed`` to the existing kernel + resume mechanism (same primitive the CLI face's ``--resume`` flag uses). + One opencode conversation = one amplifier session — unified audit trail, + persistent hook state across turns, append-mode events.jsonl. + +- **Client-authoritative transcript reconciliation** in + ``src/amplifier_agent_http/_reconciler.py``. Since the chat-completions + wire is stateless and the client sends full history every turn, on + divergence between the stored and incoming transcripts the client wins by fiat — we + persist the client's view over our stored copy without any rewind + ceremony. Sufficient for opencode and any well-behaved OpenAI-compatible + client. No new event types introduced. + - **`bundle.md` declares all 4 default providers** (anthropic, openai, azure-openai, ollama). Previously only anthropic was declared; the other 3 had to be installed lazily at first use of `amplifier-agent run` against a host_config that referenced them. Now all 4 ship as part of the prepared bundle — the top-level `providers:` section is processed by `bundle.prepare(install_deps=True)` during cold-prep and the post-install hook, ensuring every provider module is importable before any session is created. 
### Changed diff --git a/src/amplifier_agent_http/_reconciler.py b/src/amplifier_agent_http/_reconciler.py new file mode 100644 index 0000000..e394987 --- /dev/null +++ b/src/amplifier_agent_http/_reconciler.py @@ -0,0 +1,63 @@ +"""Client-authoritative transcript reconciliation for the chat-completions HTTP face. + +The chat-completions wire protocol is stateless — every turn the client sends +the full conversation history. With session resume on the server side, the +kernel will have its own stored transcript copy. On divergence (user edited a +past message, opencode rewound, anything), the client's view wins by fiat. + +This module is HTTP-face-private. The CLI face (``amplifier-agent run``) +uses a different mechanism (``_runtime.py:_repair_loaded_transcript_if_needed``). +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from amplifier_agent_lib.session_store import SessionStore + +logger = logging.getLogger(__name__) + + +def reconcile_client_history( + *, + client_messages: list[dict[str, Any]], + session_id: str, + store: SessionStore, +) -> list[dict[str, Any]]: + """Persist the client's view as authoritative and return it for replay. + + Chat-completions is client-authoritative: opencode (and any conforming + OpenAI-compatible client) sends the full conversation every turn. Whatever + we have stored locally is at best a copy. On any divergence the client's + view wins. + + We persist over the stored copy (idempotent on healthy resumes — same + content) so the next turn's load is consistent, then return for replay. + No divergence detection, no special events, no ceremony. + + Parameters + ---------- + client_messages: + The full ``messages`` array from the chat-completions request body, + already converted to kernel-compatible dicts. + session_id: + The deterministic amplifier sid derived from ``X-Client-Session-Id``. 
+ store: + The ``SessionStore`` instance the HTTP face shares with the kernel + resume mechanism. Same store, same on-disk location. + + Returns + ------- + list[dict] + The client's messages, unchanged. Returned for caller's convenience + so the caller doesn't have to re-reference ``client_messages`` + downstream. + """ + store.save( + session_id, + client_messages, + metadata={"last_turn": "client_reconciled"}, + ) + return client_messages diff --git a/src/amplifier_agent_http/_session_runner.py b/src/amplifier_agent_http/_session_runner.py index 20b7d07..e4d261f 100644 --- a/src/amplifier_agent_http/_session_runner.py +++ b/src/amplifier_agent_http/_session_runner.py @@ -143,6 +143,7 @@ async def run_chat_turn( display: DisplaySystem, approval: ApprovalSystem, session_id: str | None = None, + is_resumed: bool = False, tools: list[dict[str, Any]] | None = None, host_tool_yield_state: dict[str, Any] | None = None, workspace: str | None = None, @@ -176,6 +177,12 @@ async def run_chat_turn( Optional session id. If not provided, a random one is generated. The kernel uses this to tag events; persistent storage is not the HTTP face's responsibility. + is_resumed: + Whether to pass ``is_resumed=True`` to ``prepared.create_session``. + When ``True`` the kernel treats this as a continuation of a prior + session (append-mode events.jsonl, etc.). Defaults to ``False`` for + backward compatibility — callers that do not send ``X-Client-Session-Id`` + get a fresh session every turn as before. 
Returns ------- @@ -263,7 +270,7 @@ async def run_chat_turn( session = await prepared.create_session( session_id=sid, session_cwd=None, # POC: bundle uses its own default cwd - is_resumed=False, + is_resumed=is_resumed, ) finally: # Restore the lifespan state so concurrent requests start from a diff --git a/src/amplifier_agent_http/routes/chat_completions.py b/src/amplifier_agent_http/routes/chat_completions.py index d74656b..bb0d408 100644 --- a/src/amplifier_agent_http/routes/chat_completions.py +++ b/src/amplifier_agent_http/routes/chat_completions.py @@ -35,6 +35,7 @@ from amplifier_agent_http._auth import require_bearer from amplifier_agent_http._event_translator import extract_usage, translate_event from amplifier_agent_http._host_tool_signal import HostToolYield +from amplifier_agent_http._reconciler import reconcile_client_history from amplifier_agent_http._session_runner import run_chat_turn from amplifier_agent_http._wire import ( ChatCompletionRequest, @@ -48,10 +49,12 @@ stop_chunk, tool_calls_stop_chunk, ) +from amplifier_agent_lib.persistence import workspaces_root from amplifier_agent_lib.protocol_points.defaults_http import ( HttpAutoApprovalSystem, HttpQueueDisplaySystem, ) +from amplifier_agent_lib.session_store import SessionStore logger = logging.getLogger("amplifier_agent_http.chat_completions") @@ -604,6 +607,7 @@ async def chat_completions( # attach it. amplifier-agent has no opinion on the value's shape beyond # requiring a non-empty trimmed string. client_session_id = request.headers.get("X-Client-Session-Id") + client_session_id_clean: str = "" if client_session_id and base_workspace: # Strip whitespace, defensively constrain to a safe slug shape. # Clients are expected to send path-safe IDs. @@ -615,6 +619,32 @@ async def chat_completions( else: workspace = base_workspace + # Determine the amplifier session_id and resume flag for this turn. 
+ # When the client provides X-Client-Session-Id, we use it deterministically: + # same client_sid -> same amplifier sid across turns, resume if a state dir + # already exists from a prior turn. When the header is absent we keep the + # legacy behavior: fresh random sid per turn, no resume. + sid: str | None + is_resumed: bool + if client_session_id_clean: + # workspace is guaranteed non-None here: client_session_id_clean is + # only set when client_session_id is present AND base_workspace is + # truthy (see the outer if-condition above). + sid = f"http-{client_session_id_clean}" + _ws_root = workspaces_root() + _state_dir = _ws_root / str(workspace) / "sessions" / sid + is_resumed = _state_dir.exists() + # Reconcile client's full-history view against stored. Client wins. + # This also creates the session_dir so the NEXT turn detects is_resumed. + reconcile_client_history( + client_messages=[_msg_to_dict(m) for m in payload.messages], + session_id=sid, + store=SessionStore(_ws_root / str(workspace)), + ) + else: + sid = None + is_resumed = False + logger.info( "chat-completion start chunk_id=%s history_len=%d prompt_chars=%d host_tools=%d workspace=%r client_session_id=%r", chunk_id, @@ -649,6 +679,8 @@ async def chat_completions( workspace=workspace, provider_id=provider_id, upstream_model=payload.model, + session_id=sid, + is_resumed=is_resumed, ) ) diff --git a/tests/http/test_reconciler.py b/tests/http/test_reconciler.py new file mode 100644 index 0000000..4420cd4 --- /dev/null +++ b/tests/http/test_reconciler.py @@ -0,0 +1,334 @@ +"""Tests for session-resume via X-Client-Session-Id and client-authoritative reconciliation. + +Covers: +- _reconciler.reconcile_client_history persists client view to store. +- Returned list is the same object (not a copy). +- Existing stored transcript is overwritten when client view differs. +- Empty client_messages list is handled without error. +- Route: deterministic ``http-<client_sid>`` when X-Client-Session-Id is present. 
+- Route: is_resumed=True on the second POST with the same header. +- Route: fresh random sid (None from route perspective) when header is absent. +- Route: client-edited history replaces stored transcript. +""" + +from __future__ import annotations + +from contextlib import asynccontextmanager +from pathlib import Path +from typing import Any +from unittest.mock import MagicMock, patch + +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from amplifier_agent_http._reconciler import reconcile_client_history +from amplifier_agent_http.routes import chat_completions as cc_module +from amplifier_agent_lib.session_store import SessionStore + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + +AUTH = {"Authorization": "Bearer test-key"} + +_REGISTRY = {"claude-3-5-sonnet-20241022": "anthropic"} + + +def _make_test_app( + *, + registry: dict[str, str] | None = None, + workspace: str | None = None, +) -> FastAPI: + """Build a minimal FastAPI app for session-resume tests. + + Accepts an optional *workspace* to simulate a server that has + ``resolved_workspace`` set (required for the X-Client-Session-Id + deterministic-sid path). 
+ """ + prepared_mock = MagicMock() + prepared_mock.mount_plan = {} + state_registry = registry or {} + + @asynccontextmanager + async def _noop_lifespan(application: FastAPI): + application.state.config = MagicMock() + application.state.config.model_id = "amplifier" + application.state.config.api_key = "test-key" + application.state.prepared = prepared_mock + application.state.agent_configs = {} + application.state.resolved_workspace = workspace + application.state.host_config = {} + application.state.available_models = [] + application.state.served_models_registry = state_registry + yield + + app = FastAPI(lifespan=_noop_lifespan) + app.include_router(cc_module.router) + return app + + +def _chat_payload(model: str = "claude-3-5-sonnet-20241022", **kwargs: Any) -> dict[str, Any]: + base: dict[str, Any] = { + "model": model, + "messages": [{"role": "user", "content": "hello"}], + } + base.update(kwargs) + return base + + +# --------------------------------------------------------------------------- +# Unit tests: _reconciler.reconcile_client_history +# --------------------------------------------------------------------------- + + +def test_reconciler_persists_client_view_to_store(tmp_path: Path) -> None: + """Calling reconcile_client_history persists the messages to the store with + the expected metadata, and returns the same list.""" + store = SessionStore(tmp_path) + msgs = [{"role": "user", "content": "hello"}] + + result = reconcile_client_history( + client_messages=msgs, + session_id="http-test-sid", + store=store, + ) + + # Should be persisted on disk. + loaded = store.load("http-test-sid") + assert loaded is not None + transcript, metadata = loaded + assert transcript == msgs + assert metadata == {"last_turn": "client_reconciled"} + + # Return value should be the same object (caller convenience). 
+ assert result is msgs + + +def test_reconciler_returns_client_messages_unchanged(tmp_path: Path) -> None: + """The returned list IS the input list — not a copy.""" + store = SessionStore(tmp_path) + msgs: list[dict[str, Any]] = [{"role": "user", "content": "unchanged"}] + + returned = reconcile_client_history( + client_messages=msgs, + session_id="http-identity-check", + store=store, + ) + assert returned is msgs + + +def test_reconciler_overwrites_existing_stored_transcript(tmp_path: Path) -> None: + """When the store already has a different transcript for this sid, + reconcile_client_history overwrites it with the client's view.""" + store = SessionStore(tmp_path) + sid = "http-overwrite-test" + + # Seed an old transcript. + store.save(sid, [{"role": "assistant", "content": "old"}], metadata={"last_turn": "old"}) + + # Client sends new history. + new_msgs = [{"role": "user", "content": "new turn"}] + reconcile_client_history(client_messages=new_msgs, session_id=sid, store=store) + + loaded = store.load(sid) + assert loaded is not None + transcript, metadata = loaded + assert transcript == new_msgs + assert metadata["last_turn"] == "client_reconciled" + + +def test_reconciler_handles_empty_client_messages(tmp_path: Path) -> None: + """An empty messages array is valid input and gets persisted as such.""" + store = SessionStore(tmp_path) + sid = "http-empty-messages" + + result = reconcile_client_history( + client_messages=[], + session_id=sid, + store=store, + ) + + assert result == [] + loaded = store.load(sid) + assert loaded is not None + transcript, _ = loaded + assert transcript == [] + + +# --------------------------------------------------------------------------- +# Integration tests: chat_completions route + session-resume +# --------------------------------------------------------------------------- + + +def test_chat_completions_route_uses_deterministic_sid_with_client_header( + tmp_path: Path, +) -> None: + """POST /v1/chat/completions with 
X-Client-Session-Id uses a deterministic + ``http-<client_sid>`` as the amplifier session_id passed to run_chat_turn.""" + app = _make_test_app(registry=_REGISTRY, workspace="test-ws") + captured: dict[str, Any] = {} + + async def _fake_run(**kwargs: Any) -> str: + captured.update(kwargs) + return "ok" + + with ( + patch( + "amplifier_agent_http.routes.chat_completions.run_chat_turn", + side_effect=_fake_run, + ), + patch( + "amplifier_agent_http.routes.chat_completions.workspaces_root", + return_value=tmp_path, + ), + TestClient(app, raise_server_exceptions=False) as client, + ): + resp = client.post( + "/v1/chat/completions", + json=_chat_payload(), + headers={**AUTH, "X-Client-Session-Id": "mysession123"}, + ) + + assert resp.status_code == 200 + assert captured.get("session_id") == "http-mysession123" + + +def test_chat_completions_route_resumes_on_second_request( + tmp_path: Path, +) -> None: + """Second POST with the same X-Client-Session-Id passes is_resumed=True to + run_chat_turn after the first call has persisted the session state dir.""" + app = _make_test_app(registry=_REGISTRY, workspace="test-ws") + captured_calls: list[dict[str, Any]] = [] + + async def _fake_run(**kwargs: Any) -> str: + captured_calls.append(dict(kwargs)) + return "ok" + + with ( + patch( + "amplifier_agent_http.routes.chat_completions.run_chat_turn", + side_effect=_fake_run, + ), + patch( + "amplifier_agent_http.routes.chat_completions.workspaces_root", + return_value=tmp_path, + ), + TestClient(app, raise_server_exceptions=False) as client, + ): + # First turn — session state dir does not exist yet. + resp1 = client.post( + "/v1/chat/completions", + json=_chat_payload(), + headers={**AUTH, "X-Client-Session-Id": "session-resume-test"}, + ) + assert resp1.status_code == 200 + assert captured_calls[0]["is_resumed"] is False + + # Second turn — state dir was created by the first turn's reconciler call. 
+ resp2 = client.post( + "/v1/chat/completions", + json=_chat_payload(), + headers={**AUTH, "X-Client-Session-Id": "session-resume-test"}, + ) + assert resp2.status_code == 200 + assert captured_calls[1]["is_resumed"] is True + + +def test_chat_completions_route_fresh_sid_when_header_absent( + tmp_path: Path, +) -> None: + """Without X-Client-Session-Id, session_id=None is passed to run_chat_turn + (the runner generates a random sid internally), and is_resumed=False.""" + app = _make_test_app(registry=_REGISTRY, workspace="test-ws") + captured_calls: list[dict[str, Any]] = [] + + async def _fake_run(**kwargs: Any) -> str: + captured_calls.append(dict(kwargs)) + return "ok" + + with ( + patch( + "amplifier_agent_http.routes.chat_completions.run_chat_turn", + side_effect=_fake_run, + ), + patch( + "amplifier_agent_http.routes.chat_completions.workspaces_root", + return_value=tmp_path, + ), + TestClient(app, raise_server_exceptions=False) as client, + ): + resp1 = client.post( + "/v1/chat/completions", + json=_chat_payload(), + headers=AUTH, # no X-Client-Session-Id + ) + resp2 = client.post( + "/v1/chat/completions", + json=_chat_payload(), + headers=AUTH, + ) + + assert resp1.status_code == 200 + assert resp2.status_code == 200 + # Both turns have session_id=None (runner picks a random sid) and no resume. + assert captured_calls[0]["session_id"] is None + assert captured_calls[0]["is_resumed"] is False + assert captured_calls[1]["session_id"] is None + assert captured_calls[1]["is_resumed"] is False + + +def test_chat_completions_route_client_edits_history_replaces_stored( + tmp_path: Path, +) -> None: + """Client sends T1 on turn 1, then T2 (different messages) on turn 2 with + the same X-Client-Session-Id. 
The store must contain T2 after turn 2.""" + app = _make_test_app(registry=_REGISTRY, workspace="test-ws") + + async def _fake_run(**kwargs: Any) -> str: + return "ok" + + sid_clean = "edits-test" + session_id = f"http-{sid_clean}" + + with ( + patch( + "amplifier_agent_http.routes.chat_completions.run_chat_turn", + side_effect=_fake_run, + ), + patch( + "amplifier_agent_http.routes.chat_completions.workspaces_root", + return_value=tmp_path, + ), + TestClient(app, raise_server_exceptions=False) as client, + ): + # Turn 1: original history. + client.post( + "/v1/chat/completions", + json={ + "model": "claude-3-5-sonnet-20241022", + "messages": [{"role": "user", "content": "turn one message"}], + }, + headers={**AUTH, "X-Client-Session-Id": sid_clean}, + ) + + # Turn 2: user edited the past message (different content). + client.post( + "/v1/chat/completions", + json={ + "model": "claude-3-5-sonnet-20241022", + "messages": [{"role": "user", "content": "edited message"}], + }, + headers={**AUTH, "X-Client-Session-Id": sid_clean}, + ) + + # After turn 2, the store should contain the turn-2 client view (T2). + workspace_slug = f"test-ws-{sid_clean}" + store = SessionStore(tmp_path / workspace_slug) + loaded = store.load(session_id) + assert loaded is not None + transcript, _ = loaded + # The transcript should reflect T2 (the edited version). + assert any(msg.get("content") == "edited message" for msg in transcript if isinstance(msg.get("content"), str)), ( + f"Expected 'edited message' in transcript, got: {transcript}" + )