Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- **Chat-completions session resume via `X-Client-Session-Id`.** When the
client sends this header, amplifier-agent now uses a deterministic
``http-<client_sid>`` as the amplifier session_id, auto-detects whether
this is the first turn or a continuation by checking if the session state
dir exists on disk, and passes ``is_resumed`` to the existing kernel
resume mechanism (same primitive the CLI face's ``--resume`` flag uses).
One opencode conversation = one amplifier session — unified audit trail,
persistent hook state across turns, append-mode events.jsonl.

- **Client-authoritative transcript reconciliation** in
``src/amplifier_agent_http/_reconciler.py``. Since the chat-completions
wire is stateless and the client sends full history every turn, on
divergence between stored and incoming the client wins by fiat — we
persist the client's view over our stored copy without any rewind
ceremony. Sufficient for opencode and any well-behaved OpenAI-compatible
client. No new event types introduced.

- **`bundle.md` declares all 4 default providers** (anthropic, openai, azure-openai, ollama). Previously only anthropic was declared; the other 3 had to be installed lazily at first use of `amplifier-agent run` against a host_config that referenced them. Now all 4 ship as part of the prepared bundle — the top-level `providers:` section is processed by `bundle.prepare(install_deps=True)` during cold-prep and the post-install hook, ensuring every provider module is importable before any session is created.

### Changed
Expand Down
63 changes: 63 additions & 0 deletions src/amplifier_agent_http/_reconciler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Client-authoritative transcript reconciliation for the chat-completions HTTP face.

The chat-completions wire protocol is stateless — every turn the client sends
the full conversation history. With session resume on the server side, the
kernel will have its own stored transcript copy. On divergence (user edited a
past message, opencode rewound, anything), the client's view wins by fiat.

This module is HTTP-face-private. The CLI face (``amplifier-agent run``)
uses a different mechanism (``_runtime.py:_repair_loaded_transcript_if_needed``).
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from amplifier_agent_lib.session_store import SessionStore

logger = logging.getLogger(__name__)


def reconcile_client_history(
*,
client_messages: list[dict[str, Any]],
session_id: str,
store: SessionStore,
) -> list[dict[str, Any]]:
"""Persist the client's view as authoritative and return it for replay.

Chat-completions is client-authoritative: opencode (and any conforming
OpenAI-compatible client) sends the full conversation every turn. Whatever
we have stored locally is at best a copy. On any divergence the client's
view wins.

We persist over the stored copy (idempotent on healthy resumes — same
content) so the next turn's load is consistent, then return for replay.
No divergence detection, no special events, no ceremony.

Parameters
----------
client_messages:
The full ``messages`` array from the chat-completions request body,
already converted to kernel-compatible dicts.
session_id:
The deterministic amplifier sid derived from ``X-Client-Session-Id``.
store:
The ``SessionStore`` instance the HTTP face shares with the kernel
resume mechanism. Same store, same on-disk location.

Returns
-------
list[dict]
The client's messages, unchanged. Returned for caller's convenience
so the caller doesn't have to re-reference ``client_messages``
downstream.
"""
store.save(
session_id,
client_messages,
metadata={"last_turn": "client_reconciled"},
)
return client_messages
9 changes: 8 additions & 1 deletion src/amplifier_agent_http/_session_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ async def run_chat_turn(
display: DisplaySystem,
approval: ApprovalSystem,
session_id: str | None = None,
is_resumed: bool = False,
tools: list[dict[str, Any]] | None = None,
host_tool_yield_state: dict[str, Any] | None = None,
workspace: str | None = None,
Expand Down Expand Up @@ -176,6 +177,12 @@ async def run_chat_turn(
Optional session id. If not provided, a random one is generated. The
kernel uses this to tag events; persistent storage is not the HTTP
face's responsibility.
is_resumed:
Whether to pass ``is_resumed=True`` to ``prepared.create_session``.
When ``True`` the kernel treats this as a continuation of a prior
session (append-mode events.jsonl, etc.). Defaults to ``False`` for
backward compatibility — callers that do not send ``X-Client-Session-Id``
get a fresh session every turn as before.

Returns
-------
Expand Down Expand Up @@ -263,7 +270,7 @@ async def run_chat_turn(
session = await prepared.create_session(
session_id=sid,
session_cwd=None, # POC: bundle uses its own default cwd
is_resumed=False,
is_resumed=is_resumed,
)
finally:
# Restore the lifespan state so concurrent requests start from a
Expand Down
32 changes: 32 additions & 0 deletions src/amplifier_agent_http/routes/chat_completions.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from amplifier_agent_http._auth import require_bearer
from amplifier_agent_http._event_translator import extract_usage, translate_event
from amplifier_agent_http._host_tool_signal import HostToolYield
from amplifier_agent_http._reconciler import reconcile_client_history
from amplifier_agent_http._session_runner import run_chat_turn
from amplifier_agent_http._wire import (
ChatCompletionRequest,
Expand All @@ -48,10 +49,12 @@
stop_chunk,
tool_calls_stop_chunk,
)
from amplifier_agent_lib.persistence import workspaces_root
from amplifier_agent_lib.protocol_points.defaults_http import (
HttpAutoApprovalSystem,
HttpQueueDisplaySystem,
)
from amplifier_agent_lib.session_store import SessionStore

logger = logging.getLogger("amplifier_agent_http.chat_completions")

Expand Down Expand Up @@ -604,6 +607,7 @@ async def chat_completions(
# attach it. amplifier-agent has no opinion on the value's shape beyond
# requiring a non-empty trimmed string.
client_session_id = request.headers.get("X-Client-Session-Id")
client_session_id_clean: str = ""
if client_session_id and base_workspace:
# Strip whitespace, defensively constrain to a safe slug shape.
# Clients are expected to send path-safe IDs.
Expand All @@ -615,6 +619,32 @@ async def chat_completions(
else:
workspace = base_workspace

# Determine the amplifier session_id and resume flag for this turn.
# When the client provides X-Client-Session-Id, we use it deterministically:
# same client_sid -> same amplifier sid across turns, resume if a state dir
# already exists from a prior turn. When the header is absent we keep the
# legacy behavior: fresh random sid per turn, no resume.
sid: str | None
is_resumed: bool
if client_session_id_clean:
# workspace is guaranteed non-None here: client_session_id_clean is
# only set when client_session_id is present AND base_workspace is
# truthy (see the outer if-condition above).
sid = f"http-{client_session_id_clean}"
_ws_root = workspaces_root()
_state_dir = _ws_root / str(workspace) / "sessions" / sid
is_resumed = _state_dir.exists()
# Reconcile client's full-history view against stored. Client wins.
# This also creates the session_dir so the NEXT turn detects is_resumed.
reconcile_client_history(
client_messages=[_msg_to_dict(m) for m in payload.messages],
session_id=sid,
store=SessionStore(_ws_root / str(workspace)),
)
else:
sid = None
is_resumed = False

logger.info(
"chat-completion start chunk_id=%s history_len=%d prompt_chars=%d host_tools=%d workspace=%r client_session_id=%r",
chunk_id,
Expand Down Expand Up @@ -649,6 +679,8 @@ async def chat_completions(
workspace=workspace,
provider_id=provider_id,
upstream_model=payload.model,
session_id=sid,
is_resumed=is_resumed,
)
)

Expand Down
Loading
Loading