diff --git a/README.md b/README.md index 3f436fc..fc9958a 100644 --- a/README.md +++ b/README.md @@ -692,27 +692,36 @@ Note that you should not use a `Window` object itself as a port for RPC -- you s ### Custom transports -You can implement a custom RPC transport across any bidirectional stream. To do so, implement the interface `RpcTransport`, which is defined as follows: +You can implement a custom RPC transport across any bidirectional stream. For string-based transports, implement `RpcTransport`: ```ts -// Interface for an RPC transport, which is a simple bidirectional message stream. export interface RpcTransport { - // Sends a message to the other end. - send(message: string): Promise; + // Sends a JSON string message. Returns the byte size of the message. + // Transport errors should be propagated via receive() rejecting. + send(message: string): number; // Receives a message sent by the other end. - // - // If and when the transport becomes disconnected, this will reject. The thrown error will be - // propagated to all outstanding calls and future calls on any stubs associated with the session. - // If there are no outstanding calls (and none are made in the future), then the error does not - // propagate anywhere -- this is considered a "clean" shutdown. receive(): Promise; - // Indicates that the RPC system has suffered an error that prevents the session from continuing. - // The transport should ideally try to send any queued messages if it can, and then close the - // connection. (It's not strictly necessary to deliver queued messages, but the last message sent - // before abort() is called is often an "abort" message, which communicates the error to the - // peer, so if that is dropped, the peer may have less information about what happened.) + // Called when the RPC system needs to abort the session. + abort?(reason: any): void; +} +``` + +For transports with custom encoding (CBOR, MessagePack, structured clone, etc.), implement `RpcTransportWithCustomEncoding`: + +```ts +export interface RpcTransportWithCustomEncoding { + // Declares what encoding level this transport uses. + readonly encodingLevel: "json" | "jsonWithBytes" | "structuredClone"; + + // Encodes and sends a message. Returns the encoded byte size if known + // (for flow control), or void if unavailable (e.g. structured clone). + send(message: unknown): number | void; + + // Receives and decodes a message. + receive(): Promise; + abort?(reason: any): void; } ``` @@ -735,4 +744,33 @@ let stub: RemoteMainInterface = session.getRemoteMain(); // Now we can call methods on the stub. ``` -Note that sessions are entirely symmetric: neither side is defined as the "client" nor the "server". Each side can optionally expose a "main interface" to the other. In typical scenarios with a logical client and server, the server exposes a main interface but the client does not. +Note that sessions are entirely symmetric: neither side is defined as the "client" nor the "server". Each side can optionally expose a "main interface" to the other. In typical scenarios with a logical client and server, the server exposes a main interface but the client does not.ś + +#### Custom encoding levels + +By default, `RpcTransport` sends and receives JSON strings. To use a binary format like CBOR or MessagePack, implement `RpcTransportWithCustomEncoding` instead, which declares an `encodingLevel` so the RPC system knows how much serialization to do before handing messages to the transport: + +```ts +import { RpcTransportWithCustomEncoding, RpcSession } from "capnweb"; +import * as cbor from "cbor-x"; + +class CborTransport implements RpcTransportWithCustomEncoding { + readonly encodingLevel = "jsonWithBytes"; // Uint8Array stays raw for CBOR + + send(msg: unknown): number { + const encoded = cbor.encode(msg); + this.ws.send(encoded); + return encoded.byteLength; + } + + async receive(): Promise { + return cbor.decode(new Uint8Array(await this.nextMessage())); + } + + abort(reason: any) { this.ws.close(3000, String(reason)); } +} + +const session = new RpcSession(new CborTransport(ws)); +``` + +The available encoding levels are `"json"` (JSON-compatible object tree), `"jsonWithBytes"` (same but `Uint8Array` stays raw), and `"structuredClone"` (native types like `Date`, `BigInt`, `Error` also pass through). The built-in `MessagePort` transport uses `"structuredClone"` automatically. diff --git a/__tests__/index.test.ts b/__tests__/index.test.ts index 35c68c2..f82767b 100644 --- a/__tests__/index.test.ts +++ b/__tests__/index.test.ts @@ -144,7 +144,7 @@ class TestTransport implements RpcTransport { public log = false; private fenced = false; - async send(message: string): Promise { + send(message: string): void { // HACK: If the string "$remove$" appears in the message, remove it. This is used in some // tests to hack the RPC protocol. message = message.replaceAll("$remove$", ""); @@ -1846,7 +1846,7 @@ describe("WritableStream over RPC", () => { // Collect all messages sent by the server (which appear in the client's queue). let serverMessages: any[] = []; let origServerSend = harness.serverTransport.send; - harness.serverTransport.send = async function(message: string) { + harness.serverTransport.send = function(message: string) { serverMessages.push(JSON.parse(message)); return origServerSend.call(this, message); }; @@ -1854,7 +1854,7 @@ describe("WritableStream over RPC", () => { // Collect all messages sent by the client (which appear in the server's queue). let clientMessages: any[] = []; let origClientSend = harness.clientTransport.send; - harness.clientTransport.send = async function(message: string) { + harness.clientTransport.send = function(message: string) { clientMessages.push(JSON.parse(message)); return origClientSend.call(this, message); }; diff --git a/src/batch.ts b/src/batch.ts index 4f423c7..81927cf 100644 --- a/src/batch.ts +++ b/src/batch.ts @@ -19,7 +19,7 @@ class BatchClientTransport implements RpcTransport { #batchToSend: string[] | null = []; #batchToReceive: string[] | null = null; - async send(message: string): Promise { + send(message: string): void { // If the batch was already sent, we just ignore the message, because throwing may cause the // RPC system to abort prematurely. Once the last receive() is done then we'll throw an error // that aborts the RPC system at the right time and will propagate to all other requests. @@ -98,8 +98,9 @@ class BatchServerTransport implements RpcTransport { #batchToReceive: string[]; #allReceived: PromiseWithResolvers = Promise.withResolvers(); - async send(message: string): Promise { + send(message: string): number { this.#batchToSend.push(message); + return message.length; } async receive(): Promise { diff --git a/src/index.ts b/src/index.ts index 10e4b7a..5dfc35a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,8 +3,8 @@ // https://opensource.org/license/mit import { RpcTarget as RpcTargetImpl, RpcStub as RpcStubImpl, RpcPromise as RpcPromiseImpl } from "./core.js"; -import { serialize, deserialize } from "./serialize.js"; -import { RpcTransport, RpcSession as RpcSessionImpl, RpcSessionOptions } from "./rpc.js"; +import { serialize, deserialize, EncodingLevel } from "./serialize.js"; +import { RpcTransport, RpcTransportWithCustomEncoding, AnyRpcTransport, RpcSession as RpcSessionImpl, RpcSessionOptions } from "./rpc.js"; import { RpcTargetBranded, RpcCompatible, Stub, Stubify, __RPC_TARGET_BRAND } from "./types.js"; import { newWebSocketRpcSession as newWebSocketRpcSessionImpl, newWorkersWebSocketRpcResponse } from "./websocket.js"; @@ -20,7 +20,8 @@ forceInitStreams(); // Re-export public API types. export { serialize, deserialize, newWorkersWebSocketRpcResponse, newHttpBatchRpcResponse, nodeHttpBatchRpcResponse }; -export type { RpcTransport, RpcSessionOptions, RpcCompatible }; +export type { RpcTransport, RpcTransportWithCustomEncoding, AnyRpcTransport, + RpcSessionOptions, RpcCompatible, EncodingLevel }; // Hack the type system to make RpcStub's types work nicely! /** @@ -76,7 +77,7 @@ export interface RpcSession = undefined> { } export const RpcSession: { new = undefined>( - transport: RpcTransport, localMain?: any, options?: RpcSessionOptions): RpcSession; + transport: AnyRpcTransport, localMain?: any, options?: RpcSessionOptions): RpcSession; } = RpcSessionImpl; // RpcTarget needs some hackage too to brand it properly and account for the implementation diff --git a/src/messageport.ts b/src/messageport.ts index f9b5a3c..2086e40 100644 --- a/src/messageport.ts +++ b/src/messageport.ts @@ -3,7 +3,7 @@ // https://opensource.org/license/mit import { RpcStub } from "./core.js"; -import { RpcTransport, RpcSession, RpcSessionOptions } from "./rpc.js"; +import { RpcTransportWithCustomEncoding, RpcSession, RpcSessionOptions } from "./rpc.js"; // Start a MessagePort session given a MessagePort or a pair of MessagePorts. // @@ -16,7 +16,9 @@ export function newMessagePortRpcSession( return rpc.getRemoteMain(); } -class MessagePortTransport implements RpcTransport { +class MessagePortTransport implements RpcTransportWithCustomEncoding { + readonly encodingLevel = "structuredClone" as const; + constructor (port: MessagePort) { this.#port = port; @@ -29,7 +31,8 @@ class MessagePortTransport implements RpcTransport { } else if (event.data === null) { // Peer is signaling that they're closing the connection this.#receivedError(new Error("Peer closed MessagePort connection.")); - } else if (typeof event.data === "string") { + } else { + // Accept any structured-clonable data if (this.#receiveResolver) { this.#receiveResolver(event.data); this.#receiveResolver = undefined; @@ -37,8 +40,6 @@ class MessagePortTransport implements RpcTransport { } else { this.#receiveQueue.push(event.data); } - } else { - this.#receivedError(new TypeError("Received non-string message from MessagePort.")); } }); @@ -48,32 +49,32 @@ class MessagePortTransport implements RpcTransport { } #port: MessagePort; - #receiveResolver?: (message: string) => void; + #receiveResolver?: (message: unknown) => void; #receiveRejecter?: (err: any) => void; - #receiveQueue: string[] = []; + #receiveQueue: unknown[] = []; #error?: any; - async send(message: string): Promise { + send(message: unknown): void { if (this.#error) { throw this.#error; } this.#port.postMessage(message); } - async receive(): Promise { + async receive(): Promise { if (this.#receiveQueue.length > 0) { return this.#receiveQueue.shift()!; } else if (this.#error) { throw this.#error; } else { - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { this.#receiveResolver = resolve; this.#receiveRejecter = reject; }); } } - abort?(reason: any): void { + abort(reason: any): void { // Send close signal to peer before closing try { this.#port.postMessage(null); @@ -99,4 +100,4 @@ class MessagePortTransport implements RpcTransport { } } } -} \ No newline at end of file +} diff --git a/src/rpc.ts b/src/rpc.ts index 4801a27..dcc3a31 100644 --- a/src/rpc.ts +++ b/src/rpc.ts @@ -3,17 +3,18 @@ // https://opensource.org/license/mit import { StubHook, RpcPayload, RpcStub, PropertyPath, PayloadStubHook, ErrorStubHook, RpcTarget, unwrapStubAndPath, streamImpl } from "./core.js"; -import { Devaluator, Evaluator, ExportId, ImportId, Exporter, Importer, serialize } from "./serialize.js"; +import { Devaluator, Evaluator, ExportId, ImportId, Exporter, Importer, serialize, EncodingLevel } from "./serialize.js"; /** - * Interface for an RPC transport, which is a simple bidirectional message stream. Implement this - * interface if the built-in transports (e.g. for HTTP batch and WebSocket) don't meet your needs. + * Interface for a string-based RPC transport. This is the default transport type — no + * `encodingLevel` field is needed. Messages are JSON strings. Implement this interface if the + * built-in transports (e.g. for HTTP batch and WebSocket) don't meet your needs. */ export interface RpcTransport { /** * Sends a message to the other end. */ - send(message: string): Promise; + send(message: string): void; /** * Receives a message sent by the other end. @@ -35,6 +36,52 @@ export interface RpcTransport { abort?(reason: any): void; } +/** + * Interface for a transport with custom binary encoding (e.g. CBOR, MessagePack). The transport + * is responsible for encoding/decoding messages and reporting the encoded byte size for flow + * control. + */ +export interface RpcTransportWithCustomEncoding { + /** + * The encoding level this transport works with. + * + * - "json": Transport encodes/decodes JS objects (JSON-compatible). + * - "jsonWithBytes": Like "json" but Uint8Array values are left raw (not base64-encoded). + * - "structuredClone": Native types like Date, BigInt, Error pass through (e.g. MessagePort). + */ + readonly encodingLevel: "json" | "jsonWithBytes" | "structuredClone"; + + /** + * Encodes and sends a message to the other end. Returns the encoded byte size if known + * (for flow control), or void if the size is unavailable (e.g. structured clone transports). + * When void is returned, stream writes are serialized (no overlapping) instead of using + * window-based flow control. Send errors should be propagated via `receive()` rejecting. + */ + send(message: unknown): number | void; + + /** + * Receives and decodes a message sent by the other end. + * + * If and when the transport becomes disconnected, this will reject. The thrown error will be + * propagated to all outstanding calls and future calls on any stubs associated with the session. + * If there are no outstanding calls (and none are made in the future), then the error does not + * propagate anywhere -- this is considered a "clean" shutdown. + */ + receive(): Promise; + + /** + * Indicates that the RPC system has suffered an error that prevents the session from continuing. + * The transport should ideally try to send any queued messages if it can, and then close the + * connection. (It's not strictly necessary to deliver queued messages, but the last message sent + * before abort() is called is often an "abort" message, which communicates the error to the + * peer, so if that is dropped, the peer may have less information about what happened.) + */ + abort?(reason: any): void; +} + +/** Any supported transport type. */ +export type AnyRpcTransport = RpcTransport | RpcTransportWithCustomEncoding; + // Entry on the exports table. type ExportTableEntry = { hook: StubHook, @@ -340,8 +387,12 @@ class RpcSessionImpl implements Importer, Exporter { // may be deleted from the middle (hence leaving the array sparse). onBrokenCallbacks: ((error: any) => void)[] = []; - constructor(private transport: RpcTransport, mainHook: StubHook, + // Encoding level from the transport (defaults to "string") + private encodingLevel: EncodingLevel; + + constructor(private transport: AnyRpcTransport, mainHook: StubHook, private options: RpcSessionOptions) { + this.encodingLevel = 'encodingLevel' in transport ? transport.encodingLevel : "string"; // Export zero is automatically the bootstrap object. this.exports.push({hook: mainHook, refcount: 1}); @@ -460,12 +511,12 @@ class RpcSessionImpl implements Importer, Exporter { payload => { // We don't transfer ownership of stubs in the payload since the payload // belongs to the hook which sticks around to handle pipelined requests. - let value = Devaluator.devaluate(payload.value, undefined, this, payload); + let value = Devaluator.devaluate(payload.value, undefined, this, payload, this.encodingLevel); this.send(["resolve", exportId, value]); if (autoRelease) this.releaseExport(exportId, 1); }, error => { - this.send(["reject", exportId, Devaluator.devaluate(error, undefined, this)]); + this.send(["reject", exportId, Devaluator.devaluate(error, undefined, this, undefined, this.encodingLevel)]); if (autoRelease) this.releaseExport(exportId, 1); } ).catch( @@ -473,7 +524,7 @@ class RpcSessionImpl implements Importer, Exporter { // If serialization failed, report the serialization error, which should // itself always be serializable. try { - this.send(["reject", exportId, Devaluator.devaluate(error, undefined, this)]); + this.send(["reject", exportId, Devaluator.devaluate(error, undefined, this, undefined, this.encodingLevel)]); if (autoRelease) this.releaseExport(exportId, 1); } catch (error2) { // TODO: Shouldn't happen, now what? @@ -560,29 +611,32 @@ class RpcSessionImpl implements Importer, Exporter { return importId; } - // Serializes and sends a message. Returns the byte length of the serialized message. - private send(msg: any): number { + // Serializes and sends a message. Returns the byte length of the serialized message, + // or undefined if the transport doesn't report size (e.g. structured clone). + private send(msg: any): number | void { if (this.abortReason !== undefined) { // Ignore sends after we've aborted. return 0; } - let msgText: string; try { - msgText = JSON.stringify(msg); + if (this.encodingLevel === "string") { + // Stringify and send via string transport. We know the size from the string length. + let msgText = JSON.stringify(msg); + (this.transport as RpcTransport).send(msgText); + return msgText.length; + } else { + // Custom encoding transport encodes and returns the actual encoded size, + // or void if size is unavailable (e.g. structured clone). + return (this.transport as RpcTransportWithCustomEncoding).send(msg); + } } catch (err) { // If JSON stringification failed, there's something wrong with the devaluator, as it should - // not allow non-JSONable values to be injected in the first place. + // not allow non-JSONable values to be injected in the first place. If send() threw, the + // transport is broken. Either way, abort the session. try { this.abort(err); } catch (err2) {} throw err; } - - this.transport.send(msgText) - // If send fails, abort the connection, but don't try to send an abort message since - // that'll probably also fail. - .catch(err => this.abort(err, false)); - - return msgText.length; } sendCall(id: ImportId, path: PropertyPath, args?: RpcPayload): RpcImportHook { @@ -590,7 +644,7 @@ class RpcSessionImpl implements Importer, Exporter { let value: Array = ["pipeline", id, path]; if (args) { - let devalue = Devaluator.devaluate(args.value, undefined, this, args); + let devalue = Devaluator.devaluate(args.value, undefined, this, args, this.encodingLevel); // HACK: Since the args is an array, devaluator will wrap in a second array. Need to unwrap. // TODO: Clean this up somehow. @@ -607,17 +661,17 @@ class RpcSessionImpl implements Importer, Exporter { } sendStream(id: ImportId, path: PropertyPath, args: RpcPayload) - : {promise: Promise, size: number} { + : {promise: Promise, size?: number} { if (this.abortReason) throw this.abortReason; let value: Array = ["pipeline", id, path]; - let devalue = Devaluator.devaluate(args.value, undefined, this, args); + let devalue = Devaluator.devaluate(args.value, undefined, this, args, this.encodingLevel); // HACK: Since the args is an array, devaluator will wrap in a second array. Need to unwrap. // TODO: Clean this up somehow. value.push((>devalue)[0]); - let size = this.send(["stream", value]); + let size = this.send(["stream", value]) ?? undefined; // Create the import entry in "already pulling" state (pulling=true), since stream messages // are automatically pulled. Set remoteRefcount to 0 so that resolve() won't send a release @@ -688,9 +742,12 @@ class RpcSessionImpl implements Importer, Exporter { if (trySendAbortMessage) { try { - this.transport.send(JSON.stringify(["abort", Devaluator - .devaluate(error, undefined, this)])) - .catch(err => {}); + let abortMsg = ["abort", Devaluator.devaluate(error, undefined, this, undefined, this.encodingLevel)]; + if (this.encodingLevel === "string") { + (this.transport as RpcTransport).send(JSON.stringify(abortMsg)); + } else { + (this.transport as RpcTransportWithCustomEncoding).send(abortMsg); + } } catch (err) { // ignore, probably the whole reason we're aborting is because the transport is broken } @@ -736,14 +793,17 @@ class RpcSessionImpl implements Importer, Exporter { private async readLoop(abortPromise: Promise) { while (!this.abortReason) { - let msg = JSON.parse(await Promise.race([this.transport.receive(), abortPromise])); + let raw = await Promise.race([this.transport.receive(), abortPromise]); if (this.abortReason) break; // check again before processing + // Only parse JSON at "string" level; otherwise message is already an object + let msg = this.encodingLevel === "string" ? JSON.parse(raw as string) : raw; + if (msg instanceof Array) { switch (msg[0]) { case "push": // ["push", Expression] if (msg.length > 1) { - let payload = new Evaluator(this).evaluate(msg[1]); + let payload = new Evaluator(this, this.encodingLevel).evaluate(msg[1]); let hook = new PayloadStubHook(payload); // It's possible for a rejection to occur before the client gets a chance to send @@ -762,7 +822,7 @@ class RpcSessionImpl implements Importer, Exporter { // - The export is automatically considered "pulled". // - Once the "resolve" is sent, the export is implicitly released. if (msg.length > 1) { - let payload = new Evaluator(this).evaluate(msg[1]); + let payload = new Evaluator(this, this.encodingLevel).evaluate(msg[1]); let hook = new PayloadStubHook(payload); hook.ignoreUnhandledRejections(); @@ -802,11 +862,11 @@ class RpcSessionImpl implements Importer, Exporter { let imp = this.imports[importId]; if (imp) { if (msg[0] == "resolve") { - imp.resolve(new PayloadStubHook(new Evaluator(this).evaluate(msg[2]))); + imp.resolve(new PayloadStubHook(new Evaluator(this, this.encodingLevel).evaluate(msg[2]))); } else { // HACK: We expect errors are always simple values (no stubs) so we can just // pull the value out of the payload. - let payload = new Evaluator(this).evaluate(msg[2]); + let payload = new Evaluator(this, this.encodingLevel).evaluate(msg[2]); payload.dispose(); // just in case -- should be no-op imp.resolve(new ErrorStubHook(payload.value)); } @@ -817,7 +877,7 @@ class RpcSessionImpl implements Importer, Exporter { if (msg[0] == "resolve") { // We need to evaluate the resolution and immediately dispose it so that we // release any stubs it contains. - new Evaluator(this).evaluate(msg[2]).dispose(); + new Evaluator(this, this.encodingLevel).evaluate(msg[2]).dispose(); } } continue; @@ -836,7 +896,7 @@ class RpcSessionImpl implements Importer, Exporter { } case "abort": { - let payload = new Evaluator(this).evaluate(msg[1]); + let payload = new Evaluator(this, this.encodingLevel).evaluate(msg[1]); payload.dispose(); // just in case -- should be no-op this.abort(payload, false); break; @@ -879,7 +939,7 @@ export class RpcSession { #session: RpcSessionImpl; #mainStub: RpcStub; - constructor(transport: RpcTransport, localMain?: any, options: RpcSessionOptions = {}) { + constructor(transport: AnyRpcTransport, localMain?: any, options: RpcSessionOptions = {}) { let mainHook: StubHook; if (localMain) { mainHook = new PayloadStubHook(RpcPayload.fromAppReturn(localMain)); diff --git a/src/serialize.ts b/src/serialize.ts index bea1282..c9608ef 100644 --- a/src/serialize.ts +++ b/src/serialize.ts @@ -7,6 +7,26 @@ import { StubHook, RpcPayload, typeForRpc, RpcStub, RpcPromise, LocatedPromise, export type ImportId = number; export type ExportId = number; +/** + * Encoding levels determine how much pre-processing the RPC system does before handing + * messages to the transport. + * + * - `"string"`: Full JSON encoding (string output). Default, used by HTTP batch. + * - `"json"`: JS object tree with all types encoded (JSON-compatible). For custom encoders. + * - `"jsonWithBytes"`: Like json but Uint8Array stays raw. For CBOR/MessagePack. + * - `"structuredClone"`: Only encode stubs/functions, pass native types through. For MessagePort. + * + * @example + * ```ts + * // What happens to Uint8Array([1, 2, 3]) at each level: + * "string" → '["bytes","AQID"]' // JSON string with base64 + * "json" → ["bytes", "AQID"] // JS array with base64 + * "jsonWithBytes" → ["bytes", Uint8Array] // JS array with raw bytes + * "structuredClone" → ["bytes", Uint8Array] // + Date, BigInt, Error stay native + * ``` + */ +export type EncodingLevel = "string" | "json" | "jsonWithBytes" | "structuredClone"; + // ======================================================================================= export interface Exporter { @@ -73,7 +93,11 @@ interface FromBase64 { // actually converting to a string. (The name is meant to be the opposite of "Evaluator", which // implements the opposite direction.) export class Devaluator { - private constructor(private exporter: Exporter, private source: RpcPayload | undefined) {} + private constructor( + private exporter: Exporter, + private source: RpcPayload | undefined, + private encodingLevel: EncodingLevel + ) {} // Devaluate the given value. // * value: The value to devaluate. @@ -81,12 +105,15 @@ export class Devaluator { // as a function. // * exporter: Callbacks to the RPC session for exporting capabilities found in this message. // * source: The RpcPayload which contains the value, and therefore owns stubs within. + // * encodingLevel: How much encoding to apply (default "string"). // - // Returns: The devaluated value, ready to be JSON-serialized. + // Returns: The devaluated value, ready to be JSON-serialized (or passed to transport directly + // for non-string levels). public static devaluate( - value: unknown, parent?: object, exporter: Exporter = NULL_EXPORTER, source?: RpcPayload) + value: unknown, parent?: object, exporter: Exporter = NULL_EXPORTER, source?: RpcPayload, + encodingLevel: EncodingLevel = "string") : unknown { - let devaluator = new Devaluator(exporter, source); + let devaluator = new Devaluator(exporter, source, encodingLevel); try { return devaluator.devaluateImpl(value, parent, 0); } catch (err) { @@ -123,6 +150,10 @@ export class Devaluator { case "primitive": if (typeof value === "number" && !isFinite(value)) { + // At passthrough level, keep Infinity/NaN as native values + if (this.encodingLevel === "structuredClone") { + return value; + } if (value === Infinity) { return ["inf"]; } else if (value === -Infinity) { @@ -156,13 +187,26 @@ export class Devaluator { } case "bigint": + // At structuredClone level, keep BigInt as native value + if (this.encodingLevel === "structuredClone") { + return value; + } return ["bigint", (value).toString()]; case "date": + // At structuredClone level, keep Date as native value + if (this.encodingLevel === "structuredClone") { + return value; + } return ["date", (value).getTime()]; case "bytes": { let bytes = value as Uint8Array; + // At structuredClone or jsonWithBytes level, keep Uint8Array raw + if (this.encodingLevel === "structuredClone" || this.encodingLevel === "jsonWithBytes") { + return ["bytes", bytes]; + } + // Otherwise encode as base64 if (bytes.toBase64) { return ["bytes", bytes.toBase64({omitPadding: true})]; } else { @@ -311,6 +355,11 @@ export class Devaluator { e = rewritten; } + // At structuredClone level, keep Error as native value (still call onSendError above) + if (this.encodingLevel === "structuredClone") { + return rewritten || value; + } + let result = ["error", e.name, e.message]; if (rewritten && rewritten.stack) { result.push(rewritten.stack); @@ -319,6 +368,10 @@ export class Devaluator { } case "undefined": + // At structuredClone level, keep undefined as native value + if (this.encodingLevel === "structuredClone") { + return undefined; + } return ["undefined"]; case "stub": @@ -465,7 +518,7 @@ function fixBrokenRequestBody(request: Request, body: ReadableStream): RpcPromis // delivery to the app. This is used to implement deserialization, except that it doesn't actually // start from a raw string. export class Evaluator { - constructor(private importer: Importer) {} + constructor(private importer: Importer, private encodingLevel: EncodingLevel = "string") {} private hooks: StubHook[] = []; private promises: LocatedPromise[] = []; @@ -487,6 +540,14 @@ export class Evaluator { } private evaluateImpl(value: unknown, parent: object, property: string | number): unknown { + // At structuredClone level, native types come through directly + if (this.encodingLevel === "structuredClone" || this.encodingLevel === "jsonWithBytes") { + if (value instanceof Date || value instanceof Uint8Array || + value instanceof Error || typeof value === "bigint") { + return value; + } + } + if (value instanceof Array) { if (value.length == 1 && value[0] instanceof Array) { // Escaped array. Evaluate the contents. @@ -507,6 +568,11 @@ export class Evaluator { } break; case "bytes": { + // At jsonWithBytes/structuredClone level, bytes may already be a Uint8Array + if (value[1] instanceof Uint8Array) { + return value[1]; + } + // Otherwise decode from base64 let b64 = Uint8Array as FromBase64; if (typeof value[1] == "string") { if (b64.fromBase64) { diff --git a/src/websocket.ts b/src/websocket.ts index 32dbefa..b081766 100644 --- a/src/websocket.ts +++ b/src/websocket.ts @@ -38,10 +38,17 @@ export function newWorkersWebSocketRpcResponse( }); } -class WebSocketTransport implements RpcTransport { +/** + * Generic WebSocket transport. Default `T = string` is backward-compatible and satisfies + * `RpcTransport`. Use `T = ArrayBuffer` as a building block for binary transports. + */ +export class WebSocketTransport { constructor (webSocket: WebSocket) { this.#webSocket = webSocket; + // Always set binaryType — harmless for string mode, required for ArrayBuffer mode. + webSocket.binaryType = "arraybuffer"; + if (webSocket.readyState === WebSocket.CONNECTING) { this.#sendQueue = []; webSocket.addEventListener("open", event => { @@ -59,16 +66,16 @@ class WebSocketTransport implements RpcTransport { webSocket.addEventListener("message", (event: MessageEvent) => { if (this.#error) { // Ignore further messages. - } else if (typeof event.data === "string") { + } else if (typeof event.data === "string" || event.data instanceof ArrayBuffer) { if (this.#receiveResolver) { - this.#receiveResolver(event.data); + this.#receiveResolver(event.data as T); this.#receiveResolver = undefined; this.#receiveRejecter = undefined; } else { - this.#receiveQueue.push(event.data); + this.#receiveQueue.push(event.data as T); } } else { - this.#receivedError(new TypeError("Received non-string message from WebSocket.")); + this.#receivedError(new TypeError("Received unexpected message type from WebSocket.")); } }); @@ -82,13 +89,13 @@ class WebSocketTransport implements RpcTransport { } #webSocket: WebSocket; - #sendQueue?: string[]; // only if not opened yet - #receiveResolver?: (message: string) => void; + #sendQueue?: T[]; // only if not opened yet + #receiveResolver?: (message: T) => void; #receiveRejecter?: (err: any) => void; - #receiveQueue: string[] = []; + #receiveQueue: T[] = []; #error?: any; - async send(message: string): Promise { + send(message: T): void { if (this.#sendQueue === undefined) { this.#webSocket.send(message); } else { @@ -97,20 +104,20 @@ class WebSocketTransport implements RpcTransport { } } - async receive(): Promise { + receive(): Promise { if (this.#receiveQueue.length > 0) { - return this.#receiveQueue.shift()!; + return Promise.resolve(this.#receiveQueue.shift()!); } else if (this.#error) { - throw this.#error; + return Promise.reject(this.#error); } else { - return new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { this.#receiveResolver = resolve; this.#receiveRejecter = reject; }); } } - abort?(reason: any): void { + abort(reason: any): void { let message: string; if (reason instanceof Error) { message = reason.message; @@ -136,3 +143,6 @@ class WebSocketTransport implements RpcTransport { } } } + +// WebSocketTransport satisfies RpcTransport (can't use `implements` on generic class). +const _typeCheck: RpcTransport = null! as WebSocketTransport;