Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Changed

- **`reconcile_client_history` now runs foundation's transcript-repair pass before persisting** the client's view. Catches broken chat-completions clients (orphaned `tool_use` without paired `tool_result`, ordering violations, incomplete assistant turns) that would otherwise cause Anthropic to reject the next LLM call with HTTP 400. Mirrors `_runtime.py:_repair_loaded_transcript_if_needed` from the CLI face. Healthy transcripts pass through unchanged with negligible overhead (<10ms diagnostic).

- **`X-Session-Id` header is now recognized as a fallback** for the existing `X-Client-Session-Id` correlation mechanism (PR #71). opencode and other Vercel AI SDK-based clients send `X-Session-Id` by default; amplifier-agent now picks it up automatically, so session-resume + client-authoritative reconciliation works for opencode with zero config. `X-Client-Session-Id` remains authoritative when both headers are present.

- **Workspace name is no longer suffixed with the client session id.** Previously, `X-Client-Session-Id: abc` would route requests into `workspaces/<base>-abc/`. Now the workspace stays at `<base>` and per-client distinction is purely at the session_id level (`workspaces/<base>/sessions/http-abc/`). This keeps workspace-level hook state (context-intelligence, etc.) shared across all sessions of the same server process, where it belongs.
Expand Down
57 changes: 47 additions & 10 deletions src/amplifier_agent_http/_reconciler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import logging
from typing import TYPE_CHECKING, Any

from amplifier_foundation.session import diagnose_transcript, repair_transcript

if TYPE_CHECKING:
from amplifier_agent_lib.session_store import SessionStore

Expand All @@ -26,16 +28,25 @@ def reconcile_client_history(
session_id: str,
store: SessionStore,
) -> list[dict[str, Any]]:
"""Persist the client's view as authoritative and return it for replay.
"""Repair the client-sent transcript (if broken), then persist as authoritative.

Step 1 — Layer-1 repair: run foundation's ``diagnose_transcript`` /
``repair_transcript`` against the incoming messages. Catches orphaned
``tool_use`` blocks (no paired ``tool_result``), ordering violations, and
incomplete assistant turns that would otherwise cause Anthropic to reject
the next LLM call with HTTP 400. Healthy transcripts pass through
unchanged.

Step 2 — Persist the (now repaired) client view to the session store so
the kernel's resume path loads from a clean state.

Chat-completions is client-authoritative: opencode (and any conforming
OpenAI-compatible client) sends the full conversation every turn. Whatever
we have stored locally is at best a copy. On any divergence the client's
view wins.
Mirrors the CLI face's ``_runtime.py:_repair_loaded_transcript_if_needed``
pattern, but for the HTTP wire's client-authoritative model: we trust the
client's view of the conversation, while still defending Anthropic's
API contract before replay.

We persist over the stored copy (idempotent on healthy resumes — same
content) so the next turn's load is consistent, then return for replay.
No divergence detection, no special events, no ceremony.
The repair runs every turn but is essentially free on healthy
transcripts (Layer 1, pure, <10ms — annotate + diagnose).

Parameters
----------
Expand All @@ -51,10 +62,36 @@ def reconcile_client_history(
Returns
-------
list[dict]
The client's messages, unchanged. Returned for caller's convenience
so the caller doesn't have to re-reference ``client_messages``
The client's messages, repaired if broken and stripped of any
``line_num`` annotations. Returned for caller's convenience so
the caller doesn't have to re-reference ``client_messages``
downstream.
"""
if client_messages:
# Foundation's diagnose_transcript prefers line_num annotations for
# the incomplete-turns fallback path. SessionStore doesn't annotate
# them, so add them to shallow copies before diagnosing. repair_transcript's
# output strips line_num itself; we strip again here defensively in case
# the healthy path is hit (no repair invocation).
annotated = [{**m, "line_num": i + 1} for i, m in enumerate(client_messages)]
diagnosis = diagnose_transcript(annotated)

if diagnosis["status"] != "healthy":
repaired = repair_transcript(annotated, diagnosis)
client_messages = [{k: v for k, v in m.items() if k != "line_num"} for m in repaired]
logger.warning(
"Client-sent transcript was broken — repaired before reconcile. "
"failure_modes=%s orphaned_tool_ids=%s misplaced_tool_ids=%s "
"incomplete_turns=%d entries_before=%d entries_after=%d session=%s",
diagnosis["failure_modes"],
diagnosis["orphaned_tool_ids"],
diagnosis["misplaced_tool_ids"],
len(diagnosis["incomplete_turns"]),
len(annotated),
len(client_messages),
session_id,
)

store.save(
session_id,
client_messages,
Expand Down
141 changes: 141 additions & 0 deletions tests/http/test_reconciler.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,3 +333,144 @@ async def _fake_run(**kwargs: Any) -> str:
assert any(msg.get("content") == "edited message" for msg in transcript if isinstance(msg.get("content"), str)), (
f"Expected 'edited message' in transcript, got: {transcript}"
)


# ---------------------------------------------------------------------------
# Unit tests: repair step in reconcile_client_history
# ---------------------------------------------------------------------------


def test_reconciler_repairs_orphaned_tool_use_before_persist(tmp_path: Path) -> None:
"""Client sends a transcript with an orphaned tool_call (no matching tool result).

The foundation's diagnose_transcript / repair_transcript operates on the
OpenAI wire format: ``tool_calls`` on the assistant message and ``role:
"tool"`` response messages with ``tool_call_id``.

reconcile_client_history runs the foundation repair pass, synthesises a
synthetic ``role: "tool"`` result, persists the cleaned version to the
store, and returns the repaired list — not the original broken one.
"""
store = SessionStore(tmp_path)
sid = "http-repair-orphan"

broken: list[dict[str, Any]] = [
{"role": "user", "content": "do the thing"},
{
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": "call-123",
"type": "function",
"function": {"name": "bash", "arguments": '{"command": "ls"}'},
}
],
},
# missing role:"tool" tool_call_id:"call-123" result — that's the break
{"role": "user", "content": "follow up"},
]

result = reconcile_client_history(
client_messages=broken,
session_id=sid,
store=store,
)

# The returned transcript must no longer have an orphaned tool_call.
# After repair the stored and returned transcripts should be consistent.
loaded = store.load(sid)
assert loaded is not None
transcript, metadata = loaded
assert metadata == {"last_turn": "client_reconciled"}

# Returned value must be the repaired (stored) version.
assert result == transcript

# Every tool_call id in the repaired transcript must have a paired
# role:"tool" result message.
tool_call_ids: set[str] = set()
tool_result_ids: set[str] = set()
for msg in result:
if msg.get("role") == "assistant":
for tc in msg.get("tool_calls") or []:
tool_call_ids.add(tc["id"])
elif msg.get("role") == "tool":
tool_result_ids.add(msg.get("tool_call_id", ""))
unmatched = tool_call_ids - tool_result_ids
assert not unmatched, f"Orphaned tool_call ids remain after repair: {unmatched}"


def test_reconciler_healthy_transcript_passes_through_unchanged(tmp_path: Path) -> None:
"""Client sends a well-formed transcript.

diagnose_transcript reports healthy — no repair is invoked — and
store.save sees a transcript identity-equal to the original input.
"""
store = SessionStore(tmp_path)
sid = "http-healthy-passthrough"

healthy: list[dict[str, Any]] = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi there"},
{"role": "user", "content": "goodbye"},
]

result = reconcile_client_history(
client_messages=healthy,
session_id=sid,
store=store,
)

# Healthy transcript: returned value is the same object (identity check).
assert result is healthy

# Persisted view matches input.
loaded = store.load(sid)
assert loaded is not None
transcript, _ = loaded
assert transcript == healthy


def test_reconciler_logs_warning_with_failure_modes_on_repair(tmp_path: Path, caplog: Any) -> None:
"""When a repair happens, a warning is logged containing 'failure_modes='
and the session_id — operator visibility for production debugging.

Uses the OpenAI wire format (``tool_calls`` / ``role: "tool"``) which is
what foundation's diagnose_transcript understands.
"""
import logging

store = SessionStore(tmp_path)
sid = "http-warn-on-repair"

broken: list[dict[str, Any]] = [
{"role": "user", "content": "do the thing"},
{
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": "call-456",
"type": "function",
"function": {"name": "bash", "arguments": '{"command": "pwd"}'},
}
],
},
# missing role:"tool" tool_call_id:"call-456" result
{"role": "user", "content": "next step"},
]

with caplog.at_level(logging.WARNING, logger="amplifier_agent_http._reconciler"):
reconcile_client_history(
client_messages=broken,
session_id=sid,
store=store,
)

# At least one WARNING record must contain the expected fields.
warning_texts = [r.getMessage() for r in caplog.records if r.levelno == logging.WARNING]
assert warning_texts, "Expected at least one WARNING log entry from the reconciler"
combined = " ".join(warning_texts)
assert "failure_modes=" in combined, f"'failure_modes=' not found in: {combined}"
assert sid in combined, f"session_id '{sid}' not found in warning: {combined}"
Loading