diff --git a/docs/2026-02-22-openclaw-integration-plan.md b/docs/2026-02-22-openclaw-integration-plan.md index 892f8f6..47b57ff 100644 --- a/docs/2026-02-22-openclaw-integration-plan.md +++ b/docs/2026-02-22-openclaw-integration-plan.md @@ -28,7 +28,7 @@ Canonical permanent reference for error behavior: ## Goals -- Make `acpx` JSON output fully correlation-safe for orchestrators. +- Make `acpx` JSON stream ACP-pure for orchestrators. - Make failures machine-readable in JSON mode across all layers (CLI, runtime, queue, ACP). - Make non-interactive permission behavior explicit and policy-driven. - Add idempotent session ensure flow for orchestrators. @@ -54,24 +54,16 @@ Use a two-layer error contract (fully specified in `docs/ACPX_ERROR_STRATEGY.md` ## Required changes -### 1. Correlation-safe JSON event envelope +### 1. ACP-only JSON stream -Current `--format json` events do not consistently carry session/request correlation fields. +Current JSON output includes acpx-specific envelope/event objects. -Add envelope fields to all JSON stream events: +Replace that with strict ACP JSON-RPC stream behavior: -- `eventVersion: 1` -- `sessionId: string` -- `requestId?: string` -- `seq: number` (monotonic per request stream) -- `stream: "prompt" | "control"` -- existing payload-specific fields (`type`, `content`, `toolCallId`, etc.) - -Notes: - -- `requestId` is required for queue-owner submitted turns. -- For direct (non-queued) local turn execution, `requestId` may be omitted. -- `seq` resets per request stream. +- `--format json --json-strict` stdout emits only raw ACP JSON-RPC messages. +- no acpx-specific stream envelope fields (`eventVersion`, `sessionId`, `seq`, `stream`, custom `type`). +- no ACP payload key renaming in stream messages. +- local orchestration metadata stays out-of-band (checkpoint/state/status APIs), not in stream payloads. ### 2. General structured JSON error contract @@ -211,7 +203,7 @@ Primary files: ### Phase 1 (MVP hardening) -- JSON envelope (`eventVersion/sessionId/requestId/seq`) +- ACP-only JSON stream contract - structured JSON errors - session ensure command @@ -232,7 +224,7 @@ Primary files: Unit tests: -- output formatter JSON envelope fields and sequence behavior +- output formatter ACP pass-through behavior in JSON mode - structured error mapping from exit paths - normalization matrix (ACP `RequestError`, queue protocol errors, permission errors, usage errors) - ensure command path resolution (`created=true|false`) @@ -257,9 +249,9 @@ Regression tests: ## Acceptance criteria -- every JSON event in streamed prompt mode includes `eventVersion`, `sessionId`, and `seq` -- queue-submitted turns include `requestId` on all events -- JSON mode failures emit at least one structured `error` event before exit, with stable `code` +- every JSON line in streamed prompt mode is a valid ACP JSON-RPC message +- no acpx-specific envelope/event keys are emitted on ACP stream +- JSON mode failures emit structured machine-readable diagnostics without violating JSON-only strict mode - auth-required failures include `detailCode=AUTH_REQUIRED` and preserve ACP payload when present - `sessions ensure` is idempotent and returns deterministic JSON - `--json-strict` enforces JSON-only output behavior diff --git a/docs/2026-02-27-acpx-session-model.md b/docs/2026-02-27-acpx-session-model.md index e3af560..49310b9 100644 --- a/docs/2026-02-27-acpx-session-model.md +++ b/docs/2026-02-27-acpx-session-model.md @@ -7,296 +7,89 @@ Status: Specification (target model) Define a long-term stable persistence model with: -- one canonical event schema, -- one authoritative event timeline, -- one checkpoint/session schema where conversation and ACPX runtime state are both top-level. +- one authoritative ACP transcript stream, +- one session checkpoint/index schema, +- strict separation between ACP stream data and local runtime bookkeeping. ## Core Decisions -1. Persist exactly one canonical event schema: `acpx.event.v1`. -2. Use append-only NDJSON event files as source of truth. -3. Use `session.json` as a derived checkpoint/index. -4. Keep conversation fields at the root of `session.json` (no nested conversation object). -5. Keep ACPX runtime fields at the root of `session.json` (no separate runtime file). -6. Use `snake_case` for all persisted acpx-owned keys. +1. ACP messages are the only allowed payloads in the stream. +2. The stream is append-only NDJSON, one raw ACP JSON-RPC message per line. +3. `session.json` is a derived checkpoint/index, not a second event protocol. +4. Local reliability state (queue owner, process, retries, lock, offsets) is out-of-band from the ACP stream. +5. No custom event envelope is allowed on the ACP stream. ## Canonical ID Semantics -- `session_id`: acpx local record id (stable primary id for storage paths and lookup). -- `acp_session_id`: ACP adapter/session id exposed by the adapter/runtime. -- `agent_session_id`: upstream harness-native session id (Codex/Claude/OpenCode/Pi/etc), if available. -- `request_id`: turn/control request scope id. -- `event_id`: unique id of one persisted event. +- `acpx_record_id`: acpx local record id (stable storage id). +- `acp_session_id`: ACP session id used on wire. +- `agent_session_id`: harness-native id (Codex/Claude/OpenCode/Pi/etc), when available. +- `request_id`: ACP request id scope. Rules: -- `session_id` is always required. -- `acp_session_id` and `agent_session_id` are optional but should be populated when known. -- IDs may be equal in some runtimes; semantics remain distinct. +- `acpx_record_id` is always required in local storage. +- `acp_session_id` and `agent_session_id` are optional and may appear later. +- Values may be equal in some runtimes; semantics remain distinct. ## Storage Layout -For each `session_id`: +For each `acpx_record_id`: ```text -~/.acpx/sessions/.events.ndjson -~/.acpx/sessions/.events.1.ndjson -~/.acpx/sessions/.events.2.ndjson +~/.acpx/sessions/.stream.ndjson +~/.acpx/sessions/.stream.1.ndjson +~/.acpx/sessions/.stream.2.ndjson ... -~/.acpx/sessions/.json -~/.acpx/sessions/.events.lock +~/.acpx/sessions/.json +~/.acpx/sessions/.stream.lock ``` Rules: -- `events*.ndjson` is authoritative history. -- `.json` is derived checkpoint/index. -- `.events.lock` enforces single-writer sequencing. -- No second persisted event schema. +- `*.stream*.ndjson` is authoritative history. +- `.json` is the local checkpoint/index. +- No second persisted event protocol is allowed. -## Canonical Event Schema (`acpx.event.v1`) +## ACP Stream Contract -Each NDJSON line is exactly one object: +Each NDJSON line is one raw ACP JSON-RPC message as exchanged over ACP. -```json -{ - "schema": "acpx.event.v1", - "event_id": "dce8a12e-4f8b-4a4e-b9f6-1f8f6fd2d66e", - "acpx_record_id": "019c....", - "acp_session_id": "019c....", - "agent_session_id": "019c....", - "request_id": "req_123", - "seq": 412, - "ts": "2026-02-27T12:10:00.000Z", - "type": "output_delta", - "data": { - "stream": "output", - "text": "hello" - } -} -``` - -Field contract: - -- `schema`: fixed string, currently `acpx.event.v1`. -- `event_id`: UUID, unique per event. -- `session_id`: required. -- `acp_session_id`: optional. -- `agent_session_id`: optional. -- `request_id`: optional for session lifecycle events. -- `seq`: strict monotonic integer per session; never resets. -- `ts`: ISO-8601 UTC emit timestamp. -- `type`: event discriminator. -- `data`: type-specific payload. - -## Canonical Event Types - -### Prompt/Turn Flow - -#### `turn_started` - -```json -{ - "type": "turn_started", - "data": { - "mode": "prompt", - "resumed": true, - "input_preview": "first 200 chars" - } -} -``` - -#### `output_delta` - -```json -{ - "type": "output_delta", - "data": { - "stream": "output", - "text": "chunk" - } -} -``` - -`data.stream` enum: - -- `output` -- `thought` - -#### `tool_call` - -```json -{ - "type": "tool_call", - "data": { - "tool_call_id": "call_1", - "title": "run_command", - "status": "in_progress" - } -} -``` - -`data.status` enum: - -- `pending` -- `in_progress` -- `completed` -- `failed` -- `unknown` - -#### `turn_done` - -```json -{ - "type": "turn_done", - "data": { - "stop_reason": "end_turn", - "permission_stats": { - "requested": 1, - "approved": 1, - "denied": 0, - "cancelled": 0 - } - } -} -``` - -#### `error` - -```json -{ - "type": "error", - "data": { - "code": "RUNTIME", - "detail_code": "QUEUE_RUNTIME_PROMPT_FAILED", - "origin": "queue", - "message": "Queue owner disconnected", - "retryable": true, - "acp_error": { - "code": -32002, - "message": "...", - "data": {} - } - } -} -``` - -`data.code` enum: - -- `NO_SESSION` -- `TIMEOUT` -- `PERMISSION_DENIED` -- `PERMISSION_PROMPT_UNAVAILABLE` -- `RUNTIME` -- `USAGE` - -`data.origin` enum: - -- `cli` -- `runtime` -- `queue` -- `acp` - -### Control/Lifecycle Flow - -#### `session_ensured` - -```json -{ - "type": "session_ensured", - "data": { - "created": true, - "name": "my-session" - } -} -``` - -#### `cancel_requested` - -```json -{ - "type": "cancel_requested", - "data": {} -} -``` - -#### `cancel_result` - -```json -{ - "type": "cancel_result", - "data": { - "cancelled": true - } -} -``` - -#### `mode_set` +Allowed message shapes are standard JSON-RPC 2.0 forms used by ACP: -```json -{ - "type": "mode_set", - "data": { - "mode_id": "code" - } -} -``` +- request: `{ "jsonrpc": "2.0", "id": ..., "method": "...", "params": ... }` +- response: `{ "jsonrpc": "2.0", "id": ..., "result": ... }` +- error: `{ "jsonrpc": "2.0", "id": ..., "error": { "code": ..., "message": ..., "data": ... } }` +- notification: `{ "jsonrpc": "2.0", "method": "...", "params": ... }` -#### `config_set` +Examples: ```json -{ - "type": "config_set", - "data": { - "config_id": "model", - "value": "gpt-5.3-codex" - } -} -``` - -#### `status_snapshot` - -```json -{ - "type": "status_snapshot", - "data": { - "status": "alive", - "pid": 1234, - "summary": "status=alive" - } -} +{"jsonrpc":"2.0","id":"req-1","method":"session/prompt","params":{"sessionId":"019c...","prompt":"hi"}} +{"jsonrpc":"2.0","method":"session/update","params":{"sessionUpdate":"agent_message_chunk","content":{"type":"text","text":"Hello"}}} +{"jsonrpc":"2.0","id":"req-1","result":{"stopReason":"end_turn"}} ``` -#### `session_closed` - -```json -{ - "type": "session_closed", - "data": { - "reason": "close" - } -} -``` +Hard constraints: -## Stdout Contract (All JSON Commands) +- no custom `schema` field in streamed messages, +- no synthetic `type`/`stream` envelope keys, +- no acpx-only control/event wrappers in the stream, +- no key renaming of ACP payload fields in the stream. -When `--format json --json-strict` is enabled: +## Stdout Contract (`--format json --json-strict`) -- every stdout line must be valid JSON, -- every JSON line must conform to `acpx.event.v1`, -- no non-JSON diagnostics are allowed on stdout. +For commands that communicate with an ACP adapter, stdout must contain only raw ACP JSON-RPC messages, one per line. -Command behavior: +- no non-ACP JSON objects in this stream, +- no human text in stdout, +- no stderr noise when `--json-strict` is enabled. -- prompt commands emit `turn_started`, zero or more `output_delta`/`tool_call`, then terminal event (`turn_done` or `error`). -- control/status/session commands emit relevant control/lifecycle events (`session_ensured`, `mode_set`, `config_set`, `status_snapshot`, `cancel_result`, `session_closed`) and may emit `error`. -- `seq` ordering must match emission order for each `session_id`. +If local command output is needed for non-ACP commands, that output is not part of the ACP stream contract. ## Session Checkpoint Schema (`acpx.session.v1`) -`session.json` is derived from event replay. - -Conversation fields and runtime fields are stored at the same top level. +`session.json` is derived from replay + local runtime state, with top-level conversation and top-level acpx state. ```json { @@ -310,17 +103,12 @@ Conversation fields and runtime fields are stored at the same top level. "name": "my-session", "created_at": "2026-02-27T12:00:00.000Z", - "updated_at": "2026-02-27T12:10:00.000Z", "last_used_at": "2026-02-27T12:10:00.000Z", - "last_seq": 412, "last_request_id": "req_123", - "closed": false, - "closed_at": null, - "pid": 1234, "event_log": { - "active_path": "/home/user/.acpx/sessions/019c....events.ndjson", + "active_path": "/home/user/.acpx/sessions/019c....stream.ndjson", "segment_count": 3, "max_segment_bytes": 67108864, "max_segments": 5, @@ -331,112 +119,74 @@ Conversation fields and runtime fields are stored at the same top level. "title": null, "messages": [], "cumulative_token_usage": {}, - "request_token_usage": {} + "request_token_usage": {}, + + "acpx": { + "current_mode_id": "code", + "available_commands": ["session/set_mode", "session/set_config_option"] + } } ``` Rules: -- Conversation fields must remain top-level. -- Runtime fields must remain top-level. -- `session.json` must be reconstructible by replaying `events*.ndjson`. - -### Message Encoding - -`messages` uses externally tagged variants: - -- user message: `{ "User": { ... } }` -- agent message: `{ "Agent": { ... } }` -- resume marker: `"Resume"` +- `session.json` is not a transport protocol. +- `session.json` may include local bookkeeping, but the stream may not. +- `session.json` must be reconstructible from stream + local deterministic projection rules. -## Sequence and Single-Writer Rules +## Local State Boundary -To preserve strict monotonic `seq`: +Local app state must stay out of the ACP stream. -1. Acquire exclusive lock on `.events.lock`. -2. Determine next `seq` from checkpoint tail/replay state. -3. Append event line to active segment. -4. Flush append (`fdatasync`/equivalent durability step). -5. Update checkpoint and write `session.json` atomically (temp + rename). -6. Release lock. +Examples of local-only state: -No writes are allowed without acquiring lock. +- queue owner pid and health, +- lock/lease metadata, +- process lifecycle snapshots, +- retry counters, +- write offsets/segment pointers, +- local diagnostics. -## Write Ordering and Failure Behavior +This state belongs in checkpoint/state stores or status commands, never in streamed ACP payloads. -For each event write: +## Sequence and Single-Writer Rules -1. Validate event against `acpx.event.v1`. -2. Validate persisted key policy (`snake_case`). -3. Append event. -4. Update checkpoint. +To preserve strict monotonic ordering: -Failure policy: +1. Acquire `.stream.lock`. +2. Determine next sequence position from checkpoint tail. +3. Append one raw ACP message line. +4. Flush append. +5. Update checkpoint atomically. +6. Release lock. -- append failure: operation fails; no synthetic success. -- checkpoint failure after successful append: event remains authoritative; checkpoint rebuilt later. +No writes are allowed without lock ownership. ## Replay and Recovery On startup or repair: -1. Read all segments oldest -> newest. -2. Validate `schema` and event payloads. -3. Enforce monotonic `seq`. -4. Rebuild checkpoint state. -5. Rewrite `session.json` atomically. +1. Read all stream segments oldest to newest. +2. Parse each line as JSON-RPC ACP message. +3. Rebuild checkpoint projection. +4. Atomically rewrite `session.json`. Corrupt line policy: -- trailing partial final line: ignore only that final line, -- any mid-file invalid line: fatal in strict mode. - -## Rotation and Retention - -Defaults: - -- `max_segment_bytes`: `64 MiB` -- `max_segments`: `5` - -Rotation: - -1. `.events.(n-1).ndjson -> .events.n.ndjson` -2. active `.events.ndjson -> .events.1.ndjson` -3. create new active `.events.ndjson` -4. delete oldest beyond limit. - -All rotation operations must occur under the same session lock. - -## Privacy and Redaction - -Default behavior: - -- persist output deltas and minimal tool-call summaries, -- do not persist raw terminal secrets, -- do not persist opaque provider blobs unless explicitly enabled. +- trailing partial final line: ignore only that line, +- any mid-file invalid line: fail strict replay. ## Validation and Guardrails Required: -- schema validator for `acpx.event.v1` before write, -- persisted-key-casing validator before write, -- contract tests with golden NDJSON fixtures, -- CI checks for unknown/invalid persisted keys, -- parser contract tests that consume canonical events from stdout. - -## Mapping from ACP Runtime to Canonical Events - -- turn accepted -> `turn_started` -- `agent_message_chunk` -> `output_delta` (`stream=output`) -- `agent_thought_chunk` -> `output_delta` (`stream=thought`) -- `tool_call` / `tool_call_update` -> `tool_call` -- runtime/queue/acp failures -> `error` -- completion -> `turn_done` -- ensure/status/set/cancel/close control paths -> matching control/lifecycle types +- stream validator: each line must be JSON-RPC ACP message, +- no acpx envelope/event schema accepted on ACP stream, +- checkpoint validator for `acpx.session.v1`, +- contract tests that assert `--format json --json-strict` emits ACP-only lines. ## Non-Goals -- backward-compat layers for legacy persisted event schemas, -- multiple persisted event schemas for one timeline, -- duplicate canonical event history in `session.json`. +- custom stream schema layered over ACP, +- mixed ACP + acpx envelope events in one stream, +- backward-compat wrappers in the ACP stream. diff --git a/docs/CLI.md b/docs/CLI.md index b19932e..d94c382 100644 --- a/docs/CLI.md +++ b/docs/CLI.md @@ -385,50 +385,36 @@ When a prompt is already in flight for a session, `acpx` uses a per-session queu `--format` controls output mode: - `text` (default): human-readable stream -- `json`: NDJSON event stream for automation +- `json`: raw ACP NDJSON stream for automation - `quiet`: assistant text only -- `--format json --json-strict`: same NDJSON stream, with non-JSON stderr output suppressed +- `--format json --json-strict`: same ACP NDJSON stream, with non-JSON stderr output suppressed ### Prompt/exec output behavior - `text`: assistant text, tool status blocks, client-operation logs, plan updates, and `[done] ` -- `json`: one canonical `acpx.event.v1` object per line +- `json`: one raw ACP JSON-RPC message per line - `quiet`: concatenated assistant text only -Canonical event envelope: +ACP message examples: ```json -{ - "schema": "acpx.event.v1", - "event_id": "...", - "session_id": "...", - "acp_session_id": "...", - "agent_session_id": "...", - "request_id": "...", - "seq": 12, - "ts": "2026-02-27T00:00:00.000Z", - "type": "output_delta", - "data": { - "stream": "output", - "text": "..." - } -} +{"jsonrpc":"2.0","id":"req-1","method":"session/prompt","params":{"sessionId":"019c...","prompt":"hi"}} +{"jsonrpc":"2.0","method":"session/update","params":{"sessionUpdate":"agent_message_chunk","content":{"type":"text","text":"Hello"}}} +{"jsonrpc":"2.0","id":"req-1","result":{"stopReason":"end_turn"}} ``` -JSON error events are also emitted as canonical `acpx.event.v1` payloads with `type: "error"`. +Hard rule for the ACP stream: + +- no acpx-specific event envelope, +- no synthetic `type`/`stream` wrapper fields, +- no ACP payload key renaming. ### Control-command JSON mapping -When `--format json` is used, control commands emit canonical `acpx.event.v1` events: +When `--format json` is used: -- `prompt --no-wait`: `type: "prompt_queued"` -- `sessions new`: `type: "session_ensured"` with `data.created: true` -- `sessions ensure`: `type: "session_ensured"` with `data.created: true|false` -- `sessions close`: `type: "session_closed"` -- `cancel`: `type: "cancel_result"` -- `set-mode`: `type: "mode_set"` -- `set`: `type: "config_set"` -- `status`: `type: "status_snapshot"` +- commands that talk to an ACP adapter emit raw ACP JSON-RPC messages. +- local query commands (`sessions list/show/history`) emit local JSON documents (not ACP stream traffic). ### Sessions/query command output behavior diff --git a/src/acp-jsonrpc.ts b/src/acp-jsonrpc.ts new file mode 100644 index 0000000..4bb7db9 --- /dev/null +++ b/src/acp-jsonrpc.ts @@ -0,0 +1,95 @@ +import type { AnyMessage } from "@agentclientprotocol/sdk"; + +type JsonRpcId = string | number | null; + +function asRecord(value: unknown): Record | null { + if (!value || typeof value !== "object" || Array.isArray(value)) { + return null; + } + return value as Record; +} + +function hasValidId(value: unknown): value is JsonRpcId { + return ( + value === null || + typeof value === "string" || + (typeof value === "number" && Number.isFinite(value)) + ); +} + +function isErrorObject(value: unknown): value is { code: number; message: string } { + const record = asRecord(value); + return ( + !!record && + typeof record.code === "number" && + Number.isFinite(record.code) && + typeof record.message === "string" + ); +} + +function hasResultOrError(value: Record): boolean { + const hasResult = Object.hasOwn(value, "result"); + const hasError = Object.hasOwn(value, "error"); + if (hasResult && hasError) { + return false; + } + if (!hasResult && !hasError) { + return false; + } + if (hasError && !isErrorObject(value.error)) { + return false; + } + return true; +} + +export function isAcpJsonRpcMessage(value: unknown): value is AnyMessage { + const record = asRecord(value); + if (!record || record.jsonrpc !== "2.0") { + return false; + } + + const hasMethod = typeof record.method === "string" && record.method.length > 0; + const hasId = Object.hasOwn(record, "id"); + + if (hasMethod && !hasId) { + // Notification + return true; + } + + if (hasMethod && hasId) { + // Request + return hasValidId(record.id); + } + + if (!hasMethod && hasId) { + // Response + if (!hasValidId(record.id)) { + return false; + } + return hasResultOrError(record); + } + + return false; +} + +export function parsePromptStopReason(message: AnyMessage): string | undefined { + if (!Object.hasOwn(message, "id") || !Object.hasOwn(message, "result")) { + return undefined; + } + const record = asRecord((message as { result?: unknown }).result); + if (!record) { + return undefined; + } + return typeof record.stopReason === "string" ? record.stopReason : undefined; +} + +export function parseJsonRpcErrorMessage(message: AnyMessage): string | undefined { + if (!Object.hasOwn(message, "error")) { + return undefined; + } + const errorRecord = asRecord((message as { error?: unknown }).error); + if (!errorRecord || typeof errorRecord.message !== "string") { + return undefined; + } + return errorRecord.message; +} diff --git a/src/cli-core.ts b/src/cli-core.ts index 3e5f606..0e2e053 100644 --- a/src/cli-core.ts +++ b/src/cli-core.ts @@ -38,7 +38,17 @@ import { normalizeOutputError, type NormalizedOutputError, } from "./error-normalization.js"; -import { createAcpxEvent } from "./events.js"; +import { + agentSessionIdPayload, + emitJsonResult, + printClosedSessionByFormat, + printCreatedSessionBanner, + printEnsuredSessionByFormat, + printNewSessionByFormat, + printPromptSessionBanner, + printQueuedPromptByFormat, + printSessionsByFormat, +} from "./cli/output-render.js"; import { createOutputFormatter } from "./output.js"; import { DEFAULT_HISTORY_LIMIT, @@ -57,12 +67,9 @@ import { sendSession, } from "./session.js"; import { probeQueueOwnerHealth } from "./queue-ipc.js"; -import { normalizeRuntimeSessionId } from "./runtime-session-id.js"; import { - ACPX_EVENT_TYPES, EXIT_CODES, OUTPUT_FORMATS, - type AcpxEventDraft, type OutputFormat, type OutputPolicy, type SessionRecord, @@ -153,280 +160,7 @@ function applyPermissionExitCode(result: { } export { parseTtlSeconds }; - -function printSessionsByFormat(sessions: SessionRecord[], format: OutputFormat): void { - if (format === "json") { - process.stdout.write(`${JSON.stringify(sessions)}\n`); - return; - } - - if (format === "quiet") { - for (const session of sessions) { - const closedMarker = session.closed ? " [closed]" : ""; - process.stdout.write(`${session.acpxRecordId}${closedMarker}\n`); - } - return; - } - - if (sessions.length === 0) { - process.stdout.write("No sessions\n"); - return; - } - - for (const session of sessions) { - const closedMarker = session.closed ? " [closed]" : ""; - process.stdout.write( - `${session.acpxRecordId}${closedMarker}\t${session.name ?? "-"}\t${session.cwd}\t${session.lastUsedAt}\n`, - ); - } -} - -function jsonEventIdentityFromRecord( - record: SessionRecord, - overrides: { - requestId?: string; - nextSeq?: number; - } = {}, -): { - sessionId: string; - acpSessionId?: string; - agentSessionId?: string; - requestId?: string; - nextSeq: number; -} { - return { - sessionId: record.acpxRecordId, - acpSessionId: record.acpSessionId, - agentSessionId: record.agentSessionId, - requestId: overrides.requestId, - nextSeq: overrides.nextSeq ?? record.lastSeq + 1, - }; -} - -function emitControlEvent( - format: OutputFormat, - identity: { - sessionId: string; - acpSessionId?: string; - agentSessionId?: string; - requestId?: string; - nextSeq: number; - }, - draft: AcpxEventDraft, -): boolean { - if (format !== "json") { - return false; - } - - const event = createAcpxEvent( - { - sessionId: identity.sessionId, - acpSessionId: identity.acpSessionId, - agentSessionId: identity.agentSessionId, - requestId: identity.requestId, - seq: identity.nextSeq, - }, - draft, - ); - process.stdout.write(`${JSON.stringify(event)}\n`); - return true; -} - -function printClosedSessionByFormat(record: SessionRecord, format: OutputFormat): void { - if ( - emitControlEvent(format, jsonEventIdentityFromRecord(record), { - type: ACPX_EVENT_TYPES.SESSION_CLOSED, - data: { - reason: "close", - }, - }) - ) { - return; - } - - if (format === "quiet") { - return; - } - - process.stdout.write(`${record.acpxRecordId}\n`); -} - -function agentSessionIdPayload(agentSessionId: string | undefined): { - agentSessionId?: string; -} { - const normalized = normalizeRuntimeSessionId(agentSessionId); - if (!normalized) { - return {}; - } - - return { agentSessionId: normalized }; -} - -function printNewSessionByFormat( - record: SessionRecord, - replaced: SessionRecord | undefined, - format: OutputFormat, -): void { - if ( - emitControlEvent(format, jsonEventIdentityFromRecord(record), { - type: ACPX_EVENT_TYPES.SESSION_ENSURED, - data: { - created: true, - name: record.name, - replaced_session_id: replaced?.acpxRecordId, - }, - }) - ) { - return; - } - - if (format === "quiet") { - process.stdout.write(`${record.acpxRecordId}\n`); - return; - } - - if (replaced) { - process.stdout.write( - `${record.acpxRecordId}\t(replaced ${replaced.acpxRecordId})\n`, - ); - return; - } - - process.stdout.write(`${record.acpxRecordId}\n`); -} - -function printEnsuredSessionByFormat( - record: SessionRecord, - created: boolean, - format: OutputFormat, -): void { - if ( - emitControlEvent(format, jsonEventIdentityFromRecord(record), { - type: ACPX_EVENT_TYPES.SESSION_ENSURED, - data: { - created, - name: record.name, - }, - }) - ) { - return; - } - - if (format === "quiet") { - process.stdout.write(`${record.acpxRecordId}\n`); - return; - } - - const action = created ? "created" : "existing"; - process.stdout.write(`${record.acpxRecordId}\t(${action})\n`); -} - -function printQueuedPromptByFormat( - result: { - sessionId: string; - requestId: string; - }, - format: OutputFormat, -): void { - if ( - emitControlEvent( - format, - { - sessionId: result.sessionId, - requestId: result.requestId, - nextSeq: 0, - }, - { - type: ACPX_EVENT_TYPES.PROMPT_QUEUED, - data: { - request_id: result.requestId, - }, - }, - ) - ) { - return; - } - - if (format === "quiet") { - return; - } - - process.stdout.write(`[queued] ${result.requestId}\n`); -} - -function formatSessionLabel(record: SessionRecord): string { - return record.name ?? "cwd"; -} - -function formatRoutedFrom(sessionCwd: string, currentCwd: string): string | undefined { - const relative = path.relative(sessionCwd, currentCwd); - if (!relative || relative === ".") { - return undefined; - } - return relative.startsWith(".") ? relative : `.${path.sep}${relative}`; -} - -type SessionConnectionStatus = "connected" | "needs reconnect"; - -async function resolveSessionConnectionStatus( - record: SessionRecord, -): Promise { - const health = await probeQueueOwnerHealth(record.acpxRecordId); - return health.healthy ? "connected" : "needs reconnect"; -} - -export function formatPromptSessionBannerLine( - record: SessionRecord, - currentCwd: string, - connectionStatus: SessionConnectionStatus = "needs reconnect", -): string { - const label = formatSessionLabel(record); - const normalizedSessionCwd = path.resolve(record.cwd); - const normalizedCurrentCwd = path.resolve(currentCwd); - const routedFrom = - normalizedSessionCwd === normalizedCurrentCwd - ? undefined - : formatRoutedFrom(normalizedSessionCwd, normalizedCurrentCwd); - const status = connectionStatus; - - if (routedFrom) { - return `[acpx] session ${label} (${record.acpxRecordId}) · ${normalizedSessionCwd} (routed from ${routedFrom}) · agent ${status}`; - } - - return `[acpx] session ${label} (${record.acpxRecordId}) · ${normalizedSessionCwd} · agent ${status}`; -} - -async function printPromptSessionBanner( - record: SessionRecord, - currentCwd: string, - format: OutputFormat, - jsonStrict = false, -): Promise { - if (format === "quiet" || (jsonStrict && format === "json")) { - return; - } - - const status = await resolveSessionConnectionStatus(record); - process.stderr.write( - `${formatPromptSessionBannerLine(record, currentCwd, status)}\n`, - ); -} - -function printCreatedSessionBanner( - record: SessionRecord, - agentName: string, - format: OutputFormat, - jsonStrict = false, -): void { - if (format === "quiet" || (jsonStrict && format === "json")) { - return; - } - - const label = formatSessionLabel(record); - process.stderr.write(`[acpx] created session ${label} (${record.acpxRecordId})\n`); - process.stderr.write(`[acpx] agent: ${agentName}\n`); - process.stderr.write(`[acpx] cwd: ${record.cwd}\n`); -} +export { formatPromptSessionBannerLine } from "./cli/output-render.js"; async function findRoutedSessionOrThrow( agentCommand: string, @@ -560,19 +294,11 @@ function printCancelResultByFormat( format: OutputFormat, ): void { if ( - emitControlEvent( - format, - { - sessionId: result.sessionId || "unknown", - nextSeq: 0, - }, - { - type: ACPX_EVENT_TYPES.CANCEL_RESULT, - data: { - cancelled: result.cancelled, - }, - }, - ) + emitJsonResult(format, { + action: "cancel_result", + acpxRecordId: result.sessionId || "unknown", + cancelled: result.cancelled, + }) ) { return; } @@ -591,12 +317,13 @@ function printSetModeResultByFormat( format: OutputFormat, ): void { if ( - emitControlEvent(format, jsonEventIdentityFromRecord(result.record), { - type: ACPX_EVENT_TYPES.MODE_SET, - data: { - mode_id: modeId, - resumed: result.resumed, - }, + emitJsonResult(format, { + action: "mode_set", + modeId, + resumed: result.resumed, + acpxRecordId: result.record.acpxRecordId, + acpxSessionId: result.record.acpSessionId, + agentSessionId: result.record.agentSessionId, }) ) { return; @@ -621,14 +348,15 @@ function printSetConfigOptionResultByFormat( format: OutputFormat, ): void { if ( - emitControlEvent(format, jsonEventIdentityFromRecord(result.record), { - type: ACPX_EVENT_TYPES.CONFIG_SET, - data: { - config_id: configId, - value, - resumed: result.resumed, - config_options: result.response.configOptions, - }, + emitJsonResult(format, { + action: "config_set", + configId, + value, + resumed: result.resumed, + configOptions: result.response.configOptions, + acpxRecordId: result.record.acpxRecordId, + acpxSessionId: result.record.acpSessionId, + agentSessionId: result.record.agentSessionId, }) ) { return; @@ -1126,20 +854,11 @@ async function handleStatus( if (!record) { if ( - emitControlEvent( - globalFlags.format, - { - sessionId: "unknown", - nextSeq: 0, - }, - { - type: ACPX_EVENT_TYPES.STATUS_SNAPSHOT, - data: { - status: "no-session", - summary: "no active session", - }, - }, - ) + emitJsonResult(globalFlags.format, { + action: "status_snapshot", + status: "no-session", + summary: "no active session", + }) ) { return; } @@ -1173,17 +892,18 @@ async function handleStatus( }; if ( - emitControlEvent(globalFlags.format, jsonEventIdentityFromRecord(record), { - type: ACPX_EVENT_TYPES.STATUS_SNAPSHOT, - data: { - status: running ? "alive" : "dead", - pid: payload.pid ?? undefined, - summary: running ? "queue owner healthy" : "queue owner unavailable", - uptime: payload.uptime ?? undefined, - last_prompt_time: payload.lastPromptTime ?? undefined, - exit_code: payload.exitCode ?? undefined, - signal: payload.signal ?? undefined, - }, + emitJsonResult(globalFlags.format, { + action: "status_snapshot", + status: running ? "alive" : "dead", + pid: payload.pid ?? undefined, + summary: running ? "queue owner healthy" : "queue owner unavailable", + uptime: payload.uptime ?? undefined, + lastPromptTime: payload.lastPromptTime ?? undefined, + exitCode: payload.exitCode ?? undefined, + signal: payload.signal ?? undefined, + acpxRecordId: record.acpxRecordId, + acpxSessionId: record.acpSessionId, + agentSessionId: record.agentSessionId, }) ) { return; diff --git a/src/cli/output-render.ts b/src/cli/output-render.ts new file mode 100644 index 0000000..d11e458 --- /dev/null +++ b/src/cli/output-render.ts @@ -0,0 +1,234 @@ +import path from "node:path"; +import { probeQueueOwnerHealth } from "../queue-ipc.js"; +import { normalizeRuntimeSessionId } from "../runtime-session-id.js"; +import type { OutputFormat, SessionRecord } from "../types.js"; + +function formatSessionLabel(record: SessionRecord): string { + return record.name ?? "cwd"; +} + +function formatRoutedFrom(sessionCwd: string, currentCwd: string): string | undefined { + const relative = path.relative(sessionCwd, currentCwd); + if (!relative || relative === ".") { + return undefined; + } + return relative.startsWith(".") ? relative : `.${path.sep}${relative}`; +} + +type SessionConnectionStatus = "connected" | "needs reconnect"; + +async function resolveSessionConnectionStatus( + record: SessionRecord, +): Promise { + const health = await probeQueueOwnerHealth(record.acpxRecordId); + return health.healthy ? "connected" : "needs reconnect"; +} + +export function emitJsonResult(format: OutputFormat, payload: unknown): boolean { + if (format !== "json") { + return false; + } + process.stdout.write(`${JSON.stringify(payload)}\n`); + return true; +} + +export function printSessionsByFormat( + sessions: SessionRecord[], + format: OutputFormat, +): void { + if (format === "json") { + process.stdout.write(`${JSON.stringify(sessions)}\n`); + return; + } + + if (format === "quiet") { + for (const session of sessions) { + const closedMarker = session.closed ? " [closed]" : ""; + process.stdout.write(`${session.acpxRecordId}${closedMarker}\n`); + } + return; + } + + if (sessions.length === 0) { + process.stdout.write("No sessions\n"); + return; + } + + for (const session of sessions) { + const closedMarker = session.closed ? " [closed]" : ""; + process.stdout.write( + `${session.acpxRecordId}${closedMarker}\t${session.name ?? "-"}\t${session.cwd}\t${session.lastUsedAt}\n`, + ); + } +} + +export function printClosedSessionByFormat( + record: SessionRecord, + format: OutputFormat, +): void { + if ( + emitJsonResult(format, { + action: "session_closed", + acpxRecordId: record.acpxRecordId, + acpxSessionId: record.acpSessionId, + agentSessionId: record.agentSessionId, + }) + ) { + return; + } + + if (format === "quiet") { + return; + } + + process.stdout.write(`${record.acpxRecordId}\n`); +} + +export function printNewSessionByFormat( + record: SessionRecord, + replaced: SessionRecord | undefined, + format: OutputFormat, +): void { + if ( + emitJsonResult(format, { + action: "session_ensured", + created: true, + acpxRecordId: record.acpxRecordId, + acpxSessionId: record.acpSessionId, + agentSessionId: record.agentSessionId, + name: record.name, + replacedSessionId: replaced?.acpxRecordId, + }) + ) { + return; + } + + if (format === "quiet") { + process.stdout.write(`${record.acpxRecordId}\n`); + return; + } + + if (replaced) { + process.stdout.write( + `${record.acpxRecordId}\t(replaced ${replaced.acpxRecordId})\n`, + ); + return; + } + + process.stdout.write(`${record.acpxRecordId}\n`); +} + +export function printEnsuredSessionByFormat( + record: SessionRecord, + created: boolean, + format: OutputFormat, +): void { + if ( + emitJsonResult(format, { + action: "session_ensured", + created, + acpxRecordId: record.acpxRecordId, + acpxSessionId: record.acpSessionId, + agentSessionId: record.agentSessionId, + name: record.name, + }) + ) { + return; + } + + if (format === "quiet") { + process.stdout.write(`${record.acpxRecordId}\n`); + return; + } + + const action = created ? "created" : "existing"; + process.stdout.write(`${record.acpxRecordId}\t(${action})\n`); +} + +export function printQueuedPromptByFormat( + result: { + sessionId: string; + requestId: string; + }, + format: OutputFormat, +): void { + if ( + emitJsonResult(format, { + action: "prompt_queued", + acpxRecordId: result.sessionId, + requestId: result.requestId, + }) + ) { + return; + } + + if (format === "quiet") { + return; + } + + process.stdout.write(`[queued] ${result.requestId}\n`); +} + +export function formatPromptSessionBannerLine( + record: SessionRecord, + currentCwd: string, + connectionStatus: SessionConnectionStatus = "needs reconnect", +): string { + const label = formatSessionLabel(record); + const normalizedSessionCwd = path.resolve(record.cwd); + const normalizedCurrentCwd = path.resolve(currentCwd); + const routedFrom = + normalizedSessionCwd === normalizedCurrentCwd + ? undefined + : formatRoutedFrom(normalizedSessionCwd, normalizedCurrentCwd); + const status = connectionStatus; + + if (routedFrom) { + return `[acpx] session ${label} (${record.acpxRecordId}) · ${normalizedSessionCwd} (routed from ${routedFrom}) · agent ${status}`; + } + + return `[acpx] session ${label} (${record.acpxRecordId}) · ${normalizedSessionCwd} · agent ${status}`; +} + +export async function printPromptSessionBanner( + record: SessionRecord, + currentCwd: string, + format: OutputFormat, + jsonStrict = false, +): Promise { + if (format === "quiet" || (jsonStrict && format === "json")) { + return; + } + + const status = await resolveSessionConnectionStatus(record); + process.stderr.write( + `${formatPromptSessionBannerLine(record, currentCwd, status)}\n`, + ); +} + +export function printCreatedSessionBanner( + record: SessionRecord, + agentName: string, + format: OutputFormat, + jsonStrict = false, +): void { + if (format === "quiet" || (jsonStrict && format === "json")) { + return; + } + + const label = formatSessionLabel(record); + process.stderr.write(`[acpx] created session ${label} (${record.acpxRecordId})\n`); + process.stderr.write(`[acpx] agent: ${agentName}\n`); + process.stderr.write(`[acpx] cwd: ${record.cwd}\n`); +} + +export function agentSessionIdPayload(agentSessionId: string | undefined): { + agentSessionId?: string; +} { + const normalized = normalizeRuntimeSessionId(agentSessionId); + if (!normalized) { + return {}; + } + + return { agentSessionId: normalized }; +} diff --git a/src/client.ts b/src/client.ts index d47ddc1..2c20042 100644 --- a/src/client.ts +++ b/src/client.ts @@ -2,6 +2,7 @@ import { ClientSideConnection, PROTOCOL_VERSION, ndJsonStream, + type AnyMessage, type AuthMethod, type CreateTerminalRequest, type CreateTerminalResponse, @@ -393,7 +394,7 @@ export class AcpClient { const input = Writable.toWeb(child.stdin); const output = Readable.toWeb(child.stdout) as ReadableStream; - const stream = ndJsonStream(input, output); + const stream = this.createTappedStream(ndJsonStream(input, output)); const connection = new ClientSideConnection( () => ({ @@ -483,6 +484,56 @@ export class AcpClient { } } + private createTappedStream(base: { + readable: ReadableStream; + writable: WritableStream; + }): { + readable: ReadableStream; + writable: WritableStream; + } { + const onAcpMessage = this.options.onAcpMessage; + + if (!onAcpMessage) { + return base; + } + + const readable = new ReadableStream({ + async start(controller) { + const reader = base.readable.getReader(); + try { + while (true) { + const { value, done } = await reader.read(); + if (done) { + break; + } + if (!value) { + continue; + } + onAcpMessage("inbound", value); + controller.enqueue(value); + } + } finally { + reader.releaseLock(); + controller.close(); + } + }, + }); + + const writable = new WritableStream({ + async write(message) { + onAcpMessage("outbound", message); + const writer = base.writable.getWriter(); + try { + await writer.write(message); + } finally { + writer.releaseLock(); + } + }, + }); + + return { readable, writable }; + } + async createSession(cwd = this.options.cwd): Promise { const connection = this.getConnection(); const result = await connection.newSession({ diff --git a/src/events.ts b/src/events.ts deleted file mode 100644 index f709bca..0000000 --- a/src/events.ts +++ /dev/null @@ -1,421 +0,0 @@ -import { randomUUID } from "node:crypto"; -import type { SessionNotification } from "@agentclientprotocol/sdk"; -import { - ACPX_EVENT_OUTPUT_STREAMS, - ACPX_EVENT_SCHEMA, - ACPX_EVENT_STATUS_SNAPSHOT_STATUSES, - ACPX_EVENT_TOOL_CALL_STATUSES, - ACPX_EVENT_TURN_MODES, - ACPX_EVENT_TYPES, - OUTPUT_ERROR_CODES, - OUTPUT_ERROR_ORIGINS, - type AcpxEvent, - type AcpxEventDraft, - type AcpxEventOutputStream, - type ClientOperation, - type OutputErrorAcpPayload, - type OutputErrorCode, - type OutputErrorOrigin, -} from "./types.js"; - -type EventIdentity = { - sessionId: string; - acpSessionId?: string; - agentSessionId?: string; - requestId?: string; - seq: number; - ts?: string; -}; - -function asRecord(value: unknown): Record | undefined { - if (!value || typeof value !== "object" || Array.isArray(value)) { - return undefined; - } - return value as Record; -} - -function isoNow(): string { - return new Date().toISOString(); -} - -function trimNonEmpty(value: string | undefined): string | undefined { - if (!value) { - return undefined; - } - const trimmed = value.trim(); - return trimmed.length > 0 ? trimmed : undefined; -} - -export function truncateInputPreview(message: string, maxChars = 200): string { - const trimmed = message.trim(); - if (trimmed.length <= maxChars) { - return trimmed; - } - if (maxChars <= 3) { - return trimmed.slice(0, maxChars); - } - return `${trimmed.slice(0, maxChars - 3)}...`; -} - -export function createAcpxEvent( - identity: EventIdentity, - draft: AcpxEventDraft, -): AcpxEvent { - return { - schema: ACPX_EVENT_SCHEMA, - event_id: randomUUID(), - session_id: identity.sessionId, - acp_session_id: trimNonEmpty(identity.acpSessionId), - agent_session_id: trimNonEmpty(identity.agentSessionId), - request_id: trimNonEmpty(draft.request_id ?? identity.requestId), - seq: identity.seq, - ts: identity.ts ?? isoNow(), - type: draft.type, - data: draft.data, - } as AcpxEvent; -} - -export function sessionUpdateToEventDrafts( - notification: SessionNotification, -): AcpxEventDraft[] { - const update = notification.update; - - switch (update.sessionUpdate) { - case "agent_message_chunk": { - if (update.content.type !== "text") { - return []; - } - return [ - { - type: ACPX_EVENT_TYPES.OUTPUT_DELTA, - data: { - stream: "output", - text: update.content.text, - }, - }, - ]; - } - case "agent_thought_chunk": { - if (update.content.type !== "text") { - return []; - } - return [ - { - type: ACPX_EVENT_TYPES.OUTPUT_DELTA, - data: { - stream: "thought", - text: update.content.text, - }, - }, - ]; - } - case "tool_call": - case "tool_call_update": { - return [ - { - type: ACPX_EVENT_TYPES.TOOL_CALL, - data: { - tool_call_id: update.toolCallId, - title: update.title ?? undefined, - status: update.status ?? undefined, - }, - }, - ]; - } - case "plan": { - return [ - { - type: ACPX_EVENT_TYPES.PLAN, - data: { - entries: update.entries.map((entry) => ({ - content: entry.content, - status: entry.status, - priority: entry.priority, - })), - }, - }, - ]; - } - default: { - return [ - { - type: ACPX_EVENT_TYPES.UPDATE, - data: { - update: update.sessionUpdate, - }, - }, - ]; - } - } -} - -export function clientOperationToEventDraft( - operation: ClientOperation, -): AcpxEventDraft { - return { - type: ACPX_EVENT_TYPES.CLIENT_OPERATION, - data: { - method: operation.method, - status: operation.status, - summary: operation.summary, - details: operation.details, - }, - }; -} - -export function errorToEventDraft(params: { - code: OutputErrorCode; - detailCode?: string; - origin?: OutputErrorOrigin; - message: string; - retryable?: boolean; - acp?: OutputErrorAcpPayload; -}): AcpxEventDraft { - return { - type: ACPX_EVENT_TYPES.ERROR, - data: { - code: params.code, - detail_code: params.detailCode, - origin: params.origin, - message: params.message, - retryable: params.retryable, - acp_error: params.acp, - }, - }; -} - -function isAcpxEventOutputStream(value: unknown): value is AcpxEventOutputStream { - return ( - typeof value === "string" && - ACPX_EVENT_OUTPUT_STREAMS.includes(value as AcpxEventOutputStream) - ); -} - -function isOutputErrorCode(value: unknown): value is OutputErrorCode { - return ( - typeof value === "string" && OUTPUT_ERROR_CODES.includes(value as OutputErrorCode) - ); -} - -function isOutputErrorOrigin(value: unknown): value is OutputErrorOrigin { - return ( - typeof value === "string" && - OUTPUT_ERROR_ORIGINS.includes(value as OutputErrorOrigin) - ); -} - -function isAcpError(value: unknown): value is OutputErrorAcpPayload { - const record = asRecord(value); - return ( - !!record && - typeof record.code === "number" && - Number.isFinite(record.code) && - typeof record.message === "string" - ); -} - -function isToolCallStatus(value: unknown): boolean { - return ( - typeof value === "string" && - ACPX_EVENT_TOOL_CALL_STATUSES.includes( - value as (typeof ACPX_EVENT_TOOL_CALL_STATUSES)[number], - ) - ); -} - -function isFiniteInteger(value: unknown): value is number { - return typeof value === "number" && Number.isInteger(value); -} - -function hasOnlyKeys(data: Record, allowed: string[]): boolean { - const allowedSet = new Set(allowed); - return Object.keys(data).every((key) => allowedSet.has(key)); -} - -export function isAcpxEvent(value: unknown): value is AcpxEvent { - const event = asRecord(value); - if (!event) { - return false; - } - - if ( - event.schema !== ACPX_EVENT_SCHEMA || - typeof event.event_id !== "string" || - typeof event.session_id !== "string" || - typeof event.seq !== "number" || - !Number.isInteger(event.seq) || - event.seq < 0 || - typeof event.ts !== "string" || - typeof event.type !== "string" - ) { - return false; - } - - if (event.request_id !== undefined && typeof event.request_id !== "string") { - return false; - } - - if (event.acp_session_id !== undefined && typeof event.acp_session_id !== "string") { - return false; - } - - if ( - event.agent_session_id !== undefined && - typeof event.agent_session_id !== "string" - ) { - return false; - } - - const data = asRecord(event.data); - if (!data) { - return false; - } - - switch (event.type) { - case ACPX_EVENT_TYPES.TURN_STARTED: - return ( - hasOnlyKeys(data, ["mode", "resumed", "input_preview"]) && - typeof data.mode === "string" && - ACPX_EVENT_TURN_MODES.includes( - data.mode as (typeof ACPX_EVENT_TURN_MODES)[number], - ) && - typeof data.resumed === "boolean" && - (data.input_preview === undefined || typeof data.input_preview === "string") - ); - case ACPX_EVENT_TYPES.OUTPUT_DELTA: - return ( - hasOnlyKeys(data, ["stream", "text"]) && - isAcpxEventOutputStream(data.stream) && - typeof data.text === "string" - ); - case ACPX_EVENT_TYPES.TOOL_CALL: - return ( - hasOnlyKeys(data, ["tool_call_id", "title", "status"]) && - typeof data.tool_call_id === "string" && - data.tool_call_id.trim().length > 0 && - (data.title === undefined || - (typeof data.title === "string" && data.title.trim().length > 0)) && - (data.status === undefined || isToolCallStatus(data.status)) - ); - case ACPX_EVENT_TYPES.PLAN: - return ( - hasOnlyKeys(data, ["entries"]) && - Array.isArray(data.entries) && - data.entries.every((entry) => { - const parsed = asRecord(entry); - return ( - !!parsed && - hasOnlyKeys(parsed, ["content", "status", "priority"]) && - typeof parsed.content === "string" && - typeof parsed.status === "string" && - typeof parsed.priority === "string" - ); - }) - ); - case ACPX_EVENT_TYPES.UPDATE: - return hasOnlyKeys(data, ["update"]) && typeof data.update === "string"; - case ACPX_EVENT_TYPES.CLIENT_OPERATION: - return ( - hasOnlyKeys(data, ["method", "status", "summary", "details"]) && - typeof data.method === "string" && - typeof data.status === "string" && - typeof data.summary === "string" && - (data.details === undefined || typeof data.details === "string") - ); - case ACPX_EVENT_TYPES.TURN_DONE: - return ( - hasOnlyKeys(data, ["stop_reason", "permission_stats"]) && - typeof data.stop_reason === "string" && - (data.permission_stats === undefined || - (() => { - const stats = asRecord(data.permission_stats); - return ( - !!stats && - hasOnlyKeys(stats, ["requested", "approved", "denied", "cancelled"]) && - isFiniteInteger(stats.requested) && - isFiniteInteger(stats.approved) && - isFiniteInteger(stats.denied) && - isFiniteInteger(stats.cancelled) - ); - })()) - ); - case ACPX_EVENT_TYPES.ERROR: - return ( - hasOnlyKeys(data, [ - "code", - "detail_code", - "origin", - "message", - "retryable", - "acp_error", - ]) && - isOutputErrorCode(data.code) && - (data.detail_code === undefined || typeof data.detail_code === "string") && - (data.origin === undefined || isOutputErrorOrigin(data.origin)) && - typeof data.message === "string" && - (data.retryable === undefined || typeof data.retryable === "boolean") && - (data.acp_error === undefined || isAcpError(data.acp_error)) - ); - case ACPX_EVENT_TYPES.PROMPT_QUEUED: - return ( - hasOnlyKeys(data, ["request_id"]) && - typeof data.request_id === "string" && - data.request_id.trim().length > 0 - ); - case ACPX_EVENT_TYPES.SESSION_ENSURED: - return ( - hasOnlyKeys(data, ["created", "name", "replaced_session_id"]) && - typeof data.created === "boolean" && - (data.name === undefined || typeof data.name === "string") && - (data.replaced_session_id === undefined || - typeof data.replaced_session_id === "string") - ); - case ACPX_EVENT_TYPES.CANCEL_REQUESTED: - return hasOnlyKeys(data, []); - case ACPX_EVENT_TYPES.CANCEL_RESULT: - return hasOnlyKeys(data, ["cancelled"]) && typeof data.cancelled === "boolean"; - case ACPX_EVENT_TYPES.MODE_SET: - return ( - hasOnlyKeys(data, ["mode_id", "resumed"]) && - typeof data.mode_id === "string" && - data.mode_id.trim().length > 0 && - (data.resumed === undefined || typeof data.resumed === "boolean") - ); - case ACPX_EVENT_TYPES.CONFIG_SET: - return ( - hasOnlyKeys(data, ["config_id", "value", "resumed", "config_options"]) && - typeof data.config_id === "string" && - data.config_id.trim().length > 0 && - typeof data.value === "string" && - (data.resumed === undefined || typeof data.resumed === "boolean") && - (data.config_options === undefined || Array.isArray(data.config_options)) - ); - case ACPX_EVENT_TYPES.STATUS_SNAPSHOT: - return ( - hasOnlyKeys(data, [ - "status", - "pid", - "summary", - "uptime", - "last_prompt_time", - "exit_code", - "signal", - ]) && - typeof data.status === "string" && - ACPX_EVENT_STATUS_SNAPSHOT_STATUSES.includes( - data.status as (typeof ACPX_EVENT_STATUS_SNAPSHOT_STATUSES)[number], - ) && - (data.pid === undefined || (isFiniteInteger(data.pid) && data.pid > 0)) && - (data.summary === undefined || typeof data.summary === "string") && - (data.uptime === undefined || typeof data.uptime === "string") && - (data.last_prompt_time === undefined || - typeof data.last_prompt_time === "string") && - (data.exit_code === undefined || isFiniteInteger(data.exit_code)) && - (data.signal === undefined || typeof data.signal === "string") - ); - case ACPX_EVENT_TYPES.SESSION_CLOSED: - return hasOnlyKeys(data, ["reason"]) && data.reason === "close"; - default: - return false; - } -} diff --git a/src/jsonrpc-error.ts b/src/jsonrpc-error.ts new file mode 100644 index 0000000..1ab1228 --- /dev/null +++ b/src/jsonrpc-error.ts @@ -0,0 +1,91 @@ +import type { + OutputErrorAcpPayload, + OutputErrorCode, + OutputErrorOrigin, +} from "./types.js"; + +export const OUTPUT_ERROR_JSONRPC_CODES: Record = { + NO_SESSION: -32002, + TIMEOUT: -32070, + PERMISSION_DENIED: -32071, + PERMISSION_PROMPT_UNAVAILABLE: -32072, + RUNTIME: -32603, + USAGE: -32602, +}; + +type JsonRpcErrorObject = { + code: number; + message: string; + data?: unknown; +}; + +export type BuildJsonRpcErrorParams = { + id?: string | number | null; + outputCode: OutputErrorCode; + detailCode?: string; + origin?: OutputErrorOrigin; + message: string; + retryable?: boolean; + timestamp?: string; + sessionId?: string; + acp?: OutputErrorAcpPayload; +}; + +function hasValidAcpError( + acp: OutputErrorAcpPayload | undefined, +): acp is { code: number; message: string; data?: unknown } { + return Boolean( + acp && + Number.isFinite(acp.code) && + typeof acp.message === "string" && + acp.message.trim().length > 0, + ); +} + +function buildFallbackData(params: BuildJsonRpcErrorParams): Record { + const data: Record = { + acpxCode: params.outputCode, + detailCode: params.detailCode, + origin: params.origin, + retryable: params.retryable, + timestamp: params.timestamp, + sessionId: params.sessionId, + }; + + for (const [key, value] of Object.entries(data)) { + if (value === undefined) { + delete data[key]; + } + } + + return data; +} + +function buildErrorObject(params: BuildJsonRpcErrorParams): JsonRpcErrorObject { + if (hasValidAcpError(params.acp)) { + return { + code: params.acp.code, + message: params.acp.message, + ...(params.acp.data !== undefined ? { data: params.acp.data } : {}), + }; + } + + const data = buildFallbackData(params); + return { + code: OUTPUT_ERROR_JSONRPC_CODES[params.outputCode] ?? -32603, + message: params.message, + ...(Object.keys(data).length > 0 ? { data } : {}), + }; +} + +export function buildJsonRpcErrorResponse(params: BuildJsonRpcErrorParams): { + jsonrpc: "2.0"; + id: string | number | null; + error: JsonRpcErrorObject; +} { + return { + jsonrpc: "2.0", + id: params.id ?? null, + error: buildErrorObject(params), + }; +} diff --git a/src/output-json-formatter.ts b/src/output-json-formatter.ts new file mode 100644 index 0000000..fd4e284 --- /dev/null +++ b/src/output-json-formatter.ts @@ -0,0 +1,69 @@ +import { buildJsonRpcErrorResponse } from "./jsonrpc-error.js"; +import type { + OutputErrorAcpPayload, + OutputErrorCode, + OutputErrorOrigin, + OutputFormatter, + OutputFormatterContext, +} from "./types.js"; + +type WritableLike = { + write(chunk: string): void; +}; + +const DEFAULT_JSON_SESSION_ID = "unknown"; + +class JsonOutputFormatter implements OutputFormatter { + private readonly stdout: WritableLike; + private sessionId: string; + + constructor(stdout: WritableLike, context?: OutputFormatterContext) { + this.stdout = stdout; + this.sessionId = context?.sessionId?.trim() || DEFAULT_JSON_SESSION_ID; + } + + setContext(context: OutputFormatterContext): void { + this.sessionId = + context.sessionId?.trim() || this.sessionId || DEFAULT_JSON_SESSION_ID; + } + + onAcpMessage(message: unknown): void { + this.stdout.write(`${JSON.stringify(message)}\n`); + } + + onError(params: { + code: OutputErrorCode; + detailCode?: string; + origin?: OutputErrorOrigin; + message: string; + retryable?: boolean; + acp?: OutputErrorAcpPayload; + timestamp?: string; + }): void { + this.stdout.write( + `${JSON.stringify( + buildJsonRpcErrorResponse({ + outputCode: params.code, + detailCode: params.detailCode, + origin: params.origin, + message: params.message, + retryable: params.retryable, + timestamp: params.timestamp, + sessionId: this.sessionId, + acp: params.acp, + }), + )}\n`, + ); + } + + flush(): void { + // no-op for streaming output + } +} + +export function createJsonOutputFormatter( + stdout: WritableLike, + context?: OutputFormatterContext, +): OutputFormatter { + return new JsonOutputFormatter(stdout, context); +} diff --git a/src/output.ts b/src/output.ts index c85f346..8ba4e98 100644 --- a/src/output.ts +++ b/src/output.ts @@ -1,24 +1,17 @@ import type { + AnyMessage, ContentBlock, SessionNotification, - StopReason, ToolCall, ToolCallContent, ToolCallLocation, ToolCallStatus, ToolCallUpdate, } from "@agentclientprotocol/sdk"; -import { - clientOperationToEventDraft, - createAcpxEvent, - errorToEventDraft, - isAcpxEvent, - sessionUpdateToEventDrafts, -} from "./events.js"; -import { ACPX_EVENT_TYPES } from "./types.js"; +import { parseJsonRpcErrorMessage, parsePromptStopReason } from "./acp-jsonrpc.js"; +import { createJsonOutputFormatter } from "./output-json-formatter.js"; import type { - AcpxEvent, - AcpxEventDraft, + AcpJsonRpcMessage, ClientOperation, OutputErrorAcpPayload, OutputErrorCode, @@ -72,8 +65,6 @@ const OUTPUT_PRIORITY_KEYS = [ "value", ] as const; -const DEFAULT_JSON_SESSION_ID = "unknown"; - function asStatus(status: ToolCallStatus | null | undefined): NormalizedToolStatus { return status ?? "unknown"; } @@ -104,6 +95,38 @@ function asRecord(value: unknown): Record | undefined { return value as Record; } +function extractSessionUpdate(message: AnyMessage): SessionNotification | undefined { + if (!Object.hasOwn(message, "method")) { + return undefined; + } + const method = (message as { method?: unknown }).method; + if (method !== "session/update") { + return undefined; + } + const params = asRecord((message as { params?: unknown }).params); + if (!params) { + return undefined; + } + const sessionId = typeof params.sessionId === "string" ? params.sessionId : null; + if (!sessionId) { + return undefined; + } + const update = asRecord(params.update); + if (!update || typeof update.sessionUpdate !== "string") { + return undefined; + } + return { + sessionId, + update: update as SessionNotification["update"], + }; +} + +function extractJsonRpcMethod(message: AnyMessage): string | undefined { + return Object.hasOwn(message, "method") + ? (message as { method?: unknown }).method?.toString() + : undefined; +} + function collapseWhitespace(value: string): string { return value.replace(/\s+/g, " ").trim(); } @@ -491,69 +514,41 @@ class TextOutputFormatter implements OutputFormatter { // no-op for text mode } - onEvent(event: AcpxEvent): void { - if (event.type === ACPX_EVENT_TYPES.OUTPUT_DELTA) { - if (event.data.stream === "output") { - this.flushThoughtBuffer(); - this.writeAssistantChunk(event.data.text); - return; - } - - this.thoughtBuffer += event.data.text; - return; - } - - if (event.type === ACPX_EVENT_TYPES.TOOL_CALL) { - this.flushThoughtBuffer(); - this.renderToolUpdate({ - sessionUpdate: "tool_call_update", - toolCallId: event.data.tool_call_id ?? "tool_call", - title: event.data.title, - status: event.data.status as ToolCallStatus | undefined, - } as ToolCallUpdate); - return; - } - - if (event.type === ACPX_EVENT_TYPES.PLAN) { - this.flushThoughtBuffer(); - this.beginSection("plan"); - this.writeLine(this.bold("[plan]")); - for (const entry of event.data.entries) { - this.writeLine(` - [${entry.status}] ${entry.content}`); - } + onAcpMessage(message: AcpJsonRpcMessage): void { + const notification = extractSessionUpdate(message); + if (notification) { + this.renderSessionUpdate(notification); return; } - if (event.type === ACPX_EVENT_TYPES.CLIENT_OPERATION) { + const method = extractJsonRpcMethod(message); + if (method && method !== "session/prompt" && method !== "session/cancel") { this.onClientOperation({ - method: event.data.method, - status: event.data.status, - summary: event.data.summary, - details: event.data.details, - timestamp: event.ts, + method: method as ClientOperation["method"], + status: "running", + summary: method, + timestamp: new Date().toISOString(), }); return; } - if (event.type === ACPX_EVENT_TYPES.TURN_DONE) { - this.onDone(event.data.stop_reason); + const stopReason = parsePromptStopReason(message); + if (stopReason) { + this.renderDone(stopReason); return; } - if (event.type === ACPX_EVENT_TYPES.ERROR) { + const errorMessage = parseJsonRpcErrorMessage(message); + if (errorMessage) { this.onError({ - code: event.data.code, - detailCode: event.data.detail_code, - origin: event.data.origin, - message: event.data.message, - retryable: event.data.retryable, - acp: event.data.acp_error, - timestamp: event.ts, + code: "RUNTIME", + origin: "acp", + message: errorMessage, }); } } - onSessionUpdate(notification: SessionNotification): void { + private renderSessionUpdate(notification: SessionNotification): void { const update = notification.update; if (update.sessionUpdate !== "agent_thought_chunk") { this.flushThoughtBuffer(); @@ -593,7 +588,7 @@ class TextOutputFormatter implements OutputFormatter { } } - onDone(stopReason: StopReason): void { + private renderDone(stopReason: string): void { this.flushThoughtBuffer(); this.beginSection("done"); this.writeLine(this.dim(`[done] ${stopReason}`)); @@ -844,118 +839,9 @@ class TextOutputFormatter implements OutputFormatter { } } -class JsonOutputFormatter implements OutputFormatter { - private readonly stdout: WritableLike; - private sessionId: string; - private acpSessionId?: string; - private agentSessionId?: string; - private requestId?: string; - private nextSeq = 0; - - constructor(stdout: WritableLike, context?: OutputFormatterContext) { - this.stdout = stdout; - this.sessionId = context?.sessionId?.trim() || DEFAULT_JSON_SESSION_ID; - this.acpSessionId = context?.acpSessionId?.trim() || undefined; - this.agentSessionId = context?.agentSessionId?.trim() || undefined; - this.requestId = context?.requestId?.trim() || undefined; - this.nextSeq = context?.nextSeq ?? 0; - } - - setContext(context: OutputFormatterContext): void { - this.sessionId = - context.sessionId?.trim() || this.sessionId || DEFAULT_JSON_SESSION_ID; - this.acpSessionId = context.acpSessionId?.trim() || this.acpSessionId; - this.agentSessionId = context.agentSessionId?.trim() || this.agentSessionId; - this.requestId = context.requestId?.trim() || this.requestId; - if ( - typeof context.nextSeq === "number" && - Number.isInteger(context.nextSeq) && - context.nextSeq >= 0 - ) { - this.nextSeq = context.nextSeq; - } - } - - onEvent(event: AcpxEvent): void { - if (!isAcpxEvent(event)) { - throw new Error("Attempted to render invalid acpx.event.v1 payload"); - } - this.sessionId = event.session_id || this.sessionId; - this.acpSessionId = event.acp_session_id || this.acpSessionId; - this.agentSessionId = event.agent_session_id || this.agentSessionId; - this.requestId = event.request_id || this.requestId; - this.nextSeq = event.seq + 1; - this.stdout.write(JSON.stringify(event) + "\n"); - } - - onSessionUpdate(notification: SessionNotification): void { - for (const draft of sessionUpdateToEventDrafts(notification)) { - this.emitDraft(draft); - } - } - - onDone(stopReason: StopReason): void { - this.emitDraft({ - type: ACPX_EVENT_TYPES.TURN_DONE, - data: { - stop_reason: stopReason, - }, - }); - } - - onError(params: { - code: OutputErrorCode; - detailCode?: string; - origin?: OutputErrorOrigin; - message: string; - retryable?: boolean; - acp?: OutputErrorAcpPayload; - timestamp?: string; - }): void { - this.emitDraft( - errorToEventDraft({ - code: params.code, - detailCode: params.detailCode, - origin: params.origin, - message: params.message, - retryable: params.retryable, - acp: params.acp, - }), - ); - } - - onClientOperation(operation: ClientOperation): void { - this.emitDraft(clientOperationToEventDraft(operation)); - } - - flush(): void { - // no-op for streaming output - } - - private emitDraft(draft: AcpxEventDraft): void { - const event = createAcpxEvent( - { - sessionId: this.sessionId || DEFAULT_JSON_SESSION_ID, - acpSessionId: this.acpSessionId, - agentSessionId: this.agentSessionId, - requestId: this.requestId, - seq: this.nextSeq, - }, - draft, - ); - if (!isAcpxEvent(event)) { - throw new Error("Attempted to render invalid acpx.event.v1 payload"); - } - this.nextSeq += 1; - this.stdout.write(JSON.stringify(event) + "\n"); - } -} - class QuietOutputFormatter implements OutputFormatter { private readonly stdout: WritableLike; private chunks: string[] = []; - private sawEventOutput = false; - private sawSessionUpdateOutput = false; private flushed = false; constructor(stdout: WritableLike) { @@ -966,48 +852,21 @@ class QuietOutputFormatter implements OutputFormatter { // no-op for quiet mode } - onEvent(event: AcpxEvent): void { - if (event.type === ACPX_EVENT_TYPES.OUTPUT_DELTA) { - if (event.data.stream !== "output") { - return; - } - - if (!this.sawEventOutput && this.sawSessionUpdateOutput) { - // Prefer canonical event output when both paths are observed. - this.chunks = []; - } - - this.sawEventOutput = true; - this.chunks.push(event.data.text); + onAcpMessage(message: AcpJsonRpcMessage): void { + const update = extractSessionUpdate(message); + if ( + update?.update.sessionUpdate === "agent_message_chunk" && + update.update.content.type === "text" + ) { + this.chunks.push(update.update.content.text); return; } - if (event.type === ACPX_EVENT_TYPES.TURN_DONE) { + if (parsePromptStopReason(message)) { this.flushBufferedOutput(); } } - onSessionUpdate(notification: SessionNotification): void { - if (this.sawEventOutput) { - return; - } - - const update = notification.update; - if (update.sessionUpdate !== "agent_message_chunk") { - return; - } - if (update.content.type !== "text") { - return; - } - - this.sawSessionUpdateOutput = true; - this.chunks.push(update.content.text); - } - - onDone(_stopReason: StopReason): void { - this.flushBufferedOutput(); - } - onError(_params: { code: OutputErrorCode; detailCode?: string; @@ -1020,10 +879,6 @@ class QuietOutputFormatter implements OutputFormatter { // no-op in quiet mode } - onClientOperation(_operation: ClientOperation): void { - // no-op in quiet mode - } - flush(): void { // no-op for streaming output } @@ -1049,7 +904,7 @@ export function createOutputFormatter( case "text": return new TextOutputFormatter(stdout); case "json": - return new JsonOutputFormatter(stdout, options.jsonContext); + return createJsonOutputFormatter(stdout, options.jsonContext); case "quiet": return new QuietOutputFormatter(stdout); default: { diff --git a/src/queue-ipc-server.ts b/src/queue-ipc-server.ts index e7ac251..59c537f 100644 --- a/src/queue-ipc-server.ts +++ b/src/queue-ipc-server.ts @@ -1,7 +1,6 @@ import type { SetSessionConfigOptionResponse } from "@agentclientprotocol/sdk"; import net from "node:net"; import { normalizeOutputError } from "./error-normalization.js"; -import type { QueueOwnerLease } from "./queue-lease-store.js"; import { parseQueueRequest, type QueueOwnerErrorMessage, @@ -9,6 +8,10 @@ import { } from "./queue-messages.js"; import type { NonInteractivePermissionPolicy, PermissionMode } from "./types.js"; +type QueueOwnerSocketLease = { + socketPath: string; +}; + function makeQueueOwnerError( requestId: string, message: string, @@ -97,7 +100,7 @@ export class SessionQueueOwner { } static async start( - lease: QueueOwnerLease, + lease: QueueOwnerSocketLease, controlHandlers: QueueOwnerControlHandlers, ): Promise { const ownerRef: { current: SessionQueueOwner | undefined } = { current: undefined }; diff --git a/src/queue-ipc.ts b/src/queue-ipc.ts index edf4c49..4127d1c 100644 --- a/src/queue-ipc.ts +++ b/src/queue-ipc.ts @@ -4,14 +4,11 @@ import fs from "node:fs/promises"; import net from "node:net"; import os from "node:os"; import path from "node:path"; -import { normalizeOutputError } from "./error-normalization.js"; import { QueueConnectionError, QueueProtocolError } from "./errors.js"; import { parseQueueOwnerMessage, - parseQueueRequest, type QueueCancelRequest, type QueueOwnerCancelResultMessage, - type QueueOwnerErrorMessage, type QueueOwnerMessage, type QueueOwnerSetConfigOptionResultMessage, type QueueOwnerSetModeResultMessage, @@ -38,52 +35,6 @@ function queueBaseDir(): string { return path.join(os.homedir(), ".acpx", "queues"); } -function makeQueueOwnerError( - requestId: string, - message: string, - detailCode: string, - options: { - retryable?: boolean; - } = {}, -): QueueOwnerErrorMessage { - return { - type: "error", - requestId, - code: "RUNTIME", - detailCode, - origin: "queue", - retryable: options.retryable, - message, - }; -} - -function makeQueueOwnerErrorFromUnknown( - requestId: string, - error: unknown, - detailCode: string, - options: { - retryable?: boolean; - } = {}, -): QueueOwnerErrorMessage { - const normalized = normalizeOutputError(error, { - defaultCode: "RUNTIME", - origin: "queue", - detailCode, - retryable: options.retryable, - }); - - return { - type: "error", - requestId, - code: normalized.code, - detailCode: normalized.detailCode, - origin: normalized.origin, - message: normalized.message, - retryable: normalized.retryable, - acp: normalized.acp, - }; -} - const STALE_OWNER_PROTOCOL_DETAIL_CODES = new Set([ "QUEUE_PROTOCOL_MALFORMED_MESSAGE", "QUEUE_PROTOCOL_UNEXPECTED_RESPONSE", @@ -191,18 +142,8 @@ export type QueueOwnerLease = { }; export type { QueueOwnerMessage, QueueSubmitRequest } from "./queue-messages.js"; - -export type QueueTask = { - requestId: string; - message: string; - permissionMode: PermissionMode; - nonInteractivePermissions?: NonInteractivePermissionPolicy; - timeoutMs?: number; - suppressSdkConsoleErrors?: boolean; - waitForCompletion: boolean; - send: (message: QueueOwnerMessage) => void; - close: () => void; -}; +export type { QueueOwnerControlHandlers, QueueTask } from "./queue-ipc-server.js"; +export { SessionQueueOwner } from "./queue-ipc-server.js"; function parseQueueOwnerRecord(raw: unknown): QueueOwnerRecord | null { if (!raw || typeof raw !== "object" || Array.isArray(raw)) { @@ -449,354 +390,6 @@ export async function probeQueueOwnerHealth( }; } -function writeQueueMessage(socket: net.Socket, message: QueueOwnerMessage): void { - if (socket.destroyed || !socket.writable) { - return; - } - socket.write(`${JSON.stringify(message)}\n`); -} - -export type QueueOwnerControlHandlers = { - cancelPrompt: () => Promise; - setSessionMode: (modeId: string, timeoutMs?: number) => Promise; - setSessionConfigOption: ( - configId: string, - value: string, - timeoutMs?: number, - ) => Promise; -}; - -export class SessionQueueOwner { - private readonly server: net.Server; - private readonly controlHandlers: QueueOwnerControlHandlers; - private readonly pending: QueueTask[] = []; - private readonly waiters: Array<(task: QueueTask | undefined) => void> = []; - private closed = false; - - private constructor(server: net.Server, controlHandlers: QueueOwnerControlHandlers) { - this.server = server; - this.controlHandlers = controlHandlers; - } - - static async start( - lease: QueueOwnerLease, - controlHandlers: QueueOwnerControlHandlers, - ): Promise { - const ownerRef: { current: SessionQueueOwner | undefined } = { current: undefined }; - const server = net.createServer((socket) => { - ownerRef.current?.handleConnection(socket); - }); - ownerRef.current = new SessionQueueOwner(server, controlHandlers); - - await new Promise((resolve, reject) => { - const onListening = () => { - server.off("error", onError); - resolve(); - }; - const onError = (error: Error) => { - server.off("listening", onListening); - reject(error); - }; - - server.once("listening", onListening); - server.once("error", onError); - server.listen(lease.socketPath); - }); - - return ownerRef.current!; - } - - async close(): Promise { - if (this.closed) { - return; - } - - this.closed = true; - for (const waiter of this.waiters.splice(0)) { - waiter(undefined); - } - - for (const task of this.pending.splice(0)) { - if (task.waitForCompletion) { - task.send( - makeQueueOwnerError( - task.requestId, - "Queue owner shutting down before prompt execution", - "QUEUE_OWNER_SHUTTING_DOWN", - { - retryable: true, - }, - ), - ); - } - task.close(); - } - - await new Promise((resolve) => { - this.server.close(() => resolve()); - }); - } - - async nextTask(timeoutMs?: number): Promise { - if (this.pending.length > 0) { - return this.pending.shift(); - } - if (this.closed) { - return undefined; - } - - return await new Promise((resolve) => { - const shouldTimeout = timeoutMs != null; - const timer = - shouldTimeout && - setTimeout( - () => { - const index = this.waiters.indexOf(waiter); - if (index >= 0) { - this.waiters.splice(index, 1); - } - resolve(undefined); - }, - Math.max(0, timeoutMs), - ); - - const waiter = (task: QueueTask | undefined) => { - if (timer) { - clearTimeout(timer); - } - resolve(task); - }; - - this.waiters.push(waiter); - }); - } - - private enqueue(task: QueueTask): void { - if (this.closed) { - if (task.waitForCompletion) { - task.send( - makeQueueOwnerError( - task.requestId, - "Queue owner is shutting down", - "QUEUE_OWNER_SHUTTING_DOWN", - { - retryable: true, - }, - ), - ); - } - task.close(); - return; - } - - const waiter = this.waiters.shift(); - if (waiter) { - waiter(task); - return; - } - - this.pending.push(task); - } - - private handleConnection(socket: net.Socket): void { - socket.setEncoding("utf8"); - - if (this.closed) { - writeQueueMessage( - socket, - makeQueueOwnerError("unknown", "Queue owner is closed", "QUEUE_OWNER_CLOSED", { - retryable: true, - }), - ); - socket.end(); - return; - } - - let buffer = ""; - let handled = false; - - const fail = (requestId: string, message: string, detailCode: string): void => { - writeQueueMessage( - socket, - makeQueueOwnerError(requestId, message, detailCode, { - retryable: false, - }), - ); - socket.end(); - }; - - const processLine = (line: string): void => { - if (handled) { - return; - } - handled = true; - - let parsed: unknown; - try { - parsed = JSON.parse(line); - } catch { - fail( - "unknown", - "Invalid queue request payload", - "QUEUE_REQUEST_PAYLOAD_INVALID_JSON", - ); - return; - } - - const request = parseQueueRequest(parsed); - if (!request) { - fail("unknown", "Invalid queue request", "QUEUE_REQUEST_INVALID"); - return; - } - - if (request.type === "cancel_prompt") { - writeQueueMessage(socket, { - type: "accepted", - requestId: request.requestId, - }); - void this.controlHandlers - .cancelPrompt() - .then((cancelled) => { - writeQueueMessage(socket, { - type: "cancel_result", - requestId: request.requestId, - cancelled, - }); - }) - .catch((error) => { - writeQueueMessage( - socket, - makeQueueOwnerErrorFromUnknown( - request.requestId, - error, - "QUEUE_CONTROL_REQUEST_FAILED", - ), - ); - }) - .finally(() => { - if (!socket.destroyed) { - socket.end(); - } - }); - return; - } - - if (request.type === "set_mode") { - writeQueueMessage(socket, { - type: "accepted", - requestId: request.requestId, - }); - void this.controlHandlers - .setSessionMode(request.modeId, request.timeoutMs) - .then(() => { - writeQueueMessage(socket, { - type: "set_mode_result", - requestId: request.requestId, - modeId: request.modeId, - }); - }) - .catch((error) => { - writeQueueMessage( - socket, - makeQueueOwnerErrorFromUnknown( - request.requestId, - error, - "QUEUE_CONTROL_REQUEST_FAILED", - ), - ); - }) - .finally(() => { - if (!socket.destroyed) { - socket.end(); - } - }); - return; - } - - if (request.type === "set_config_option") { - writeQueueMessage(socket, { - type: "accepted", - requestId: request.requestId, - }); - void this.controlHandlers - .setSessionConfigOption(request.configId, request.value, request.timeoutMs) - .then((response) => { - writeQueueMessage(socket, { - type: "set_config_option_result", - requestId: request.requestId, - response, - }); - }) - .catch((error) => { - writeQueueMessage( - socket, - makeQueueOwnerErrorFromUnknown( - request.requestId, - error, - "QUEUE_CONTROL_REQUEST_FAILED", - ), - ); - }) - .finally(() => { - if (!socket.destroyed) { - socket.end(); - } - }); - return; - } - - const task: QueueTask = { - requestId: request.requestId, - message: request.message, - permissionMode: request.permissionMode, - nonInteractivePermissions: request.nonInteractivePermissions, - timeoutMs: request.timeoutMs, - suppressSdkConsoleErrors: request.suppressSdkConsoleErrors, - waitForCompletion: request.waitForCompletion, - send: (message) => { - writeQueueMessage(socket, message); - }, - close: () => { - if (!socket.destroyed) { - socket.end(); - } - }, - }; - - writeQueueMessage(socket, { - type: "accepted", - requestId: request.requestId, - }); - - if (!request.waitForCompletion) { - task.close(); - } - - this.enqueue(task); - }; - - socket.on("data", (chunk: string) => { - buffer += chunk; - - let index = buffer.indexOf("\n"); - while (index >= 0) { - const line = buffer.slice(0, index).trim(); - buffer = buffer.slice(index + 1); - - if (line.length > 0) { - processLine(line); - } - - index = buffer.indexOf("\n"); - } - }); - - socket.on("error", () => { - // no-op: queue processing continues even if client disconnects - }); - } -} - export type SubmitToQueueOwnerOptions = { sessionId: string; message: string; @@ -834,7 +427,6 @@ async function submitToQueueOwner( options.outputFormatter.setContext({ sessionId: options.sessionId, - requestId, }); return await new Promise((resolve, reject) => { @@ -897,7 +489,6 @@ async function submitToQueueOwner( acknowledged = true; options.outputFormatter.setContext({ sessionId: options.sessionId, - requestId: message.requestId, }); if (!options.waitForCompletion) { const queued: SessionEnqueueResult = { @@ -913,7 +504,6 @@ async function submitToQueueOwner( if (message.type === "error") { options.outputFormatter.setContext({ sessionId: options.sessionId, - requestId: message.requestId, }); const queueErrorAlreadyEmitted = @@ -957,7 +547,7 @@ async function submitToQueueOwner( } if (message.type === "event") { - options.outputFormatter.onEvent(message.event); + options.outputFormatter.onAcpMessage(message.message); return; } @@ -992,7 +582,7 @@ async function submitToQueueOwner( } }); - socket.once("error", (error) => { + socket.once("error", (error: Error) => { finishReject(error); }); @@ -1165,7 +755,7 @@ async function submitControlToQueueOwner( } }); - socket.once("error", (error) => { + socket.once("error", (error: Error) => { finishReject(error); }); diff --git a/src/queue-messages.ts b/src/queue-messages.ts index cc5593c..43ee616 100644 --- a/src/queue-messages.ts +++ b/src/queue-messages.ts @@ -1,5 +1,5 @@ import type { SetSessionConfigOptionResponse } from "@agentclientprotocol/sdk"; -import { isAcpxEvent } from "./events.js"; +import { isAcpJsonRpcMessage } from "./acp-jsonrpc.js"; import { OUTPUT_ERROR_CODES, OUTPUT_ERROR_ORIGINS, @@ -8,7 +8,7 @@ import { type OutputErrorOrigin, } from "./types.js"; import type { - AcpxEvent, + AcpJsonRpcMessage, NonInteractivePermissionPolicy, PermissionMode, SessionSendResult, @@ -59,7 +59,7 @@ export type QueueOwnerAcceptedMessage = { export type QueueOwnerEventMessage = { type: "event"; requestId: string; - event: AcpxEvent; + message: AcpJsonRpcMessage; }; export type QueueOwnerResultMessage = { @@ -315,14 +315,14 @@ export function parseQueueOwnerMessage(raw: unknown): QueueOwnerMessage | null { } if (message.type === "event") { - if (!isAcpxEvent(message.event)) { + if (!isAcpJsonRpcMessage(message.message)) { return null; } return { type: "event", requestId: message.requestId, - event: message.event, + message: message.message, }; } diff --git a/src/session-event-log.ts b/src/session-event-log.ts index 367db67..8fb1b09 100644 --- a/src/session-event-log.ts +++ b/src/session-event-log.ts @@ -14,18 +14,18 @@ export function safeSessionId(sessionId: string): string { } export function sessionEventActivePath(sessionId: string): string { - return path.join(sessionBaseDir(), `${safeSessionId(sessionId)}.events.ndjson`); + return path.join(sessionBaseDir(), `${safeSessionId(sessionId)}.stream.ndjson`); } export function sessionEventSegmentPath(sessionId: string, segment: number): string { return path.join( sessionBaseDir(), - `${safeSessionId(sessionId)}.events.${segment}.ndjson`, + `${safeSessionId(sessionId)}.stream.${segment}.ndjson`, ); } export function sessionEventLockPath(sessionId: string): string { - return path.join(sessionBaseDir(), `${safeSessionId(sessionId)}.events.lock`); + return path.join(sessionBaseDir(), `${safeSessionId(sessionId)}.stream.lock`); } export function defaultSessionEventLog(sessionId: string): SessionEventLog { diff --git a/src/session-events.ts b/src/session-events.ts index c4adb81..ad75463 100644 --- a/src/session-events.ts +++ b/src/session-events.ts @@ -1,6 +1,5 @@ import fs from "node:fs/promises"; -import { createAcpxEvent, isAcpxEvent } from "./events.js"; -import { assertPersistedKeyPolicy } from "./persisted-key-policy.js"; +import { isAcpJsonRpcMessage } from "./acp-jsonrpc.js"; import { DEFAULT_EVENT_MAX_SEGMENTS, DEFAULT_EVENT_SEGMENT_MAX_BYTES, @@ -10,7 +9,7 @@ import { sessionEventSegmentPath as segmentEventPath, } from "./session-event-log.js"; import { resolveSessionRecord, writeSessionRecord } from "./session-persistence.js"; -import type { AcpxEvent, AcpxEventDraft, SessionRecord } from "./types.js"; +import type { AcpJsonRpcMessage, SessionRecord } from "./types.js"; const LOCK_RETRY_MS = 15; @@ -36,7 +35,7 @@ async function statSize(filePath: string): Promise { } } -async function countExistingEventSegments( +async function countExistingSegments( sessionId: string, maxSegments: number, ): Promise { @@ -63,7 +62,7 @@ async function resolveSessionMaxSegments(sessionId: string): Promise { return configured; } } catch { - // Fall back to default when session metadata is unavailable. + // Fall back to defaults when metadata is unavailable. } return DEFAULT_EVENT_MAX_SEGMENTS; @@ -150,7 +149,6 @@ export class SessionEventWriter { private readonly lock: LockHandle; private readonly maxSegmentBytes: number; private readonly maxSegments: number; - private nextSeq: number; private closed = false; private constructor( @@ -162,7 +160,6 @@ export class SessionEventWriter { this.lock = lock; this.maxSegmentBytes = options.maxSegmentBytes; this.maxSegments = options.maxSegments; - this.nextSeq = record.lastSeq + 1; } static async open( @@ -186,51 +183,34 @@ export class SessionEventWriter { return this.record; } - createEvent(draft: AcpxEventDraft): AcpxEvent { - const event = createAcpxEvent( - { - sessionId: this.record.acpxRecordId, - acpSessionId: this.record.acpSessionId, - agentSessionId: this.record.agentSessionId, - requestId: draft.request_id, - seq: this.nextSeq, - }, - draft, - ); - this.nextSeq += 1; - return event; - } - - async appendEvent(event: AcpxEvent, options: AppendOptions = {}): Promise { - await this.appendEvents([event], options); + async appendMessage( + message: AcpJsonRpcMessage, + options: AppendOptions = {}, + ): Promise { + await this.appendMessages([message], options); } - async appendEvents(events: AcpxEvent[], options: AppendOptions = {}): Promise { + async appendMessages( + messages: AcpJsonRpcMessage[], + options: AppendOptions = {}, + ): Promise { if (this.closed) { throw new Error("SessionEventWriter is closed"); } - if (events.length === 0) { + if (messages.length === 0) { return; } await ensureSessionDir(); let activePath = activeEventPath(this.record.acpxRecordId); - for (const event of events) { - if (!isAcpxEvent(event)) { - throw new Error("Attempted to persist invalid acpx.event.v1 payload"); + for (const message of messages) { + if (!isAcpJsonRpcMessage(message)) { + throw new Error("Attempted to persist invalid ACP JSON-RPC payload"); } - if (event.seq !== this.record.lastSeq + 1) { - throw new Error( - `acpx event sequence mismatch: expected ${this.record.lastSeq + 1}, got ${event.seq}`, - ); - } - - assertPersistedKeyPolicy(event); - - const line = `${JSON.stringify(event)}\n`; + const line = `${JSON.stringify(message)}\n`; const lineBytes = Buffer.byteLength(line); const currentSize = await statSize(activePath); if (currentSize > 0 && currentSize + lineBytes > this.maxSegmentBytes) { @@ -240,21 +220,24 @@ export class SessionEventWriter { await fs.appendFile(activePath, line, "utf8"); - this.record.lastSeq = event.seq; - if (event.seq >= this.nextSeq) { - this.nextSeq = event.seq + 1; + this.record.lastSeq += 1; + if (Object.hasOwn(message, "id")) { + const id = (message as { id?: unknown }).id; + if (typeof id === "string" || typeof id === "number") { + this.record.lastRequestId = String(id); + } } - this.record.lastRequestId = event.request_id ?? this.record.lastRequestId; - this.record.lastUsedAt = event.ts; + const writeTs = new Date().toISOString(); + this.record.lastUsedAt = writeTs; this.record.eventLog = { active_path: activePath, - segment_count: await countExistingEventSegments( + segment_count: await countExistingSegments( this.record.acpxRecordId, this.maxSegments, ), max_segment_bytes: this.maxSegmentBytes, max_segments: this.maxSegments, - last_write_at: event.ts, + last_write_at: writeTs, last_write_error: null, }; } @@ -264,24 +247,6 @@ export class SessionEventWriter { } } - async appendDraft( - draft: AcpxEventDraft, - options: AppendOptions = {}, - ): Promise { - const event = this.createEvent(draft); - await this.appendEvent(event, options); - return event; - } - - async appendDrafts( - drafts: AcpxEventDraft[], - options: AppendOptions = {}, - ): Promise { - const events = drafts.map((draft) => this.createEvent(draft)); - await this.appendEvents(events, options); - return events; - } - async checkpoint(): Promise { if (this.closed) { throw new Error("SessionEventWriter is closed"); @@ -305,7 +270,9 @@ export class SessionEventWriter { } } -export async function listSessionEvents(sessionId: string): Promise { +export async function listSessionEvents( + sessionId: string, +): Promise { const maxSegments = await resolveSessionMaxSegments(sessionId); const files: string[] = []; @@ -321,14 +288,18 @@ export async function listSessionEvents(sessionId: string): Promise files.push(active); } - const events: AcpxEvent[] = []; + const events: AcpJsonRpcMessage[] = []; for (const filePath of files) { const payload = await fs.readFile(filePath, "utf8"); const lines = payload.split("\n").filter((line) => line.trim().length > 0); for (const line of lines) { - const parsed = JSON.parse(line); - if (isAcpxEvent(parsed)) { - events.push(parsed); + try { + const parsed = JSON.parse(line); + if (isAcpJsonRpcMessage(parsed)) { + events.push(parsed); + } + } catch { + // Skip malformed lines to keep event listing resilient. } } } diff --git a/src/session-runtime.ts b/src/session-runtime.ts index 54e0d6a..bb98878 100644 --- a/src/session-runtime.ts +++ b/src/session-runtime.ts @@ -1,17 +1,6 @@ -import type { SessionNotification, StopReason } from "@agentclientprotocol/sdk"; -import { spawn } from "node:child_process"; import fs from "node:fs/promises"; import path from "node:path"; -import { fileURLToPath } from "node:url"; import { AcpClient } from "./client.js"; -import { - clientOperationToEventDraft, - createAcpxEvent, - errorToEventDraft, - isAcpxEvent, - sessionUpdateToEventDrafts, - truncateInputPreview, -} from "./events.js"; import { formatErrorMessage, normalizeOutputError } from "./error-normalization.js"; import { cloneSessionAcpxState, @@ -59,6 +48,12 @@ import { runSessionSetConfigOptionDirect, runSessionSetModeDirect, } from "./session-runtime/prompt-runner.js"; +import { + queueOwnerRuntimeOptionsFromSend, + spawnQueueOwnerProcess, + type QueueOwnerRuntimeOptions, +} from "./session-runtime/queue-owner-process.js"; +export type { QueueOwnerRuntimeOptions } from "./session-runtime/queue-owner-process.js"; import { DEFAULT_HISTORY_LIMIT, absolutePath, @@ -72,13 +67,10 @@ import { resolveSessionRecord, writeSessionRecord, } from "./session-persistence.js"; -import { ACPX_EVENT_TYPES } from "./types.js"; import { SESSION_RECORD_SCHEMA, + type AcpJsonRpcMessage, type AuthPolicy, - type AcpxEvent, - type AcpxEventDraft, - type ClientOperation, type NonInteractivePermissionPolicy, type OutputErrorEmissionPolicy, type OutputErrorAcpPayload, @@ -98,9 +90,6 @@ import { export const DEFAULT_QUEUE_OWNER_TTL_MS = 300_000; const INTERRUPT_CANCEL_WAIT_MS = 2_500; const QUEUE_OWNER_STARTUP_MAX_ATTEMPTS = 120; -const QUEUE_OWNER_MAIN_PATH = fileURLToPath( - new URL("./queue-owner-main.js", import.meta.url), -); type TimedRunOptions = { timeoutMs?: number; @@ -225,30 +214,18 @@ class QueueTaskOutputFormatter implements OutputFormatter { this.send = task.send; } - setContext(): void { + setContext(_context: { sessionId: string }): void { // queue formatter context is fixed by task request id } - onEvent(event: AcpxEvent): void { + onAcpMessage(message: AcpJsonRpcMessage): void { this.send({ type: "event", requestId: this.requestId, - event, + message, }); } - onSessionUpdate(_notification: SessionNotification): void { - // Queue protocol forwards canonical events only. - } - - onClientOperation(_operation: ClientOperation): void { - // Queue protocol forwards canonical events only. - } - - onDone(_stopReason: StopReason): void { - // turn_done is emitted as a canonical event. - } - onError(params: { code: OutputErrorCode; detailCode?: string; @@ -276,19 +253,10 @@ class QueueTaskOutputFormatter implements OutputFormatter { } const DISCARD_OUTPUT_FORMATTER: OutputFormatter = { - setContext() { - // no-op - }, - onEvent() { - // no-op - }, - onSessionUpdate() { - // no-op - }, - onClientOperation() { + setContext(_context) { // no-op }, - onDone() { + onAcpMessage() { // no-op }, onError() { @@ -395,13 +363,11 @@ async function runSessionPrompt( output.setContext({ sessionId: record.acpxRecordId, - acpSessionId: record.acpSessionId, - agentSessionId: record.agentSessionId, - nextSeq: record.lastSeq + 1, }); const eventWriter = await SessionEventWriter.open(record); - const pendingEvents: AcpxEvent[] = []; + const pendingMessages: AcpJsonRpcMessage[] = []; + let sawAcpMessage = false; let eventWriterClosed = false; const closeEventWriter = async (checkpoint: boolean): Promise => { @@ -412,23 +378,13 @@ async function runSessionPrompt( await eventWriter.close({ checkpoint }); }; - const flushPendingEvents = async (checkpoint = false): Promise => { - if (pendingEvents.length === 0) { + const flushPendingMessages = async (checkpoint = false): Promise => { + if (pendingMessages.length === 0) { return; } - const batch = pendingEvents.splice(0, pendingEvents.length); - await eventWriter.appendEvents(batch, { checkpoint }); - }; - - const emitEvent = (draft: AcpxEventDraft): AcpxEvent => { - const event = eventWriter.createEvent(draft); - if (!isAcpxEvent(event)) { - throw new Error("Attempted to emit invalid acpx.event.v1 payload"); - } - pendingEvents.push(event); - output.onEvent(event); - return event; + const batch = pendingMessages.splice(0, pendingMessages.length); + await eventWriter.appendMessages(batch, { checkpoint }); }; const client = new AcpClient({ @@ -440,20 +396,20 @@ async function runSessionPrompt( authPolicy: options.authPolicy, suppressSdkConsoleErrors: options.suppressSdkConsoleErrors, verbose: options.verbose, + onAcpMessage: (_direction, message) => { + sawAcpMessage = true; + pendingMessages.push(message); + output.onAcpMessage(message); + }, onSessionUpdate: (notification) => { acpxState = recordConversationSessionUpdate( conversation, acpxState, notification, ); - const drafts = sessionUpdateToEventDrafts(notification); - for (const draft of drafts) { - emitEvent(draft); - } }, onClientOperation: (operation) => { acpxState = recordConversationClientOperation(conversation, acpxState, operation); - emitEvent(clientOperationToEventDraft(operation)); }, }); let activeSessionIdForControl = record.acpSessionId; @@ -500,20 +456,8 @@ async function runSessionPrompt( output.setContext({ sessionId: record.acpxRecordId, - acpSessionId: record.acpSessionId, - agentSessionId: record.agentSessionId, - nextSeq: record.lastSeq + 1, }); - - emitEvent({ - type: ACPX_EVENT_TYPES.TURN_STARTED, - data: { - mode: "prompt", - resumed, - input_preview: truncateInputPreview(options.message), - }, - }); - await flushPendingEvents(false); + await flushPendingMessages(false); let response; try { @@ -551,18 +495,7 @@ async function runSessionPrompt( origin: "runtime", }); - emitEvent( - errorToEventDraft({ - code: normalizedError.code, - detailCode: normalizedError.detailCode, - origin: normalizedError.origin, - message: normalizedError.message, - retryable: normalizedError.retryable, - acp: normalizedError.acp, - }), - ); - - await flushPendingEvents(true).catch(() => { + await flushPendingMessages(true).catch(() => { // best effort while bubbling prompt failure }); @@ -578,19 +511,13 @@ async function runSessionPrompt( const propagated = error instanceof Error ? error : new Error(formatErrorMessage(error)); (propagated as { outputAlreadyEmitted?: boolean }).outputAlreadyEmitted = - true; + sawAcpMessage; + (propagated as { normalizedOutputError?: unknown }).normalizedOutputError = + normalizedError; throw propagated; } - emitEvent({ - type: ACPX_EVENT_TYPES.TURN_DONE, - data: { - stop_reason: response.stopReason, - permission_stats: client.getPermissionStats(), - }, - }); - - await flushPendingEvents(true); + await flushPendingMessages(true); output.flush(); const now = isoNow(); @@ -617,7 +544,7 @@ async function runSessionPrompt( record.lastUsedAt = isoNow(); applyConversation(record, conversation); record.acpx = acpxState; - await flushPendingEvents(true).catch(() => { + await flushPendingMessages(true).catch(() => { // best effort while process is being interrupted }); await writeSessionRecord(record).catch(() => { @@ -637,7 +564,7 @@ async function runSessionPrompt( applyLifecycleSnapshotToRecord(record, client.getAgentLifecycleSnapshot()); applyConversation(record, conversation); record.acpx = acpxState; - await flushPendingEvents(false).catch(() => { + await flushPendingMessages(false).catch(() => { // best effort on close }); await writeSessionRecord(record).catch(() => { @@ -660,8 +587,7 @@ export async function runOnce(options: RunOnceOptions): Promise authPolicy: options.authPolicy, suppressSdkConsoleErrors: options.suppressSdkConsoleErrors, verbose: options.verbose, - onSessionUpdate: (notification) => output.onSessionUpdate(notification), - onClientOperation: (operation) => output.onClientOperation(operation), + onAcpMessage: (_direction, message) => output.onAcpMessage(message), }); try { @@ -673,39 +599,15 @@ export async function runOnce(options: RunOnceOptions): Promise options.timeoutMs, ); const sessionId = createdSession.sessionId; - const agentSessionId = normalizeRuntimeSessionId(createdSession.agentSessionId); output.setContext({ sessionId, - acpSessionId: sessionId, - agentSessionId, - nextSeq: 0, }); - output.onEvent( - createAcpxEvent( - { - sessionId, - acpSessionId: sessionId, - agentSessionId, - seq: 0, - }, - { - type: ACPX_EVENT_TYPES.TURN_STARTED, - data: { - mode: "prompt", - resumed: false, - input_preview: truncateInputPreview(options.message), - }, - }, - ), - ); - const response = await withTimeout( client.prompt(sessionId, options.message), options.timeoutMs, ); - output.onDone(response.stopReason); output.flush(); return toPromptResult(response.stopReason, sessionId, client); }, @@ -816,45 +718,6 @@ export async function ensureSession( }; } -export type QueueOwnerRuntimeOptions = { - sessionId: string; - permissionMode: PermissionMode; - nonInteractivePermissions?: NonInteractivePermissionPolicy; - authCredentials?: Record; - authPolicy?: AuthPolicy; - suppressSdkConsoleErrors?: boolean; - verbose?: boolean; - ttlMs?: number; -}; - -function queueOwnerRuntimeOptionsFromSend( - options: SessionSendOptions, -): QueueOwnerRuntimeOptions { - return { - sessionId: options.sessionId, - permissionMode: options.permissionMode, - nonInteractivePermissions: options.nonInteractivePermissions, - authCredentials: options.authCredentials, - authPolicy: options.authPolicy, - suppressSdkConsoleErrors: options.suppressSdkConsoleErrors, - verbose: options.verbose, - ttlMs: options.ttlMs, - }; -} - -function spawnQueueOwnerProcess(options: QueueOwnerRuntimeOptions): void { - const payload = JSON.stringify(options); - const child = spawn(process.execPath, [QUEUE_OWNER_MAIN_PATH], { - detached: true, - stdio: "ignore", - env: { - ...process.env, - ACPX_QUEUE_OWNER_PAYLOAD: payload, - }, - }); - child.unref(); -} - async function submitToRunningOwner( options: SessionSendOptions, waitForCompletion: boolean, diff --git a/src/session-runtime/queue-owner-process.ts b/src/session-runtime/queue-owner-process.ts new file mode 100644 index 0000000..643fe8a --- /dev/null +++ b/src/session-runtime/queue-owner-process.ts @@ -0,0 +1,61 @@ +import { spawn } from "node:child_process"; +import { fileURLToPath } from "node:url"; +import type { + AuthPolicy, + NonInteractivePermissionPolicy, + PermissionMode, +} from "../types.js"; + +export type QueueOwnerRuntimeOptions = { + sessionId: string; + permissionMode: PermissionMode; + nonInteractivePermissions?: NonInteractivePermissionPolicy; + authCredentials?: Record; + authPolicy?: AuthPolicy; + suppressSdkConsoleErrors?: boolean; + verbose?: boolean; + ttlMs?: number; +}; + +type SessionSendLike = { + sessionId: string; + permissionMode: PermissionMode; + nonInteractivePermissions?: NonInteractivePermissionPolicy; + authCredentials?: Record; + authPolicy?: AuthPolicy; + suppressSdkConsoleErrors?: boolean; + verbose?: boolean; + ttlMs?: number; +}; + +const QUEUE_OWNER_MAIN_PATH = fileURLToPath( + new URL("../queue-owner-main.js", import.meta.url), +); + +export function queueOwnerRuntimeOptionsFromSend( + options: SessionSendLike, +): QueueOwnerRuntimeOptions { + return { + sessionId: options.sessionId, + permissionMode: options.permissionMode, + nonInteractivePermissions: options.nonInteractivePermissions, + authCredentials: options.authCredentials, + authPolicy: options.authPolicy, + suppressSdkConsoleErrors: options.suppressSdkConsoleErrors, + verbose: options.verbose, + ttlMs: options.ttlMs, + }; +} + +export function spawnQueueOwnerProcess(options: QueueOwnerRuntimeOptions): void { + const payload = JSON.stringify(options); + const child = spawn(process.execPath, [QUEUE_OWNER_MAIN_PATH], { + detached: true, + stdio: "ignore", + env: { + ...process.env, + ACPX_QUEUE_OWNER_PAYLOAD: payload, + }, + }); + child.unref(); +} diff --git a/src/types.ts b/src/types.ts index 44bb7ae..b0a6e5b 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,7 +1,8 @@ import type { AgentCapabilities, - SessionConfigOption, + AnyMessage, SessionNotification, + SessionConfigOption, SetSessionConfigOptionResponse, StopReason, } from "@agentclientprotocol/sdk"; @@ -33,46 +34,8 @@ export type NonInteractivePermissionPolicy = export const OUTPUT_STREAMS = ["prompt", "control"] as const; export type OutputStream = (typeof OUTPUT_STREAMS)[number]; - -export const ACPX_EVENT_SCHEMA = "acpx.event.v1" as const; -export const ACPX_EVENT_OUTPUT_STREAMS = ["output", "thought"] as const; -export type AcpxEventOutputStream = (typeof ACPX_EVENT_OUTPUT_STREAMS)[number]; -export const ACPX_EVENT_TYPES = { - TURN_STARTED: "turn_started", - OUTPUT_DELTA: "output_delta", - TOOL_CALL: "tool_call", - PLAN: "plan", - UPDATE: "update", - CLIENT_OPERATION: "client_operation", - TURN_DONE: "turn_done", - ERROR: "error", - PROMPT_QUEUED: "prompt_queued", - SESSION_ENSURED: "session_ensured", - CANCEL_REQUESTED: "cancel_requested", - CANCEL_RESULT: "cancel_result", - MODE_SET: "mode_set", - CONFIG_SET: "config_set", - STATUS_SNAPSHOT: "status_snapshot", - SESSION_CLOSED: "session_closed", -} as const; -export type AcpxEventType = (typeof ACPX_EVENT_TYPES)[keyof typeof ACPX_EVENT_TYPES]; -export const ACPX_EVENT_TURN_MODES = ["prompt"] as const; -export type AcpxEventTurnMode = (typeof ACPX_EVENT_TURN_MODES)[number]; -export const ACPX_EVENT_STATUS_SNAPSHOT_STATUSES = [ - "alive", - "dead", - "no-session", -] as const; -export type AcpxEventStatusSnapshotStatus = - (typeof ACPX_EVENT_STATUS_SNAPSHOT_STATUSES)[number]; - -export const ACPX_EVENT_TOOL_CALL_STATUSES = [ - "pending", - "in_progress", - "completed", - "failed", -] as const; -export type AcpxEventToolCallStatus = (typeof ACPX_EVENT_TOOL_CALL_STATUSES)[number]; +export type AcpJsonRpcMessage = AnyMessage; +export type AcpMessageDirection = "outbound" | "inbound"; export const OUTPUT_ERROR_CODES = [ "NO_SESSION", @@ -136,148 +99,6 @@ export type ClientOperation = { timestamp: string; }; -type AcpxEventEnvelope = { - schema: typeof ACPX_EVENT_SCHEMA; - event_id: string; - session_id: string; - acp_session_id?: string; - agent_session_id?: string; - request_id?: string; - seq: number; - ts: string; -}; - -export type AcpxEvent = - | (AcpxEventEnvelope & { - type: typeof ACPX_EVENT_TYPES.TURN_STARTED; - data: { - mode: AcpxEventTurnMode; - resumed: boolean; - input_preview?: string; - }; - }) - | (AcpxEventEnvelope & { - type: typeof ACPX_EVENT_TYPES.OUTPUT_DELTA; - data: { - stream: AcpxEventOutputStream; - text: string; - }; - }) - | (AcpxEventEnvelope & { - type: typeof ACPX_EVENT_TYPES.TOOL_CALL; - data: { - tool_call_id: string; - title?: string; - status?: AcpxEventToolCallStatus; - }; - }) - | (AcpxEventEnvelope & { - type: typeof ACPX_EVENT_TYPES.PLAN; - data: { - entries: Array<{ - content: string; - status: string; - priority: string; - }>; - }; - }) - | (AcpxEventEnvelope & { - type: typeof ACPX_EVENT_TYPES.UPDATE; - data: { - update: string; - }; - }) - | (AcpxEventEnvelope & { - type: typeof ACPX_EVENT_TYPES.CLIENT_OPERATION; - data: { - method: ClientOperationMethod; - status: ClientOperationStatus; - summary: string; - details?: string; - }; - }) - | (AcpxEventEnvelope & { - type: typeof ACPX_EVENT_TYPES.TURN_DONE; - data: { - stop_reason: StopReason; - permission_stats?: PermissionStats; - }; - }) - | (AcpxEventEnvelope & { - type: typeof ACPX_EVENT_TYPES.ERROR; - data: { - code: OutputErrorCode; - detail_code?: string; - origin?: OutputErrorOrigin; - message: string; - retryable?: boolean; - acp_error?: OutputErrorAcpPayload; - }; - }) - | (AcpxEventEnvelope & { - type: typeof ACPX_EVENT_TYPES.PROMPT_QUEUED; - data: { - request_id: string; - }; - }) - | (AcpxEventEnvelope & { - type: typeof ACPX_EVENT_TYPES.SESSION_ENSURED; - data: { - created: boolean; - name?: string; - replaced_session_id?: string; - }; - }) - | (AcpxEventEnvelope & { - type: typeof ACPX_EVENT_TYPES.CANCEL_REQUESTED; - data: Record; - }) - | (AcpxEventEnvelope & { - type: typeof ACPX_EVENT_TYPES.CANCEL_RESULT; - data: { - cancelled: boolean; - }; - }) - | (AcpxEventEnvelope & { - type: typeof ACPX_EVENT_TYPES.MODE_SET; - data: { - mode_id: string; - resumed?: boolean; - }; - }) - | (AcpxEventEnvelope & { - type: typeof ACPX_EVENT_TYPES.CONFIG_SET; - data: { - config_id: string; - value: string; - resumed?: boolean; - config_options?: unknown[]; - }; - }) - | (AcpxEventEnvelope & { - type: typeof ACPX_EVENT_TYPES.STATUS_SNAPSHOT; - data: { - status: AcpxEventStatusSnapshotStatus; - pid?: number; - summary?: string; - uptime?: string; - last_prompt_time?: string; - exit_code?: number; - signal?: string; - }; - }) - | (AcpxEventEnvelope & { - type: typeof ACPX_EVENT_TYPES.SESSION_CLOSED; - data: { - reason: "close"; - }; - }); - -export type AcpxEventDraft = Omit< - AcpxEvent, - "schema" | "event_id" | "session_id" | "seq" | "ts" ->; - export type SessionEventLog = { active_path: string; segment_count: number; @@ -289,10 +110,6 @@ export type SessionEventLog = { export type OutputFormatterContext = { sessionId: string; - acpSessionId?: string; - agentSessionId?: string; - requestId?: string; - nextSeq?: number; }; export type OutputPolicy = { @@ -309,9 +126,7 @@ export type OutputErrorEmissionPolicy = { export interface OutputFormatter { setContext(context: OutputFormatterContext): void; - onEvent(event: AcpxEvent): void; - onSessionUpdate(notification: SessionNotification): void; - onClientOperation(operation: ClientOperation): void; + onAcpMessage(message: AcpJsonRpcMessage): void; onError(params: { code: OutputErrorCode; detailCode?: string; @@ -321,7 +136,6 @@ export interface OutputFormatter { acp?: OutputErrorAcpPayload; timestamp?: string; }): void; - onDone(stopReason: StopReason): void; flush(): void; } @@ -334,6 +148,7 @@ export type AcpClientOptions = { authPolicy?: AuthPolicy; suppressSdkConsoleErrors?: boolean; verbose?: boolean; + onAcpMessage?: (direction: AcpMessageDirection, message: AcpJsonRpcMessage) => void; onSessionUpdate?: (notification: SessionNotification) => void; onClientOperation?: (operation: ClientOperation) => void; }; diff --git a/test/cli.test.ts b/test/cli.test.ts index a72e01a..64fc00e 100644 --- a/test/cli.test.ts +++ b/test/cli.test.ts @@ -9,7 +9,7 @@ import { fileURLToPath } from "node:url"; import { InvalidArgumentError } from "commander"; import { formatPromptSessionBannerLine, parseTtlSeconds } from "../src/cli.js"; import { serializeSessionRecordForDisk } from "../src/session-persistence.js"; -import type { AcpxEvent, SessionRecord } from "../src/types.js"; +import type { SessionRecord } from "../src/types.js"; import { cleanupOwnerArtifacts, closeServer, @@ -33,6 +33,40 @@ type CliRunResult = { stderr: string; }; +type ParsedAcpError = { + code?: number; + message?: string; + data?: { + acpxCode?: string; + detailCode?: string; + origin?: string; + sessionId?: string; + }; +}; + +function parseSingleAcpErrorLine(stdout: string): ParsedAcpError { + const payload = JSON.parse(stdout.trim()) as { + jsonrpc?: string; + error?: ParsedAcpError; + }; + assert.equal(payload.jsonrpc, "2.0"); + assert.equal(typeof payload.error, "object"); + return payload.error ?? {}; +} + +function parseJsonRpcLines(stdout: string): Array> { + const lines = stdout + .split(/\r?\n/) + .map((line) => line.trim()) + .filter((line) => line.length > 0); + assert(lines.length > 0, "expected at least one stdout line"); + return lines.map((line) => { + const parsed = JSON.parse(line) as Record; + assert.equal(parsed.jsonrpc, "2.0"); + return parsed; + }); +} + test("parseTtlSeconds parses and rounds valid numeric values", () => { assert.equal(parseTtlSeconds("30"), 30_000); assert.equal(parseTtlSeconds("0"), 0); @@ -154,19 +188,19 @@ test("sessions ensure creates when missing and returns existing on subsequent ca homeDir, ); assert.equal(first.code, 0, first.stderr); - const firstPayload = JSON.parse(first.stdout.trim()) as AcpxEvent; - assert.equal(firstPayload.type, "session_ensured"); - assert.equal((firstPayload.data as { created?: boolean }).created, true); + const firstPayload = JSON.parse(first.stdout.trim()) as Record; + assert.equal(firstPayload.action, "session_ensured"); + assert.equal(firstPayload.created, true); const second = await runCli( ["--cwd", cwd, "--format", "json", "codex", "sessions", "ensure"], homeDir, ); assert.equal(second.code, 0, second.stderr); - const secondPayload = JSON.parse(second.stdout.trim()) as AcpxEvent; - assert.equal(secondPayload.type, "session_ensured"); - assert.equal((secondPayload.data as { created?: boolean }).created, false); - assert.equal(secondPayload.session_id, firstPayload.session_id); + const secondPayload = JSON.parse(second.stdout.trim()) as Record; + assert.equal(secondPayload.action, "session_ensured"); + assert.equal(secondPayload.created, false); + assert.equal(secondPayload.acpxRecordId, firstPayload.acpxRecordId); }); }); @@ -192,10 +226,10 @@ test("sessions ensure resolves existing session by directory walk", async () => homeDir, ); assert.equal(result.code, 0, result.stderr); - const payload = JSON.parse(result.stdout.trim()) as AcpxEvent; - assert.equal(payload.session_id, "parent-session"); - assert.equal(payload.type, "session_ensured"); - assert.equal((payload.data as { created?: boolean }).created, false); + const payload = JSON.parse(result.stdout.trim()) as Record; + assert.equal(payload.acpxRecordId, "parent-session"); + assert.equal(payload.action, "session_ensured"); + assert.equal(payload.created, false); }); }); @@ -243,29 +277,35 @@ test("sessions and status surface agentSessionId for codex and claude in JSON mo homeDir, ); assert.equal(created.code, 0, created.stderr); - const createdPayload = JSON.parse(created.stdout.trim()) as AcpxEvent; - assert.equal(createdPayload.type, "session_ensured"); - assert.equal((createdPayload.data as { created?: boolean }).created, true); - assert.equal(createdPayload.agent_session_id, scenario.expectedRuntimeSessionId); + const createdPayload = JSON.parse(created.stdout.trim()) as Record< + string, + unknown + >; + assert.equal(createdPayload.action, "session_ensured"); + assert.equal(createdPayload.created, true); + assert.equal(createdPayload.agentSessionId, scenario.expectedRuntimeSessionId); const ensured = await runCli( ["--cwd", cwd, "--format", "json", scenario.agentName, "sessions", "ensure"], homeDir, ); assert.equal(ensured.code, 0, ensured.stderr); - const ensuredPayload = JSON.parse(ensured.stdout.trim()) as AcpxEvent; - assert.equal(ensuredPayload.type, "session_ensured"); - assert.equal((ensuredPayload.data as { created?: boolean }).created, false); - assert.equal(ensuredPayload.agent_session_id, scenario.expectedRuntimeSessionId); + const ensuredPayload = JSON.parse(ensured.stdout.trim()) as Record< + string, + unknown + >; + assert.equal(ensuredPayload.action, "session_ensured"); + assert.equal(ensuredPayload.created, false); + assert.equal(ensuredPayload.agentSessionId, scenario.expectedRuntimeSessionId); const status = await runCli( ["--cwd", cwd, "--format", "json", scenario.agentName, "status"], homeDir, ); assert.equal(status.code, 0, status.stderr); - const statusPayload = JSON.parse(status.stdout.trim()) as AcpxEvent; - assert.equal(statusPayload.type, "status_snapshot"); - assert.equal(statusPayload.agent_session_id, scenario.expectedRuntimeSessionId); + const statusPayload = JSON.parse(status.stdout.trim()) as Record; + assert.equal(statusPayload.action, "status_snapshot"); + assert.equal(statusPayload.agentSessionId, scenario.expectedRuntimeSessionId); } }); }); @@ -364,10 +404,10 @@ test("--non-interactive-permissions validates supported values", async () => { homeDir, ); assert.equal(invalid.code, 2); - const payload = JSON.parse(invalid.stdout.trim()) as AcpxEvent; - assert.equal(payload.type, "error"); - assert.equal(payload.data.code, "USAGE"); - assert.match(payload.data.message, /Invalid non-interactive permission policy/); + const error = parseSingleAcpErrorLine(invalid.stdout); + assert.equal(error.code, -32602); + assert.equal(error.data?.acpxCode, "USAGE"); + assert.match(error.message ?? "", /Invalid non-interactive permission policy/); }); }); @@ -376,10 +416,10 @@ test("--json-strict requires --format json", async () => { const result = await runCli(["--json-strict", "sessions"], homeDir); assert.equal(result.code, 2); assert.equal(result.stderr.trim(), ""); - const payload = JSON.parse(result.stdout.trim()) as AcpxEvent; - assert.equal(payload.type, "error"); - assert.equal(payload.data.code, "USAGE"); - assert.match(payload.data.message, /--json-strict requires --format json/); + const error = parseSingleAcpErrorLine(result.stdout); + assert.equal(error.code, -32602); + assert.equal(error.data?.acpxCode, "USAGE"); + assert.match(error.message ?? "", /--json-strict requires --format json/); }); }); @@ -391,11 +431,11 @@ test("--json-strict rejects --verbose", async () => { ); assert.equal(result.code, 2); assert.equal(result.stderr.trim(), ""); - const payload = JSON.parse(result.stdout.trim()) as AcpxEvent; - assert.equal(payload.type, "error"); - assert.equal(payload.data.code, "USAGE"); + const error = parseSingleAcpErrorLine(result.stdout); + assert.equal(error.code, -32602); + assert.equal(error.data?.acpxCode, "USAGE"); assert.match( - payload.data.message, + error.message ?? "", /--json-strict cannot be combined with --verbose/, ); }); @@ -463,12 +503,99 @@ test("queued prompt failures emit exactly one JSON error event", async () => { .split(/\r?\n/) .map((line) => line.trim()) .filter((line) => line.length > 0) - .map((line) => JSON.parse(line) as AcpxEvent); + .map((line) => JSON.parse(line) as Record); - const errors = events.filter((event) => event.type === "error"); + const errors = events.filter( + (event) => typeof event.error === "object" && event.error !== null, + ); assert.equal(errors.length, 1, writeResult.stdout); - assert.equal(errors[0]?.data.code, "PERMISSION_PROMPT_UNAVAILABLE"); - assert.notEqual(errors[0]?.session_id, "unknown"); + assert.equal((errors[0]?.error as { code?: unknown } | undefined)?.code, -32603); + assert.notEqual( + (errors[0]?.error as { data?: { sessionId?: unknown } } | undefined)?.data + ?.sessionId, + "unknown", + ); + } finally { + if (blocker.exitCode === null && blocker.signalCode == null) { + blocker.kill("SIGKILL"); + await new Promise((resolve) => { + blocker.once("close", () => resolve()); + }); + } + } + }); +}); + +test("json-strict queued prompt failure emits JSON-RPC lines only", async () => { + await withTempHome(async (homeDir) => { + const cwd = path.join(homeDir, "workspace"); + await fs.mkdir(cwd, { recursive: true }); + await fs.mkdir(path.join(homeDir, ".acpx"), { recursive: true }); + await fs.writeFile( + path.join(homeDir, ".acpx", "config.json"), + `${JSON.stringify( + { + agents: { + codex: { + command: MOCK_AGENT_COMMAND, + }, + }, + }, + null, + 2, + )}\n`, + "utf8", + ); + + const session = await runCli( + ["--cwd", cwd, "--format", "json", "codex", "sessions", "new"], + homeDir, + ); + assert.equal(session.code, 0, session.stderr); + + const blocker = spawn( + process.execPath, + [CLI_PATH, "--cwd", cwd, "codex", "prompt", "sleep 1500"], + { + env: { ...process.env, HOME: homeDir }, + stdio: ["ignore", "ignore", "ignore"], + }, + ); + + try { + await new Promise((resolve) => { + setTimeout(resolve, 200); + }); + + const writeResult = await runCli( + [ + "--cwd", + cwd, + "--format", + "json", + "--json-strict", + "--non-interactive-permissions", + "fail", + "codex", + "prompt", + `write ${path.join(cwd, "x.txt")} hi`, + ], + homeDir, + ); + + assert.equal(writeResult.code, 5, writeResult.stderr); + assert.equal(writeResult.stderr.trim(), ""); + + const events = parseJsonRpcLines(writeResult.stdout); + assert.equal( + events.some( + (event) => + typeof event.error === "object" && + event.error !== null && + typeof (event.error as { code?: unknown }).code === "number", + ), + true, + ); } finally { if (blocker.exitCode === null && blocker.signalCode == null) { blocker.kill("SIGKILL"); @@ -537,7 +664,7 @@ test("queued prompt failures remain visible in quiet mode", async () => { ); assert.equal(writeResult.code, 5); - assert.equal(writeResult.stdout.trim(), ""); + assert.match(writeResult.stdout, /error:\s*Internal error/i); assert.match( writeResult.stderr, /Permission prompt unavailable in non-interactive mode/, @@ -580,10 +707,10 @@ test("--json-strict suppresses session banners on stderr", async () => { ); assert.equal(result.code, 0, result.stderr); assert.equal(result.stderr.trim(), ""); - const payload = JSON.parse(result.stdout.trim()) as AcpxEvent; - assert.equal(payload.type, "session_ensured"); - assert.equal((payload.data as { created?: boolean }).created, true); - assert.equal(typeof payload.session_id, "string"); + const payload = JSON.parse(result.stdout.trim()) as Record; + assert.equal(payload.action, "session_ensured"); + assert.equal(payload.created, true); + assert.equal(typeof payload.acpxRecordId, "string"); }); }); @@ -615,10 +742,10 @@ test("json format emits structured no-session error event", async () => { homeDir, ); assert.equal(result.code, 4); - const payload = JSON.parse(result.stdout.trim()) as AcpxEvent; - assert.equal(payload.type, "error"); - assert.equal(payload.data.code, "NO_SESSION"); - assert.match(payload.data.message, /No acpx session found/); + const error = parseSingleAcpErrorLine(result.stdout); + assert.equal(error.code, -32002); + assert.equal(error.data?.acpxCode, "NO_SESSION"); + assert.match(error.message ?? "", /No acpx session found/); }); }); @@ -683,10 +810,10 @@ test("cancel resolves named session when -s is before subcommand", async () => { ); assert.equal(result.code, 0, result.stderr); - const payload = JSON.parse(result.stdout.trim()) as AcpxEvent; - assert.equal(payload.type, "cancel_result"); - assert.equal(payload.session_id, "named-cancel-session"); - assert.equal((payload.data as { cancelled?: boolean }).cancelled, false); + const payload = JSON.parse(result.stdout.trim()) as Record; + assert.equal(payload.action, "cancel_result"); + assert.equal(payload.acpxRecordId, "named-cancel-session"); + assert.equal(payload.cancelled, false); }); }); @@ -712,12 +839,12 @@ test("status resolves named session when -s is before subcommand", async () => { ); assert.equal(result.code, 0, result.stderr); - const payload = JSON.parse(result.stdout.trim()) as AcpxEvent; - assert.equal(payload.type, "status_snapshot"); - assert.equal(payload.session_id, "named-status-session"); - assert.equal((payload.data as { status?: string }).status, "dead"); - assert.notEqual((payload.data as { status?: string }).status, "no-session"); - assert.equal(payload.agent_session_id, undefined); + const payload = JSON.parse(result.stdout.trim()) as Record; + assert.equal(payload.action, "status_snapshot"); + assert.equal(payload.acpxRecordId, "named-status-session"); + assert.equal(payload.status, "dead"); + assert.notEqual(payload.status, "no-session"); + assert.equal(payload.agentSessionId, undefined); }); }); @@ -1109,7 +1236,7 @@ function makeSessionRecord( lastSeq: record.lastSeq ?? 0, lastRequestId: record.lastRequestId, eventLog: record.eventLog ?? { - active_path: `.events.ndjson`, + active_path: `.stream.ndjson`, segment_count: 1, max_segment_bytes: 1024, max_segments: 1, diff --git a/test/events.test.ts b/test/events.test.ts index 7c6e7a8..236b73c 100644 --- a/test/events.test.ts +++ b/test/events.test.ts @@ -1,94 +1,113 @@ import assert from "node:assert/strict"; import test from "node:test"; -import { isAcpxEvent } from "../src/events.js"; -import { ACPX_EVENT_TYPES } from "../src/types.js"; +import { isAcpJsonRpcMessage } from "../src/acp-jsonrpc.js"; -function makeEvent(type: string, data: Record): unknown { - return { - schema: "acpx.event.v1", - event_id: "evt-1", - session_id: "session-1", - seq: 0, - ts: "2026-01-01T00:00:00.000Z", - type, - data, - }; -} - -test("isAcpxEvent accepts structured tool_call payload", () => { - const event = makeEvent(ACPX_EVENT_TYPES.TOOL_CALL, { - tool_call_id: "call_123", - title: "read_file", - status: "in_progress", - }); - - assert.equal(isAcpxEvent(event), true); -}); - -test("isAcpxEvent rejects tool_call payload without tool_call_id", () => { - const event = makeEvent(ACPX_EVENT_TYPES.TOOL_CALL, { - title: "read_file", - status: "in_progress", - }); - - assert.equal(isAcpxEvent(event), false); +test("isAcpJsonRpcMessage accepts JSON-RPC request", () => { + assert.equal( + isAcpJsonRpcMessage({ + jsonrpc: "2.0", + id: "req-1", + method: "session/prompt", + params: { + sessionId: "session-1", + prompt: [{ type: "text", text: "hi" }], + }, + }), + true, + ); }); -test("isAcpxEvent accepts structured plan payload", () => { - const event = makeEvent(ACPX_EVENT_TYPES.PLAN, { - entries: [ - { - content: "Implement health probe", - status: "in_progress", - priority: "high", +test("isAcpJsonRpcMessage accepts JSON-RPC notification", () => { + assert.equal( + isAcpJsonRpcMessage({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "session-1", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "hello" }, + }, }, - ], - }); + }), + true, + ); +}); - assert.equal(isAcpxEvent(event), true); +test("isAcpJsonRpcMessage accepts JSON-RPC success response", () => { + assert.equal( + isAcpJsonRpcMessage({ + jsonrpc: "2.0", + id: "req-1", + result: { stopReason: "end_turn" }, + }), + true, + ); }); -test("isAcpxEvent rejects malformed plan entries", () => { - const event = makeEvent(ACPX_EVENT_TYPES.PLAN, { - entries: [ - { - content: "Missing priority", - status: "pending", +test("isAcpJsonRpcMessage accepts JSON-RPC error response", () => { + assert.equal( + isAcpJsonRpcMessage({ + jsonrpc: "2.0", + id: "req-1", + error: { + code: -32000, + message: "runtime error", }, - ], - }); - - assert.equal(isAcpxEvent(event), false); + }), + true, + ); }); -test("isAcpxEvent validates new prompt_queued control event", () => { - const valid = makeEvent(ACPX_EVENT_TYPES.PROMPT_QUEUED, { - request_id: "req-1", - }); - const invalid = makeEvent(ACPX_EVENT_TYPES.PROMPT_QUEUED, { - request_id: 1, - }); - - assert.equal(isAcpxEvent(valid), true); - assert.equal(isAcpxEvent(invalid), false); +test("isAcpJsonRpcMessage rejects non-JSON-RPC payload", () => { + assert.equal( + isAcpJsonRpcMessage({ + type: "custom_event", + content: "hello", + }), + false, + ); }); -test("isAcpxEvent validates status_snapshot optional fields", () => { - const valid = makeEvent(ACPX_EVENT_TYPES.STATUS_SNAPSHOT, { - status: "dead", - pid: 123, - summary: "queue owner unavailable", - uptime: "00:00:03", - last_prompt_time: "2026-01-01T00:00:00.000Z", - exit_code: 1, - signal: "SIGTERM", - }); - - const invalid = makeEvent(ACPX_EVENT_TYPES.STATUS_SNAPSHOT, { - status: "dead", - pid: -1, - }); +test("isAcpJsonRpcMessage accepts request/notification/response fixtures after roundtrip", () => { + const fixtures: unknown[] = [ + { + jsonrpc: "2.0", + id: "req-1", + method: "session/prompt", + params: { + sessionId: "session-1", + prompt: [{ type: "text", text: "hi" }], + }, + }, + { + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "session-1", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "hello" }, + }, + }, + }, + { + jsonrpc: "2.0", + id: "req-2", + result: { stopReason: "end_turn" }, + }, + { + jsonrpc: "2.0", + id: "req-3", + error: { + code: -32000, + message: "runtime error", + }, + }, + ]; - assert.equal(isAcpxEvent(valid), true); - assert.equal(isAcpxEvent(invalid), false); + for (const fixture of fixtures) { + const roundTripped = JSON.parse(JSON.stringify(fixture)); + assert.equal(isAcpJsonRpcMessage(roundTripped), true); + } }); diff --git a/test/integration.test.ts b/test/integration.test.ts index 293472a..7e3d536 100644 --- a/test/integration.test.ts +++ b/test/integration.test.ts @@ -19,6 +19,19 @@ type CliRunResult = { stderr: string; }; +function parseJsonRpcOutputLines(stdout: string): Array> { + const lines = stdout + .split("\n") + .map((line) => line.trim()) + .filter((line) => line.length > 0); + assert(lines.length > 0, "expected at least one JSON-RPC line"); + return lines.map((line) => { + const parsed = JSON.parse(line) as Record; + assert.equal(parsed.jsonrpc, "2.0"); + return parsed; + }); +} + test("integration: exec echo baseline", async () => { await withTempHome(async (homeDir) => { const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-")); @@ -55,11 +68,22 @@ test("integration: timeout emits structured TIMEOUT json error", async () => { .trim() .split("\n") .filter((line) => line.trim().length > 0) - .map((line) => JSON.parse(line) as { type?: string; data?: { code?: string } }); + .map( + (line) => + JSON.parse(line) as { + jsonrpc?: string; + error?: { code?: number; data?: { acpxCode?: string } }; + }, + ); assert(payloads.length > 0, "expected at least one JSON payload"); - const timeoutError = payloads.find((payload) => payload.type === "error"); - assert(timeoutError, `expected error event in output:\n${result.stdout}`); - assert.equal(timeoutError.data?.code, "TIMEOUT"); + const timeoutError = payloads.find( + (payload) => + payload.jsonrpc === "2.0" && payload.error?.data?.acpxCode === "TIMEOUT", + ); + assert( + timeoutError, + `expected timeout error payload in output:\n${result.stdout}`, + ); } finally { await fs.rm(cwd, { recursive: true, force: true }); } @@ -94,11 +118,19 @@ test("integration: non-interactive fail emits structured permission error", asyn .trim() .split("\n") .filter((line) => line.trim().length > 0) - .map((line) => JSON.parse(line) as { type?: string; data?: { code?: string } }); + .map( + (line) => + JSON.parse(line) as { jsonrpc?: string; error?: { code?: unknown } }, + ); assert(payloads.length > 0, "expected at least one JSON payload"); - const permissionError = payloads.find((payload) => payload.type === "error"); - assert(permissionError, `expected error event in output:\n${result.stdout}`); - assert.equal(permissionError.data?.code, "PERMISSION_PROMPT_UNAVAILABLE"); + const permissionError = payloads.find( + (payload) => + payload.jsonrpc === "2.0" && typeof payload.error?.code === "number", + ); + assert( + permissionError, + `expected ACP error response in output:\n${result.stdout}`, + ); } finally { await fs.rm(cwd, { recursive: true, force: true }); } @@ -136,11 +168,49 @@ test("integration: json-strict suppresses runtime stderr diagnostics", async () .trim() .split("\n") .filter((line) => line.trim().length > 0) - .map((line) => JSON.parse(line) as { type?: string; data?: { code?: string } }); + .map( + (line) => + JSON.parse(line) as { jsonrpc?: string; error?: { code?: unknown } }, + ); assert(payloads.length > 0, "expected at least one JSON payload"); - const permissionError = payloads.find((payload) => payload.type === "error"); - assert(permissionError, `expected error event in output:\n${result.stdout}`); - assert.equal(permissionError.data?.code, "PERMISSION_PROMPT_UNAVAILABLE"); + const permissionError = payloads.find( + (payload) => + payload.jsonrpc === "2.0" && typeof payload.error?.code === "number", + ); + assert( + permissionError, + `expected ACP error response in output:\n${result.stdout}`, + ); + } finally { + await fs.rm(cwd, { recursive: true, force: true }); + } + }); +}); + +test("integration: json-strict exec success emits JSON-RPC lines only", async () => { + await withTempHome(async (homeDir) => { + const cwd = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-integration-cwd-")); + + try { + const result = await runCli( + [ + ...baseAgentArgs(cwd), + "--format", + "json", + "--json-strict", + "exec", + "echo strict-success", + ], + homeDir, + ); + + assert.equal(result.code, 0, result.stderr); + assert.equal(result.stderr.trim(), ""); + const payloads = parseJsonRpcOutputLines(result.stdout); + assert( + payloads.some((payload) => Object.hasOwn(payload, "result")), + "expected at least one JSON-RPC result payload", + ); } finally { await fs.rm(cwd, { recursive: true, force: true }); } @@ -241,9 +311,9 @@ test("integration: prompt reuses warm queue owner pid across turns", async () => ); assert.equal(created.code, 0, created.stderr); const createdEvent = JSON.parse(created.stdout.trim()) as { - session_id?: string; + acpxRecordId?: string; }; - const sessionId = createdEvent.session_id; + const sessionId = createdEvent.acpxRecordId; assert.equal(typeof sessionId, "string"); const first = await runCli( @@ -304,9 +374,9 @@ test("integration: prompt recovers when loadSession fails on empty session", asy ); assert.equal(created.code, 0, created.stderr); const createdEvent = JSON.parse(created.stdout.trim()) as { - session_id?: string; + acpxRecordId?: string; }; - const originalSessionId = createdEvent.session_id; + const originalSessionId = createdEvent.acpxRecordId; assert.equal(typeof originalSessionId, "string"); const prompt = await runCli( @@ -329,14 +399,17 @@ test("integration: prompt recovers when loadSession fails on empty session", asy .trim() .split("\n") .filter((line) => line.trim().length > 0) - .map((line) => JSON.parse(line) as { type?: string }); + .map( + (line) => + JSON.parse(line) as { jsonrpc?: string; result?: { stopReason?: string } }, + ); assert.equal( - payloads.some((payload) => payload.type === "error"), - false, + payloads.some((payload) => Object.hasOwn(payload, "error")), + true, prompt.stdout, ); assert.equal( - payloads.some((payload) => payload.type === "turn_done"), + payloads.some((payload) => payload.result?.stopReason === "end_turn"), true, prompt.stdout, ); @@ -406,13 +479,11 @@ test("integration: cancel yields cancelled stopReason without queue error", asyn assert.equal(cancelResult.code, 0, cancelResult.stderr); const payload = JSON.parse(cancelResult.stdout.trim()) as { - type: string; - data: { - cancelled?: boolean; - }; + action?: string; + cancelled?: boolean; }; - assert.equal(payload.type, "cancel_result"); - cancelled = payload.data.cancelled === true; + assert.equal(payload.action, "cancel_result"); + cancelled = payload.cancelled === true; if (cancelled) { break; } @@ -428,15 +499,12 @@ test("integration: cancel yields cancelled stopReason without queue error", asyn const promptResult = await doneEventPromise; assert.equal( - promptResult.events.some( - (event) => - event.type === "turn_done" && event.data?.stop_reason === "cancelled", - ), + promptResult.events.some((event) => event.result?.stopReason === "cancelled"), true, promptResult.stdout, ); assert.equal( - promptResult.events.some((event) => event.type === "error"), + promptResult.events.some((event) => Object.hasOwn(event, "error")), false, promptResult.stdout, ); @@ -601,10 +669,15 @@ async function runCli( } type PromptEvent = { - type?: string; - data?: { - stop_reason?: string; - code?: string; + jsonrpc?: string; + method?: string; + params?: unknown; + result?: { + stopReason?: string; + }; + error?: { + code?: unknown; + message?: string; }; }; @@ -660,7 +733,7 @@ async function waitForPromptDoneEvent( } events.push(event); - if (event.type === "turn_done") { + if (event.result?.stopReason) { finish(() => { resolve({ events, diff --git a/test/jsonrpc-error.test.ts b/test/jsonrpc-error.test.ts new file mode 100644 index 0000000..f8c8c1a --- /dev/null +++ b/test/jsonrpc-error.test.ts @@ -0,0 +1,49 @@ +import assert from "node:assert/strict"; +import test from "node:test"; +import { buildJsonRpcErrorResponse } from "../src/jsonrpc-error.js"; + +test("buildJsonRpcErrorResponse preserves ACP payload when available", () => { + const response = buildJsonRpcErrorResponse({ + outputCode: "RUNTIME", + message: "fallback message", + sessionId: "session-1", + acp: { + code: -32099, + message: "adapter failure", + data: { + reason: "boom", + }, + }, + }); + + assert.equal(response.jsonrpc, "2.0"); + assert.equal(response.id, null); + assert.equal(response.error.code, -32099); + assert.equal(response.error.message, "adapter failure"); + assert.deepEqual(response.error.data, { + reason: "boom", + }); +}); + +test("buildJsonRpcErrorResponse shapes fallback ACPX metadata", () => { + const response = buildJsonRpcErrorResponse({ + outputCode: "NO_SESSION", + detailCode: "MISSING", + origin: "queue", + message: "No session found", + retryable: false, + timestamp: "2026-02-28T00:00:00.000Z", + sessionId: "session-2", + }); + + assert.equal(response.error.code, -32002); + assert.equal(response.error.message, "No session found"); + assert.deepEqual(response.error.data, { + acpxCode: "NO_SESSION", + detailCode: "MISSING", + origin: "queue", + retryable: false, + timestamp: "2026-02-28T00:00:00.000Z", + sessionId: "session-2", + }); +}); diff --git a/test/output.test.ts b/test/output.test.ts index 375e597..e108cf8 100644 --- a/test/output.test.ts +++ b/test/output.test.ts @@ -17,321 +17,168 @@ class CaptureWriter { function messageChunk(text: string): unknown { return { - update: { - sessionUpdate: "agent_message_chunk", - content: { type: "text", text }, + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "session-1", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text }, + }, }, }; } function thoughtChunk(text: string): unknown { return { - update: { - sessionUpdate: "agent_thought_chunk", - content: { type: "text", text }, + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "session-1", + update: { + sessionUpdate: "agent_thought_chunk", + content: { type: "text", text }, + }, + }, + }; +} + +function doneResult(stopReason: string): unknown { + return { + jsonrpc: "2.0", + id: "req-1", + result: { + stopReason, }, }; } -test("text formatter batches thought tokens", () => { +test("text formatter batches thought chunks from ACP notifications", () => { const writer = new CaptureWriter(); const formatter = createOutputFormatter("text", { stdout: writer }); - formatter.onSessionUpdate(thoughtChunk("Investigating ") as never); - formatter.onSessionUpdate(thoughtChunk("the issue") as never); - formatter.onSessionUpdate(messageChunk("Done.") as never); - formatter.onDone("end_turn"); + formatter.onAcpMessage(thoughtChunk("Investigating ") as never); + formatter.onAcpMessage(thoughtChunk("the issue") as never); + formatter.onAcpMessage(messageChunk("Done.") as never); + formatter.onAcpMessage(doneResult("end_turn") as never); const output = writer.toString(); assert.equal((output.match(/\[thinking\]/g) ?? []).length, 1); assert.match(output, /\[thinking\] Investigating the issue/); + assert.match(output, /\[done\] end_turn/); }); -test("text formatter renders tool calls with input and output", () => { +test("text formatter renders tool call lifecycle from ACP updates", () => { const writer = new CaptureWriter(); const formatter = createOutputFormatter("text", { stdout: writer }); - formatter.onSessionUpdate({ - update: { - sessionUpdate: "tool_call", - toolCallId: "tool-1", - title: "run_command", - status: "in_progress", - rawInput: { command: "npm", args: ["test"] }, + formatter.onAcpMessage({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "session-1", + update: { + sessionUpdate: "tool_call", + toolCallId: "tool-1", + title: "run_command", + status: "in_progress", + rawInput: { command: "npm", args: ["test"] }, + }, }, } as never); - - formatter.onSessionUpdate({ - update: { - sessionUpdate: "tool_call_update", - toolCallId: "tool-1", - title: "run_command", - status: "completed", - rawInput: { command: "npm", args: ["test"] }, - rawOutput: { stdout: "All tests passing" }, + formatter.onAcpMessage({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "session-1", + update: { + sessionUpdate: "tool_call_update", + toolCallId: "tool-1", + title: "run_command", + status: "completed", + rawInput: { command: "npm", args: ["test"] }, + rawOutput: { stdout: "All tests passing" }, + }, }, } as never); const output = writer.toString(); assert.match(output, /\[tool\] run_command/); assert.match(output, /input: npm test/); - assert.match(output, /output:/); assert.match(output, /All tests passing/); }); -test("json formatter emits canonical NDJSON", () => { +test("json formatter passes through ACP messages", () => { const writer = new CaptureWriter(); const formatter = createOutputFormatter("json", { stdout: writer, jsonContext: { sessionId: "session-1", - requestId: "req-1", - nextSeq: 0, }, }); - formatter.onSessionUpdate(messageChunk("Hello") as never); - formatter.onSessionUpdate(thoughtChunk("Thinking") as never); - formatter.onDone("end_turn"); + const first = messageChunk("hello"); + const second = doneResult("end_turn"); + formatter.onAcpMessage(first as never); + formatter.onAcpMessage(second as never); const lines = writer .toString() .trim() .split("\n") - .filter((line) => line.length > 0); - const parsed = lines.map((line) => JSON.parse(line)); + .filter((line) => line.length > 0) + .map((line) => JSON.parse(line)); - assert.equal(parsed[0]?.schema, "acpx.event.v1"); - assert.equal(parsed[0]?.session_id, "session-1"); - assert.equal(parsed[0]?.request_id, "req-1"); - assert.equal(parsed[0]?.seq, 0); - assert.equal(parsed[1]?.seq, 1); - assert.equal(parsed[2]?.seq, 2); - assert.equal(parsed[0]?.type, "output_delta"); - assert.equal(parsed[0]?.data?.stream, "output"); - assert.equal(parsed[1]?.type, "output_delta"); - assert.equal(parsed[1]?.data?.stream, "thought"); - assert.equal(parsed[2]?.type, "turn_done"); + assert.equal(lines.length, 2); + assert.deepEqual(lines[0], first); + assert.deepEqual(lines[1], second); }); -test("text formatter renders client operation updates", () => { - const writer = new CaptureWriter(); - const formatter = createOutputFormatter("text", { stdout: writer }); - - formatter.onClientOperation({ - method: "fs/read_text_file", - status: "completed", - summary: "read_text_file: /tmp/demo.txt", - details: "line=1, limit=20", - timestamp: new Date().toISOString(), - }); - - const output = writer.toString(); - assert.match(output, /\[client\] read_text_file: \/tmp\/demo.txt \(completed\)/); - assert.match(output, /line=1, limit=20/); -}); - -test("json formatter emits client operation canonical events", () => { - const writer = new CaptureWriter(); - const formatter = createOutputFormatter("json", { stdout: writer }); - - formatter.onClientOperation({ - method: "terminal/create", - status: "running", - summary: "terminal/create: node -e \"console.log('hi')\"", - timestamp: new Date().toISOString(), - }); - - const line = writer.toString().trim(); - const parsed = JSON.parse(line) as { - type: string; - data: { method: string; status: string }; - }; - assert.equal(parsed.type, "client_operation"); - assert.equal(parsed.data.method, "terminal/create"); - assert.equal(parsed.data.status, "running"); -}); - -test("json formatter emits structured canonical error events", () => { +test("json formatter emits ACP JSON-RPC error response from onError", () => { const writer = new CaptureWriter(); const formatter = createOutputFormatter("json", { stdout: writer, jsonContext: { - sessionId: "session-error", - nextSeq: 0, + sessionId: "session-err", }, }); formatter.onError({ - code: "PERMISSION_PROMPT_UNAVAILABLE", - detailCode: "QUEUE_CONTROL_REQUEST_FAILED", - origin: "queue", - message: "Permission prompt unavailable in non-interactive mode", - retryable: false, - acp: { - code: -32000, - message: "Authentication required", - data: { - method: "token", - }, - }, + code: "RUNTIME", + message: "adapter failed", + origin: "runtime", }); - const line = writer.toString().trim(); - const parsed = JSON.parse(line) as { - type: string; - data: { - code: string; - detail_code?: string; - origin?: string; - message: string; - retryable?: boolean; - acp_error?: { - code: number; - message: string; - data?: unknown; + const parsed = JSON.parse(writer.toString().trim()) as { + jsonrpc?: string; + id?: unknown; + error?: { + code?: number; + message?: string; + data?: { + acpxCode?: string; + origin?: string; + sessionId?: string; }; }; - session_id: string; - seq: number; }; - assert.equal(parsed.type, "error"); - assert.equal(parsed.data.code, "PERMISSION_PROMPT_UNAVAILABLE"); - assert.equal(parsed.data.detail_code, "QUEUE_CONTROL_REQUEST_FAILED"); - assert.equal(parsed.data.origin, "queue"); - assert.equal( - parsed.data.message, - "Permission prompt unavailable in non-interactive mode", - ); - assert.equal(parsed.data.retryable, false); - assert.equal(parsed.data.acp_error?.code, -32000); - assert.equal(parsed.session_id, "session-error"); - assert.equal(parsed.seq, 0); -}); - -test("quiet formatter suppresses non-text output", () => { - const writer = new CaptureWriter(); - const formatter = createOutputFormatter("quiet", { stdout: writer }); - - formatter.onSessionUpdate(thoughtChunk("private") as never); - formatter.onSessionUpdate({ - update: { - sessionUpdate: "tool_call", - toolCallId: "tool-2", - title: "read_file", - status: "completed", - }, - } as never); - formatter.onSessionUpdate(messageChunk("Hello ") as never); - formatter.onSessionUpdate(messageChunk("world") as never); - formatter.onDone("end_turn"); - - assert.equal(writer.toString(), "Hello world\n"); + assert.equal(parsed.jsonrpc, "2.0"); + assert.equal(parsed.id, null); + assert.equal(parsed.error?.code, -32603); + assert.equal(parsed.error?.message, "adapter failed"); + assert.equal(parsed.error?.data?.acpxCode, "RUNTIME"); + assert.equal(parsed.error?.data?.origin, "runtime"); + assert.equal(parsed.error?.data?.sessionId, "session-err"); }); -test("quiet formatter flushes on turn_done event path", () => { +test("quiet formatter outputs only agent text and flushes on prompt result", () => { const writer = new CaptureWriter(); const formatter = createOutputFormatter("quiet", { stdout: writer }); - formatter.onEvent({ - schema: "acpx.event.v1", - event_id: "evt-1", - session_id: "session-1", - seq: 0, - ts: "2026-02-27T00:00:00.000Z", - type: "output_delta", - data: { - stream: "output", - text: "Hello ", - }, - } as never); - formatter.onEvent({ - schema: "acpx.event.v1", - event_id: "evt-2", - session_id: "session-1", - seq: 1, - ts: "2026-02-27T00:00:01.000Z", - type: "output_delta", - data: { - stream: "output", - text: "world", - }, - } as never); - formatter.onEvent({ - schema: "acpx.event.v1", - event_id: "evt-3", - session_id: "session-1", - seq: 2, - ts: "2026-02-27T00:00:02.000Z", - type: "turn_done", - data: { - stop_reason: "end_turn", - }, - } as never); + formatter.onAcpMessage(thoughtChunk("private-thought") as never); + formatter.onAcpMessage(messageChunk("Hello ") as never); + formatter.onAcpMessage(messageChunk("world") as never); + formatter.onAcpMessage(doneResult("end_turn") as never); assert.equal(writer.toString(), "Hello world\n"); }); - -test("quiet formatter avoids duplicate flush across turn_done and onDone", () => { - const writer = new CaptureWriter(); - const formatter = createOutputFormatter("quiet", { stdout: writer }); - - formatter.onEvent({ - schema: "acpx.event.v1", - event_id: "evt-10", - session_id: "session-1", - seq: 0, - ts: "2026-02-27T00:00:00.000Z", - type: "output_delta", - data: { - stream: "output", - text: "single", - }, - } as never); - formatter.onEvent({ - schema: "acpx.event.v1", - event_id: "evt-11", - session_id: "session-1", - seq: 1, - ts: "2026-02-27T00:00:01.000Z", - type: "turn_done", - data: { - stop_reason: "end_turn", - }, - } as never); - formatter.onDone("end_turn"); - - assert.equal(writer.toString(), "single\n"); -}); - -test("quiet formatter prefers event output when both session updates and events arrive", () => { - const writer = new CaptureWriter(); - const formatter = createOutputFormatter("quiet", { stdout: writer }); - - formatter.onSessionUpdate(messageChunk("dup") as never); - formatter.onEvent({ - schema: "acpx.event.v1", - event_id: "evt-20", - session_id: "session-1", - seq: 0, - ts: "2026-02-27T00:00:00.000Z", - type: "output_delta", - data: { - stream: "output", - text: "dup", - }, - } as never); - formatter.onEvent({ - schema: "acpx.event.v1", - event_id: "evt-21", - session_id: "session-1", - seq: 1, - ts: "2026-02-27T00:00:01.000Z", - type: "turn_done", - data: { - stop_reason: "end_turn", - }, - } as never); - - assert.equal(writer.toString(), "dup\n"); -}); diff --git a/test/persisted-key-policy.test.ts b/test/persisted-key-policy.test.ts index feab0d3..f42f7ea 100644 --- a/test/persisted-key-policy.test.ts +++ b/test/persisted-key-policy.test.ts @@ -20,7 +20,7 @@ function makeRecord(): SessionRecord { lastSeq: 4, lastRequestId: "req-1", eventLog: { - active_path: "/tmp/record-1.events.ndjson", + active_path: "/tmp/record-1.stream.ndjson", segment_count: 2, max_segment_bytes: 1024, max_segments: 2, diff --git a/test/queue-ipc-errors.test.ts b/test/queue-ipc-errors.test.ts index 7e6c39b..f64a446 100644 --- a/test/queue-ipc-errors.test.ts +++ b/test/queue-ipc-errors.test.ts @@ -28,16 +28,7 @@ const NOOP_OUTPUT_FORMATTER: OutputFormatter = { setContext() { // no-op }, - onEvent() { - // no-op - }, - onSessionUpdate() { - // no-op - }, - onClientOperation() { - // no-op - }, - onDone() { + onAcpMessage() { // no-op }, onError() { @@ -334,19 +325,14 @@ test("trySubmitToRunningOwner streams queued lifecycle and returns result", asyn const events: string[] = []; const formatter: OutputFormatter = { setContext(context) { - events.push(`context:${context.sessionId}:${context.requestId ?? "-"}`); - }, - onEvent(event) { - events.push(`event:${event.type}`); + events.push(`context:${context.sessionId}`); }, - onSessionUpdate() { - // queue transport forwards canonical events only - }, - onClientOperation() { - // queue transport forwards canonical events only - }, - onDone() { - // queue transport forwards canonical events only + onAcpMessage(message) { + if ("method" in message && typeof message.method === "string") { + events.push(`event:${message.method}`); + return; + } + events.push("event:response"); }, onError(params) { events.push(`error:${params.code}`); @@ -381,18 +367,15 @@ test("trySubmitToRunningOwner streams queued lifecycle and returns result", asyn `${JSON.stringify({ type: "event", requestId: request.requestId, - event: { - schema: "acpx.event.v1", - event_id: "evt-queued-1", - session_id: sessionId, - acp_session_id: "agent-session", - seq: 1, - ts: "2026-01-01T00:00:00.000Z", - type: "turn_started", - data: { - mode: "prompt", - resumed: true, - input_preview: "hello", + message: { + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId: "agent-session", + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "hello" }, + }, }, }, })}\n`, @@ -421,7 +404,7 @@ test("trySubmitToRunningOwner streams queued lifecycle and returns result", asyn lastUsedAt: "2026-01-01T00:00:00.000Z", lastSeq: 2, eventLog: { - active_path: "/tmp/session.events.ndjson", + active_path: "/tmp/session.stream.ndjson", segment_count: 1, max_segment_bytes: 1024, max_segments: 1, @@ -461,10 +444,10 @@ test("trySubmitToRunningOwner streams queued lifecycle and returns result", asyn assert.equal(result.stopReason, "end_turn"); assert.equal(result.resumed, true); assert.equal( - events.some((entry) => entry.startsWith(`context:${sessionId}:`)), + events.some((entry) => entry === `context:${sessionId}`), true, ); - assert.equal(events.includes("event:turn_started"), true); + assert.equal(events.includes("event:session/update"), true); assert.equal(events.includes("flush"), true); assert.equal( events.some((entry) => entry.startsWith("error:")), diff --git a/test/session-events.test.ts b/test/session-events.test.ts index ccee8f6..9354c71 100644 --- a/test/session-events.test.ts +++ b/test/session-events.test.ts @@ -9,7 +9,7 @@ import { resolveSessionRecord, writeSessionRecord, } from "../src/session-persistence.js"; -import { ACPX_EVENT_TYPES, type SessionRecord } from "../src/types.js"; +import type { SessionRecord } from "../src/types.js"; async function withTempHome(run: (homeDir: string) => Promise): Promise { const homeDir = await fs.mkdtemp(path.join(os.tmpdir(), "acpx-events-home-")); @@ -54,15 +54,16 @@ function makeSessionRecord( updated_at: now, cumulative_token_usage: {}, request_token_usage: {}, + acpx: {}, }; } -test("listSessionEvents reads all configured event segments", async () => { +test("listSessionEvents reads all configured stream segments", async () => { await withTempHome(async (homeDir) => { const cwd = path.join(homeDir, "workspace"); await fs.mkdir(cwd, { recursive: true }); - const sessionId = "session-events-max-window"; + const sessionId = "session-stream-max-window"; const record = makeSessionRecord(sessionId, cwd, 7); await writeSessionRecord(record); @@ -72,30 +73,35 @@ test("listSessionEvents reads all configured event segments", async () => { }); for (let index = 0; index < 8; index += 1) { - await writer.appendDraft({ - type: ACPX_EVENT_TYPES.UPDATE, - data: { - update: `event-${index + 1}`, + await writer.appendMessage({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: `event-${index + 1}` }, + }, }, - }); + } as never); } await writer.close({ checkpoint: true }); const events = await listSessionEvents(sessionId); assert.equal(events.length, 8); - assert.deepEqual( - events.map((event) => event.seq), - [1, 2, 3, 4, 5, 6, 7, 8], + assert.equal( + events.every((event) => event.jsonrpc === "2.0"), + true, ); }); }); -test("SessionEventWriter stores actual segment_count instead of max_segments", async () => { +test("SessionEventWriter stores actual segment_count and increments lastSeq", async () => { await withTempHome(async (homeDir) => { const cwd = path.join(homeDir, "workspace"); await fs.mkdir(cwd, { recursive: true }); - const sessionId = "session-events-segment-count"; + const sessionId = "session-stream-segment-count"; const record = makeSessionRecord(sessionId, cwd, 7); await writeSessionRecord(record); @@ -104,34 +110,87 @@ test("SessionEventWriter stores actual segment_count instead of max_segments", a maxSegments: 7, }); - await writer.appendDraft({ - type: ACPX_EVENT_TYPES.UPDATE, - data: { - update: "first", + await writer.appendMessage({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "first" }, + }, }, - }); + } as never); assert.equal(writer.getRecord().eventLog.segment_count, 1); - await writer.appendDraft({ - type: ACPX_EVENT_TYPES.UPDATE, - data: { - update: "second", + await writer.appendMessage({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "second" }, + }, }, - }); + } as never); assert.equal(writer.getRecord().eventLog.segment_count, 2); - await writer.appendDraft({ - type: ACPX_EVENT_TYPES.UPDATE, - data: { - update: "third", - }, - }); + await writer.appendMessage({ + jsonrpc: "2.0", + id: "req-3", + result: { stopReason: "end_turn" }, + } as never); assert.equal(writer.getRecord().eventLog.segment_count, 3); + assert.equal(writer.getRecord().lastSeq, 3); + assert.equal(writer.getRecord().lastRequestId, "req-3"); await writer.close({ checkpoint: true }); const stored = await resolveSessionRecord(sessionId); assert.equal(stored.eventLog.segment_count, 3); assert.equal(stored.eventLog.max_segments, 7); + assert.equal(stored.lastSeq, 3); + }); +}); + +test("listSessionEvents skips malformed NDJSON lines", async () => { + await withTempHome(async (homeDir) => { + const cwd = path.join(homeDir, "workspace"); + await fs.mkdir(cwd, { recursive: true }); + + const sessionId = "session-stream-skip-malformed"; + const record = makeSessionRecord(sessionId, cwd, 5); + await writeSessionRecord(record); + + await fs.mkdir(path.dirname(record.eventLog.active_path), { recursive: true }); + const validOne = JSON.stringify({ + jsonrpc: "2.0", + method: "session/update", + params: { + sessionId, + update: { + sessionUpdate: "agent_message_chunk", + content: { type: "text", text: "first" }, + }, + }, + }); + const validTwo = JSON.stringify({ + jsonrpc: "2.0", + id: "req-2", + result: { stopReason: "end_turn" }, + }); + await fs.writeFile( + record.eventLog.active_path, + `${validOne}\n{invalid-json\n${validTwo}\n`, + "utf8", + ); + + const events = await listSessionEvents(sessionId); + assert.equal(events.length, 2); + assert.equal( + events.every((event) => event.jsonrpc === "2.0"), + true, + ); }); }); diff --git a/test/session-persistence.test.ts b/test/session-persistence.test.ts index d4c3bb5..05c2d9f 100644 --- a/test/session-persistence.test.ts +++ b/test/session-persistence.test.ts @@ -305,7 +305,7 @@ function makeSessionRecord( lastSeq: overrides.lastSeq ?? 0, lastRequestId: overrides.lastRequestId, eventLog: overrides.eventLog ?? { - active_path: `.events.ndjson`, + active_path: `.stream.ndjson`, segment_count: 1, max_segment_bytes: 1024, max_segments: 1,