diff --git a/src/webkit/client.ts b/src/webkit/client.ts index b168c8cb..f6d4fa49 100644 --- a/src/webkit/client.ts +++ b/src/webkit/client.ts @@ -1,8 +1,9 @@ -import WebSocket from 'ws'; import http from 'http'; import { EventEmitter } from 'events'; import { ConnectionError, TimeoutError, ProtocolError, EvaluationError } from './errors'; export { ConnectionError, TimeoutError, ProtocolError, EvaluationError } from './errors'; +import { ProtocolTransport, WebSocketProtocolTransport } from './protocol-transport'; +export type { ProtocolTransport, ProtocolEventHandler } from './protocol-transport'; import { BrowserBackend, NavigateOptions, @@ -38,16 +39,8 @@ export interface WebKitTarget { } export class WebKitClient extends EventEmitter implements BrowserBackend { - private ws: WebSocket | null = null; + private transport: ProtocolTransport; private messageId = 0; - private pendingRequests: Map< - number, - { - resolve: (value: any) => void; - reject: (error: Error) => void; - timer: ReturnType; - } - > = new Map(); private enabledDomains: Set = new Set(); private enabledDomainsPerTarget: Map> = new Map(); private connected = false; @@ -60,24 +53,99 @@ export class WebKitClient extends EventEmitter implements BrowserBackend { private activeTargetId: string | null = null; private knownTargets: Set = new Set(); private innerMessageId = 0; - private innerPendingRequests: Map< - number, - { - resolve: (value: any) => void; - reject: (error: Error) => void; - timer: ReturnType; - } - > = new Map(); private targetReady: Promise | null = null; private targetReadyResolve: (() => void) | null = null; constructor(private options: WebKitClientOptions) { super(); + this.transport = new WebSocketProtocolTransport({ + connectTimeout: options.connectTimeout, + sendTimeout: options.sendTimeout, + }); + this.bindTransportEvents(); } getHost(): string { return this.options.host; } getPort(): number { return this.options.port; } + // ========== Transport Event Binding ========== + + /** + * Forward all transport-emitted protocol events (domain events, target + * lifecycle events) to this EventEmitter so that callers using + * `client.on('Page.loadEventFired', ...)` continue to work. + * + * Subscribes via the formal `onProtocolEvent` channel rather than monkey- + * patching the transport's `emit` — the channel only relays raw RDP + * events, so transport lifecycle (`transport:close`, `transport:error`) + * and EventEmitter housekeeping (`newListener`, `removeListener`) never + * leak through and stay scoped to this client. + */ + private bindTransportEvents(): void { + this.transport.onProtocolEvent((event, ...args) => { + this.emit(event, ...args); + }); + + this.transport.on('transport:close', () => { + if (this.connected && !this.reconnecting) { + this.connected = false; + this.handleDisconnect(); + } + }); + + this.transport.on('transport:error', (_err: Error) => { + // Error already handled — connection state tracked via transport:close + }); + + // Target lifecycle events emitted by the transport + this.transport.on('Target.targetCreated', (params: any) => { + this.handleTargetCreated(params); + }); + + this.transport.on('Target.targetDestroyed', (params: any) => { + this.handleTargetDestroyed(params); + }); + } + + // ========== Target Lifecycle Handlers (moved from handleMessage) ========== + + private handleTargetCreated(params: any): void { + const info = params?.targetInfo; + if (info?.type !== 'page') return; + + this.knownTargets.add(info.targetId); + this.emit('target:created', { targetId: info.targetId, url: info.url }); + + if (!this.activeTargetId) { + this.activeTargetId = info.targetId; + } + + const globalDomains = [...this.enabledDomains]; + const perTargetDomains = this.enabledDomainsPerTarget.get(info.targetId); + const domainsToEnable = new Set([...globalDomains, ...(perTargetDomains ?? [])]); + Promise.all( + [...domainsToEnable].map(domain => + this.sendToTarget(`${domain}.enable`, undefined, info.targetId).catch(err => { + console.error(`[WebKitClient] Failed to re-enable ${domain} on new target: ${(err as Error).message}`); + }) + ) + ).then(() => { + this.targetReadyResolve?.(); + }); + } + + private handleTargetDestroyed(params: any): void { + const destroyedId = params?.targetId; + this.knownTargets.delete(destroyedId); + this.enabledDomainsPerTarget.delete(destroyedId); + this.emit('target:destroyed', { targetId: destroyedId }); + if (destroyedId === this.activeTargetId) { + this.activeTargetId = this.knownTargets.size > 0 + ? this.knownTargets.values().next().value ?? null + : null; + } + } + /** * Connect directly to a specific WebSocket URL (e.g., per-tab endpoint). */ @@ -134,14 +202,7 @@ export class WebKitClient extends EventEmitter implements BrowserBackend { async disconnect(): Promise { this.stopHeartbeat(); - this.clearPendingRequests(); - if (this.ws) { - this.ws.removeAllListeners(); - if (this.ws.readyState === WebSocket.OPEN) { - this.ws.close(); - } - this.ws = null; - } + await this.transport.disconnect(); this.connected = false; this.enabledDomains.clear(); this.enabledDomainsPerTarget.clear(); @@ -152,7 +213,7 @@ export class WebKitClient extends EventEmitter implements BrowserBackend { } isConnected(): boolean { - return this.connected && this.ws?.readyState === WebSocket.OPEN; + return this.transport.isConnected(); } // ========== Target Discovery ========== @@ -216,9 +277,6 @@ export class WebKitClient extends EventEmitter implements BrowserBackend { params?: Record, targetId?: string | null, ): Promise { - if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { - throw new ConnectionError('WebSocket not connected'); - } const resolvedTargetId = targetId ?? this.activeTargetId; if (!resolvedTargetId) { throw new ConnectionError( @@ -228,157 +286,9 @@ export class WebKitClient extends EventEmitter implements BrowserBackend { const innerId = ++this.innerMessageId; const outerId = ++this.messageId; - const timeout = - this.options.sendTimeout ?? DEFAULT_WEBKIT_SEND_TIMEOUT_MS; - - const innerPromise = new Promise((resolve, reject) => { - const timer = setTimeout(() => { - this.innerPendingRequests.delete(innerId); - reject(new TimeoutError(`${method} timed out after ${timeout}ms`)); - }, timeout); - this.innerPendingRequests.set(innerId, { resolve, reject, timer }); - }); - - // Track outer message for error propagation (e.g., invalid targetId) - const outerTimer = setTimeout(() => { - this.pendingRequests.delete(outerId); - }, timeout); - this.pendingRequests.set(outerId, { - resolve: () => { /* outer ack ignored — real response via dispatchMessageFromTarget */ }, - reject: (err: Error) => { - // Outer error means inner will never resolve — reject inner too - const innerPending = this.innerPendingRequests.get(innerId); - if (innerPending) { - clearTimeout(innerPending.timer); - this.innerPendingRequests.delete(innerId); - innerPending.reject(err); - } - }, - timer: outerTimer, - }); - - // Wrap in Target.sendMessageToTarget - const innerMessage = JSON.stringify({ id: innerId, method, params }); - this.ws.send( - JSON.stringify({ - id: outerId, - method: 'Target.sendMessageToTarget', - params: { targetId: resolvedTargetId, message: innerMessage }, - }), - ); + const timeout = this.options.sendTimeout ?? DEFAULT_WEBKIT_SEND_TIMEOUT_MS; - return innerPromise; - } - - private handleMessage(data: string): void { - let msg: any; - try { - msg = JSON.parse(data); - } catch { - return; - } - - // Handle Target events (multiplexing protocol) - if (msg.method === 'Target.targetCreated') { - const info = msg.params?.targetInfo; - if (info?.type === 'page') { - this.knownTargets.add(info.targetId); - this.emit('target:created', { targetId: info.targetId, url: info.url }); - - // Set as active target only if none is active yet (single-tab compat) - // In multi-tab mode, callers manage targets explicitly via TabPool - if (!this.activeTargetId) { - this.activeTargetId = info.targetId; - } - - // Re-enable domains on new target (e.g., after navigation destroys old target) - // Merge global domains + any per-target domains that were tracked for this target - const globalDomains = [...this.enabledDomains]; - const perTargetDomains = this.enabledDomainsPerTarget.get(info.targetId); - const domainsToEnable = new Set([...globalDomains, ...(perTargetDomains ?? [])]); - // Re-enable domains then signal target readiness - Promise.all( - [...domainsToEnable].map(domain => - this.sendToTarget(`${domain}.enable`, undefined, info.targetId).catch(err => { - console.error(`[WebKitClient] Failed to re-enable ${domain} on new target: ${(err as Error).message}`); - }) - ) - ).then(() => { - this.targetReadyResolve?.(); - }); - return; - } - return; - } - - if (msg.method === 'Target.targetDestroyed') { - const destroyedId = msg.params?.targetId; - this.knownTargets.delete(destroyedId); - this.enabledDomainsPerTarget.delete(destroyedId); - this.emit('target:destroyed', { targetId: destroyedId }); - if (destroyedId === this.activeTargetId) { - // Fallback to another known target, or null - this.activeTargetId = this.knownTargets.size > 0 - ? this.knownTargets.values().next().value ?? null - : null; - } - return; - } - - if (msg.method === 'Target.dispatchMessageFromTarget') { - // This contains the REAL response to our domain commands - let innerMsg: any; - try { - innerMsg = JSON.parse(msg.params.message); - } catch { - return; - } - if (innerMsg.id !== undefined) { - const pending = this.innerPendingRequests.get(innerMsg.id); - if (pending) { - clearTimeout(pending.timer); - this.innerPendingRequests.delete(innerMsg.id); - if (innerMsg.error) { - pending.reject( - new ProtocolError( - innerMsg.error.message ?? JSON.stringify(innerMsg.error), - innerMsg.error.code, - ), - ); - } else { - pending.resolve(innerMsg.result); - } - } - } else if (innerMsg.method) { - // Inner event (e.g., Page.loadEventFired, Runtime.consoleAPICalled) - // Include targetId so multi-tab consumers can filter by target - const sourceTargetId = msg.params.targetId; - this.emit(innerMsg.method, innerMsg.params, { targetId: sourceTargetId }); - } - return; - } - - if (msg.id !== undefined) { - // Outer ack response to Target.sendMessageToTarget — just clean up - const pending = this.pendingRequests.get(msg.id); - if (pending) { - clearTimeout(pending.timer); - this.pendingRequests.delete(msg.id); - // Don't resolve/reject caller — real response comes via dispatchMessageFromTarget - // But if there's an outer error (e.g., invalid targetId), propagate it - if (msg.error) { - pending.reject( - new ProtocolError( - msg.error.message ?? JSON.stringify(msg.error), - msg.error.code, - ), - ); - } - } - } else if (msg.method) { - // Other event notifications not handled above - this.emit(msg.method, msg.params); - } + return this.transport.sendToTarget(method, params, resolvedTargetId, innerId, outerId, timeout); } // ========== Domain Management ========== @@ -451,12 +361,11 @@ export class WebKitClient extends EventEmitter implements BrowserBackend { this.reconnecting = true; this.connected = false; - // Clear stale inner requests before reconnect - for (const [, pending] of this.innerPendingRequests) { - clearTimeout(pending.timer); - pending.reject(new ConnectionError('Connection lost during reconnect')); - } - this.innerPendingRequests.clear(); + // Tear down the transport so any stale pending requests are rejected + // before reconnecting. `disconnect()` is the documented public path — + // it clears pending state and releases the socket without leaning on + // any concrete transport implementation. + await this.transport.disconnect(); this.activeTargetId = null; this.knownTargets.clear(); @@ -1197,46 +1106,9 @@ export class WebKitClient extends EventEmitter implements BrowserBackend { this.targetReadyResolve = resolve; }); - await new Promise((resolve, reject) => { - const connectTimeout = - this.options.connectTimeout ?? DEFAULT_WEBKIT_CONNECT_TIMEOUT_MS; - - const timeout = setTimeout(() => { - reject( - new ConnectionError( - `Connection timeout after ${connectTimeout}ms`, - ), - ); - }, connectTimeout); - - const ws = new WebSocket(wsUrl); - - ws.on('open', () => { - clearTimeout(timeout); - this.ws = ws; - this.connected = true; - this.startHeartbeat(); - resolve(); - }); - - ws.on('message', (data: WebSocket.Data) => { - this.handleMessage(data.toString()); - }); - - ws.on('close', () => { - if (this.connected && !this.reconnecting) { - this.connected = false; - this.handleDisconnect(); - } - }); - - ws.on('error', (err: Error) => { - clearTimeout(timeout); - if (!this.connected) { - reject(new ConnectionError(`WebSocket error: ${err.message}`)); - } - }); - }); + await this.transport.connect(wsUrl); + this.connected = true; + this.startHeartbeat(); // Wait for first page target to be discovered const connectTimeout = @@ -1255,20 +1127,6 @@ export class WebKitClient extends EventEmitter implements BrowserBackend { } } - private clearPendingRequests(): void { - for (const [, req] of this.pendingRequests) { - clearTimeout(req.timer); - req.reject(new ConnectionError('Connection closed')); - } - this.pendingRequests.clear(); - - for (const [, req] of this.innerPendingRequests) { - clearTimeout(req.timer); - req.reject(new ConnectionError('Connection closed')); - } - this.innerPendingRequests.clear(); - } - private httpGet(url: string): Promise { return new Promise((resolve, reject) => { http diff --git a/src/webkit/protocol-transport.ts b/src/webkit/protocol-transport.ts new file mode 100644 index 00000000..266dfe6a --- /dev/null +++ b/src/webkit/protocol-transport.ts @@ -0,0 +1,339 @@ +/** + * ProtocolTransport — WebSocket / RDP message-passing layer for WebKit Remote Debugging Protocol. + * + * Extracted from client.ts (#706 2/5). Behavior-preserving; same wire protocol; same error semantics. + * + * Owns: + * - WebSocket connection lifecycle (connect, disconnect, isConnected) + * - Outer pending-request map (Target.sendMessageToTarget acks) + * - Inner pending-request map (dispatchMessageFromTarget responses) + * - Message-ID generation (outer + inner) + * - Response routing / event emission + * + * Does NOT own: target lifecycle, domain management, heartbeat, browser commands. + */ + +import WebSocket from 'ws'; +import { EventEmitter } from 'events'; +import { ConnectionError, TimeoutError, ProtocolError } from './errors'; +import { DEFAULT_WEBKIT_CONNECT_TIMEOUT_MS } from '../config/defaults'; + +// ========== Interface ========== + +/** Handler signature for raw RDP protocol events relayed by the transport. */ +export type ProtocolEventHandler = ( + event: string, + ...args: unknown[] +) => void; + +/** + * Adapter interface used by WebKitClient to interact with the transport layer. + * Using an interface avoids circular dependencies and allows test doubles. + */ +export interface ProtocolTransport extends NodeJS.EventEmitter { + /** Connect to a specific WebSocket URL. Resolves when the socket is open. */ + connect(wsUrl: string): Promise; + + /** + * Close the WebSocket and reject all pending requests. The transport also + * rejects pending requests automatically when the underlying socket emits + * `close`, so callers do not need to clear pending state out-of-band. + */ + disconnect(): Promise; + + /** True when the WebSocket is in OPEN state. */ + isConnected(): boolean; + + /** + * Subscribe to RDP protocol events relayed by the transport (e.g. + * `Page.loadEventFired`, `Target.targetCreated`). Lifecycle events + * (`transport:close`, `transport:error`) and EventEmitter housekeeping + * events (`newListener`, `removeListener`) are NOT delivered here — + * subscribe to those via the `EventEmitter` surface directly. + * + * Returns an unsubscribe function for symmetry with other reactive APIs. + */ + onProtocolEvent(handler: ProtocolEventHandler): () => void; + + /** + * Send a protocol command wrapped in Target.sendMessageToTarget. + * @param method RDP method name (e.g. "Page.navigate") + * @param params Optional method params + * @param targetId The page target to address + * @param innerMessageId Caller-allocated inner-message ID (avoids ID-space collision) + * @param outerMessageId Caller-allocated outer-message ID + * @param timeout Milliseconds before TimeoutError is thrown + */ + sendToTarget( + method: string, + params: Record | undefined, + targetId: string, + innerMessageId: number, + outerMessageId: number, + timeout: number, + ): Promise; +} + +// ========== Pending Request Slot ========== + +interface PendingSlot { + resolve: (value: any) => void; + reject: (err: Error) => void; + timer: ReturnType; +} + +// ========== Concrete Implementation ========== + +export interface WebSocketProtocolTransportOptions { + connectTimeout?: number; + sendTimeout?: number; +} + +/** + * Concrete ProtocolTransport backed by a WebSocket. + * Translates WebKit RDP wire-protocol messages into Promise resolution / EventEmitter events. + */ +export class WebSocketProtocolTransport extends EventEmitter implements ProtocolTransport { + private ws: WebSocket | null = null; + private _connected = false; + + /** Outer pending requests: Target.sendMessageToTarget ack tracking */ + private readonly outerPending: Map = new Map(); + + /** Inner pending requests: actual domain-command response tracking */ + private readonly innerPending: Map = new Map(); + + /** Subscribers registered via onProtocolEvent (raw RDP event relay). */ + private readonly protocolEventHandlers: Set = new Set(); + + constructor(private readonly options: WebSocketProtocolTransportOptions = {}) { + super(); + } + + // ========== Protocol Event Subscription ========== + + onProtocolEvent(handler: ProtocolEventHandler): () => void { + this.protocolEventHandlers.add(handler); + return () => { + this.protocolEventHandlers.delete(handler); + }; + } + + /** + * Emit an RDP protocol event on the EventEmitter surface AND notify every + * subscriber registered via `onProtocolEvent`. Used in place of the bare + * `this.emit(event, ...)` calls inside `handleMessage`. + */ + private emitProtocolEvent(event: string, ...args: unknown[]): void { + this.emit(event, ...args); + for (const handler of this.protocolEventHandlers) { + try { + handler(event, ...args); + } catch (err) { + // Surface listener errors via the existing `transport:error` + // lifecycle channel that WebKitClient already subscribes to. + // Avoid the standard `'error'` event because Node throws and + // terminates the process if no listener is registered, which + // would let a misbehaving subscriber crash the host even + // though the routing loop is otherwise resilient. + this.emit('transport:error', err as Error); + } + } + } + + // ========== Lifecycle ========== + + async connect(wsUrl: string): Promise { + const connectTimeout = + this.options.connectTimeout ?? DEFAULT_WEBKIT_CONNECT_TIMEOUT_MS; + + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new ConnectionError(`Connection timeout after ${connectTimeout}ms`)); + }, connectTimeout); + + const ws = new WebSocket(wsUrl); + + ws.on('open', () => { + clearTimeout(timeout); + this.ws = ws; + this._connected = true; + resolve(); + }); + + ws.on('message', (data: WebSocket.Data) => { + this.handleMessage(data.toString()); + }); + + ws.on('close', () => { + this._connected = false; + // Reject any awaiters immediately rather than letting them hang + // until their per-request timers expire. + this.clearPendingRequests(); + this.emit('transport:close'); + }); + + ws.on('error', (err: Error) => { + clearTimeout(timeout); + if (!this._connected) { + reject(new ConnectionError(`WebSocket error: ${err.message}`)); + } else { + this.emit('transport:error', err); + } + }); + }); + } + + async disconnect(): Promise { + this.clearPendingRequests(); + if (this.ws) { + this.ws.removeAllListeners(); + if (this.ws.readyState === WebSocket.OPEN) { + this.ws.close(); + } + this.ws = null; + } + this._connected = false; + } + + isConnected(): boolean { + return this._connected && this.ws?.readyState === WebSocket.OPEN; + } + + // ========== Send ========== + + sendToTarget( + method: string, + params: Record | undefined, + targetId: string, + innerMessageId: number, + outerMessageId: number, + timeout: number, + ): Promise { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { + return Promise.reject(new ConnectionError('WebSocket not connected')); + } + + const innerPromise = new Promise((resolve, reject) => { + const timer = setTimeout(() => { + this.innerPending.delete(innerMessageId); + reject(new TimeoutError(`${method} timed out after ${timeout}ms`)); + }, timeout); + this.innerPending.set(innerMessageId, { resolve, reject, timer }); + }); + + // Outer slot: tracks ack from Target.sendMessageToTarget. + // On outer error (e.g., invalid targetId), propagate to inner so the caller rejects. + const outerTimer = setTimeout(() => { + this.outerPending.delete(outerMessageId); + }, timeout); + + this.outerPending.set(outerMessageId, { + resolve: () => { /* ack only — real response comes via dispatchMessageFromTarget */ }, + reject: (err: Error) => { + const inner = this.innerPending.get(innerMessageId); + if (inner) { + clearTimeout(inner.timer); + this.innerPending.delete(innerMessageId); + inner.reject(err); + } + }, + timer: outerTimer, + }); + + const innerMessage = JSON.stringify({ id: innerMessageId, method, params }); + this.ws.send( + JSON.stringify({ + id: outerMessageId, + method: 'Target.sendMessageToTarget', + params: { targetId, message: innerMessage }, + }), + ); + + return innerPromise; + } + + // ========== Message Routing ========== + + private handleMessage(data: string): void { + let msg: any; + try { + msg = JSON.parse(data); + } catch { + return; + } + + // Multiplexed inner response from a page target + if (msg.method === 'Target.dispatchMessageFromTarget') { + let innerMsg: any; + try { + innerMsg = JSON.parse(msg.params.message); + } catch { + return; + } + + if (innerMsg.id !== undefined) { + const pending = this.innerPending.get(innerMsg.id); + if (pending) { + clearTimeout(pending.timer); + this.innerPending.delete(innerMsg.id); + if (innerMsg.error) { + pending.reject( + new ProtocolError( + innerMsg.error.message ?? JSON.stringify(innerMsg.error), + innerMsg.error.code, + ), + ); + } else { + pending.resolve(innerMsg.result); + } + } + } else if (innerMsg.method) { + // Inner event (e.g., Page.loadEventFired) — include targetId for multi-tab filtering + const sourceTargetId = msg.params.targetId; + this.emitProtocolEvent(innerMsg.method, innerMsg.params, { targetId: sourceTargetId }); + } + return; + } + + // Outer ack for Target.sendMessageToTarget + if (msg.id !== undefined) { + const pending = this.outerPending.get(msg.id); + if (pending) { + clearTimeout(pending.timer); + this.outerPending.delete(msg.id); + if (msg.error) { + pending.reject( + new ProtocolError( + msg.error.message ?? JSON.stringify(msg.error), + msg.error.code, + ), + ); + } + // No resolve path — outer ack does not carry the real response + } + return; + } + + // Non-multiplexed event (Target.targetCreated, Target.targetDestroyed, etc.) + if (msg.method) { + this.emitProtocolEvent(msg.method, msg.params); + } + } + + // ========== Internal Helpers ========== + + clearPendingRequests(): void { + for (const [, req] of this.outerPending) { + clearTimeout(req.timer); + req.reject(new ConnectionError('Connection closed')); + } + this.outerPending.clear(); + + for (const [, req] of this.innerPending) { + clearTimeout(req.timer); + req.reject(new ConnectionError('Connection closed')); + } + this.innerPending.clear(); + } +} diff --git a/tests/unit/protocol-transport.test.ts b/tests/unit/protocol-transport.test.ts new file mode 100644 index 00000000..4a6dd4da --- /dev/null +++ b/tests/unit/protocol-transport.test.ts @@ -0,0 +1,370 @@ +/** + * Tests for WebSocketProtocolTransport (#706 2/5). + * + * Covers: + * - send happy path (inner response resolves) + * - pending-request resolution for out-of-order messages + * - TimeoutError on stalled inner request + * - ConnectionError on connection failure + * - ProtocolError propagated from outer ack error (invalid targetId) + * - ProtocolError propagated from inner response error + * - Events emitted for non-multiplexed protocol events + * - Inner events (domain events) forwarded with targetId metadata + * - disconnect rejects all pending requests + */ + +import { EventEmitter } from 'events'; +import { WebSocketProtocolTransport } from '../../src/webkit/protocol-transport'; +import { ConnectionError, TimeoutError, ProtocolError } from '../../src/webkit/errors'; + +// ─── Minimal WebSocket stub ─────────────────────────────────────────────────── + +class FakeWebSocket extends EventEmitter { + static OPEN = 1; + static CLOSED = 3; + + readyState = FakeWebSocket.OPEN; + sent: string[] = []; + + send(data: string): void { + this.sent.push(data); + } + + close(): void { + this.readyState = FakeWebSocket.CLOSED; + this.emit('close'); + } + + /** Helper: simulate an inbound raw message from the WebKit proxy. */ + receive(payload: object): void { + this.emit('message', JSON.stringify(payload)); + } +} + +// ─── Test helpers ───────────────────────────────────────────────────────────── + +/** Build a transport that is already "connected" to a FakeWebSocket. */ +function makeConnectedTransport(opts: { sendTimeout?: number } = {}): { + transport: WebSocketProtocolTransport; + ws: FakeWebSocket; +} { + const transport = new WebSocketProtocolTransport({ sendTimeout: opts.sendTimeout ?? 200 }); + const ws = new FakeWebSocket(); + + // Inject ws without going through real WebSocket constructor + (transport as any).ws = ws; + (transport as any)._connected = true; + + // Wire message routing: transport.handleMessage is private; invoke via ws 'message' event + ws.on('message', (data: string) => { + (transport as any).handleMessage(data); + }); + + // Wire close event — mirrors the production handler installed inside + // `WebSocketProtocolTransport.connect`: flip the connected flag, reject + // every pending request, then emit the lifecycle event. + ws.on('close', () => { + (transport as any)._connected = false; + (transport as any).clearPendingRequests(); + transport.emit('transport:close'); + }); + + return { transport, ws }; +} + +/** Parse the most recent message sent over the fake WebSocket. */ +function lastSent(ws: FakeWebSocket): any { + return JSON.parse(ws.sent[ws.sent.length - 1]); +} + +/** Simulate a successful dispatchMessageFromTarget response. */ +function dispatchResponse(ws: FakeWebSocket, innerId: number, result: unknown): void { + ws.receive({ + method: 'Target.dispatchMessageFromTarget', + params: { + targetId: 'target-1', + message: JSON.stringify({ id: innerId, result }), + }, + }); +} + +/** Simulate a dispatchMessageFromTarget error response. */ +function dispatchError(ws: FakeWebSocket, innerId: number, message: string, code?: number): void { + ws.receive({ + method: 'Target.dispatchMessageFromTarget', + params: { + targetId: 'target-1', + message: JSON.stringify({ id: innerId, error: { message, code } }), + }, + }); +} + +/** Simulate an outer ack error (e.g., invalid targetId). */ +function outerError(ws: FakeWebSocket, outerId: number, message: string, code?: number): void { + ws.receive({ id: outerId, error: { message, code } }); +} + +// ─── Tests ──────────────────────────────────────────────────────────────────── + +describe('WebSocketProtocolTransport', () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + // ── Happy path ───────────────────────────────────────────────────────────── + + it('resolves with result when dispatchMessageFromTarget carries matching innerId', async () => { + const { transport, ws } = makeConnectedTransport(); + + const promise = transport.sendToTarget('Runtime.evaluate', { expression: '1+1' }, 'target-1', 1, 101, 500); + + // Verify wire format + const sent = lastSent(ws); + expect(sent.method).toBe('Target.sendMessageToTarget'); + expect(sent.params.targetId).toBe('target-1'); + const inner = JSON.parse(sent.params.message); + expect(inner.id).toBe(1); + expect(inner.method).toBe('Runtime.evaluate'); + + // Deliver response + dispatchResponse(ws, 1, { result: { value: 2 } }); + + const result = await promise; + expect(result).toEqual({ result: { value: 2 } }); + }); + + // ── Out-of-order resolution ──────────────────────────────────────────────── + + it('routes responses to the correct pending request when out of order', async () => { + const { transport, ws } = makeConnectedTransport(); + + const p1 = transport.sendToTarget('Page.navigate', { url: 'https://a.com' }, 'target-1', 10, 201, 500); + const p2 = transport.sendToTarget('Page.navigate', { url: 'https://b.com' }, 'target-1', 11, 202, 500); + + // Deliver in reverse order + dispatchResponse(ws, 11, { frameId: 'b' }); + dispatchResponse(ws, 10, { frameId: 'a' }); + + const [r1, r2] = await Promise.all([p1, p2]); + expect((r1 as any).frameId).toBe('a'); + expect((r2 as any).frameId).toBe('b'); + }); + + // ── Timeout ─────────────────────────────────────────────────────────────── + + it('rejects with TimeoutError when inner response never arrives', async () => { + const { transport } = makeConnectedTransport({ sendTimeout: 100 }); + + const promise = transport.sendToTarget('Page.navigate', {}, 'target-1', 20, 301, 100); + + jest.advanceTimersByTime(101); + + await expect(promise).rejects.toBeInstanceOf(TimeoutError); + await expect(promise).rejects.toMatchObject({ message: expect.stringContaining('timed out after 100ms') }); + }); + + // ── Connection failure ──────────────────────────────────────────────────── + + it('rejects immediately with ConnectionError when WebSocket is not open', async () => { + const transport = new WebSocketProtocolTransport(); + // No ws set — transport not connected + + await expect( + transport.sendToTarget('Runtime.evaluate', {}, 'target-1', 1, 1, 500), + ).rejects.toBeInstanceOf(ConnectionError); + }); + + it('rejects with ConnectionError when connect() times out', async () => { + const transport = new WebSocketProtocolTransport({ connectTimeout: 50 }); + + // connect() opens a real WebSocket — we short-circuit by advancing timers + // We need a URL that never connects; the timeout fires before 'open' + const connectPromise = transport.connect('ws://127.0.0.1:19999/devtools/page/never'); + + jest.advanceTimersByTime(51); + + await expect(connectPromise).rejects.toBeInstanceOf(ConnectionError); + await expect(connectPromise).rejects.toMatchObject({ message: expect.stringContaining('timeout') }); + }); + + // ── Outer ack error (invalid targetId) ─────────────────────────────────── + + it('propagates outer ack ProtocolError to the inner pending promise', async () => { + const { transport, ws } = makeConnectedTransport(); + + const promise = transport.sendToTarget('Runtime.evaluate', {}, 'bad-target', 30, 401, 500); + + outerError(ws, 401, 'No target with given id found', -32000); + + await expect(promise).rejects.toBeInstanceOf(ProtocolError); + await expect(promise).rejects.toMatchObject({ message: 'No target with given id found', code: -32000 }); + }); + + // ── Inner response error ────────────────────────────────────────────────── + + it('rejects with ProtocolError when inner response carries an error', async () => { + const { transport, ws } = makeConnectedTransport(); + + const promise = transport.sendToTarget('Page.unknown', {}, 'target-1', 40, 501, 500); + + dispatchError(ws, 40, "'unknown' was not found", -32601); + + await expect(promise).rejects.toBeInstanceOf(ProtocolError); + await expect(promise).rejects.toMatchObject({ message: "'unknown' was not found", code: -32601 }); + }); + + // ── Non-multiplexed event emission ─────────────────────────────────────── + + it('emits non-multiplexed protocol events directly', async () => { + const { transport, ws } = makeConnectedTransport(); + + const received: any[] = []; + transport.on('Target.targetCreated', (params: any) => received.push(params)); + + ws.receive({ + method: 'Target.targetCreated', + params: { targetInfo: { targetId: 'new-target', type: 'page', url: 'about:blank' } }, + }); + + expect(received).toHaveLength(1); + expect(received[0].targetInfo.targetId).toBe('new-target'); + }); + + // ── Inner domain event emission ─────────────────────────────────────────── + + it('emits inner domain events with targetId metadata', async () => { + const { transport, ws } = makeConnectedTransport(); + + const received: Array<{ params: any; meta: any }> = []; + transport.on('Page.loadEventFired', (params: any, meta: any) => { + received.push({ params, meta }); + }); + + ws.receive({ + method: 'Target.dispatchMessageFromTarget', + params: { + targetId: 'target-1', + message: JSON.stringify({ method: 'Page.loadEventFired', params: { timestamp: 1234 } }), + }, + }); + + expect(received).toHaveLength(1); + expect(received[0].params).toEqual({ timestamp: 1234 }); + expect(received[0].meta).toEqual({ targetId: 'target-1' }); + }); + + // ── disconnect rejects pending ──────────────────────────────────────────── + + it('rejects all pending requests with ConnectionError on disconnect()', async () => { + const { transport } = makeConnectedTransport(); + + const p1 = transport.sendToTarget('Runtime.evaluate', {}, 'target-1', 50, 601, 5000); + const p2 = transport.sendToTarget('Page.navigate', {}, 'target-1', 51, 602, 5000); + + await transport.disconnect(); + + await expect(p1).rejects.toBeInstanceOf(ConnectionError); + await expect(p2).rejects.toBeInstanceOf(ConnectionError); + }); + + // ── isConnected ─────────────────────────────────────────────────────────── + + it('returns true when ws is OPEN, false after disconnect', async () => { + const { transport } = makeConnectedTransport(); + expect(transport.isConnected()).toBe(true); + + await transport.disconnect(); + expect(transport.isConnected()).toBe(false); + }); + + // ── ws close auto-clears pending requests ───────────────────────────────── + + it('rejects pending requests when the underlying socket closes', async () => { + const { transport, ws } = makeConnectedTransport({ sendTimeout: 10_000 }); + + const p1 = transport.sendToTarget('Runtime.evaluate', {}, 'target-1', 60, 701, 10_000); + const p2 = transport.sendToTarget('Page.navigate', {}, 'target-1', 61, 702, 10_000); + + // Trip ws close without going through transport.disconnect() — + // pending awaiters should reject immediately rather than wait + // for their per-request timers to expire. + ws.close(); + + await expect(p1).rejects.toBeInstanceOf(ConnectionError); + await expect(p2).rejects.toBeInstanceOf(ConnectionError); + }); + + // ── onProtocolEvent subscription ────────────────────────────────────────── + + it('relays raw RDP events via onProtocolEvent without leaking lifecycle events', () => { + const { transport, ws } = makeConnectedTransport(); + + const received: Array<{ event: string; args: unknown[] }> = []; + const unsubscribe = transport.onProtocolEvent((event, ...args) => { + received.push({ event, args }); + }); + + // Non-multiplexed event + ws.receive({ + method: 'Target.targetCreated', + params: { targetInfo: { targetId: 't-1', type: 'page', url: 'about:blank' } }, + }); + + // Inner (dispatchMessageFromTarget) event + ws.receive({ + method: 'Target.dispatchMessageFromTarget', + params: { + targetId: 't-1', + message: JSON.stringify({ method: 'Page.loadEventFired', params: { timestamp: 1 } }), + }, + }); + + // Lifecycle event — must NOT be relayed by onProtocolEvent + transport.emit('transport:close'); + + const events = received.map(r => r.event); + expect(events).toEqual(['Target.targetCreated', 'Page.loadEventFired']); + expect(received[1].args[1]).toEqual({ targetId: 't-1' }); + + // Unsubscribe stops further deliveries + unsubscribe(); + ws.receive({ + method: 'Target.targetDestroyed', + params: { targetId: 't-1' }, + }); + expect(received).toHaveLength(2); + }); + + it('routes a throwing onProtocolEvent handler through transport:error without using the standard error channel', () => { + const { transport, ws } = makeConnectedTransport(); + + const transportErrors: Error[] = []; + transport.on('transport:error', err => transportErrors.push(err)); + + const errorListenerCalls: Error[] = []; + transport.on('error', err => errorListenerCalls.push(err)); + + const boom = new Error('listener boom'); + transport.onProtocolEvent(() => { + throw boom; + }); + + // Drive any protocol event — the handler will throw on delivery. + expect(() => { + ws.receive({ + method: 'Target.targetCreated', + params: { targetInfo: { targetId: 't-x', type: 'page', url: 'about:blank' } }, + }); + }).not.toThrow(); + + // Failure surfaced on the lifecycle channel that the client subscribes to, + // not via Node's default 'error' event (which would kill the process when + // no listener is registered). + expect(transportErrors).toEqual([boom]); + expect(errorListenerCalls).toEqual([]); + }); +});