Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .eslintrc.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
module.exports = {
extends: "eslint-config-typescript-library",
extends: 'eslint-config-typescript-library',
rules: {
camelcase: 'off',
},
};
6 changes: 4 additions & 2 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"markdown"
],
"editor.codeActionsOnSave": {
"source.fixAll.eslint": true
"source.fixAll.eslint": "explicit"
},
"search.exclude": {
"**/.git": true,
Expand Down Expand Up @@ -43,7 +43,9 @@
"liveServer.settings.port": 5501,
"js/ts.implicitProjectConfig.strictNullChecks": false,
"cSpell.words": [
"camelcase",
"insx",
"Unport"
"Unport",
"Unrpc"
]
}
128 changes: 128 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Each of these JSContexts exhibits distinct methods of communicating with the ext
- [Channel](#channel-1)
- [.pipe()](#pipe)
- [ChannelMessage](#channelmessage)
- [Unrpc (Experimental)](#unrpc-experimental)
- [🤝 Contributing](#-contributing)
- [🤝 Credits](#-credits)
- [LICENSE](#license)
Expand Down Expand Up @@ -299,6 +300,133 @@ The `ChannelMessage` type is used for the message in the `onMessage` method.
import { ChannelMessage } from 'unport';
```

### Unrpc (Experimental)

Starting with the 0.6.0 release, we are experimentally introducing support for Typed [RPC (Remote Procedure Call)](https://en.wikipedia.org/wiki/Remote_procedure_call).

When dealing with a single Port that requires RPC definition, we encounter a problem related to the programming paradigm. It's necessary to define `Request` and `Response` messages such as:

```ts
export type IpcDefinition = {
a2b: {
callFoo: {
input: string;
};
};
b2a: {
callFooCallback: {
result: string;
};
};
};
```

In the case where an RPC call needs to be encapsulated, the API might look like this:

```ts
function rpcCall(request: { input: string; }): Promise<{ result: string; }>;
```

Consequently, to associate a callback function, it becomes a requirement to include a `CallbackId` at the **application layer** for every RPC method:

```diff
export type IpcDefinition = {
a2b: {
callFoo: {
input: string;
+ callbackId: string;
};
};
b2a: {
callFooCallback: {
result: string;
+ callbackId: string;
};
};
};
```

`Unrpc` is provided to address this issue, enabling support for Typed RPC starting from the **protocol layer**:

```ts
import { Unrpc } from 'unport';

// "parentPort" is a Port defined based on Unport in the previous example.
const parent = new Unrpc(parentPort);

// Implementing an RPC method.
parent.implement('callFoo', request => ({
user: `parent (${request.id})`,
}));

// Emit a SYN event.
parent.port.postMessage('syn', { pid: 'parent' });

// Listen for the ACK message.
parent.port.onMessage('ack', async payload => {
// Call an RPC method as defined by the "child" port.
const response = await parent.call('getChildInfo', {
name: 'parent',
});
});
```

The implementation on the `child` side is as follows:

```ts
import { Unrpc } from 'unport';

// "parentPort" is a Port also defined based on Unport.
const child = new Unrpc(childPort);

child.implement('getChildInfo', request => ({
clientKey: `[child] ${request.name}`,
}));

// Listen for the SYN message.
child.port.onMessage('syn', async payload => {
const response = await child.call('getInfo', { id: '<child>' });
// Acknowledge the SYN event.
child.port.postMessage('ack', { pid: 'child' });
});
```

The types are defined as such:

```ts
import { Unport } from 'unport';

export type Definition = {
parent2child: {
syn: {
pid: string;
};
getInfo__callback: {
user: string;
};
getChildInfo: {
name: string;
}
};
child2parent: {
getInfo: {
id: string;
};
getChildInfo__callback: {
clientKey: string;
};
ack: {
pid: string;
};
};
};

export type ChildPort = Unport<Definition, 'child'>;
export type ParentPort = Unport<Definition, 'parent'>;
```

In comparison to Unport, the only new concept to grasp is that the RPC response message key must end with `__callback`. Other than that, no additional changes are necessary! `Unrpc` also offers comprehensive type inference based on this convention; for instance, you won't be able to implement an RPC method that is meant to serve as a response.

## 🤝 Contributing

Contributions, issues and feature requests are welcome!
Expand Down
152 changes: 152 additions & 0 deletions __tests__/rpc.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { ChannelMessage, Unport, Unrpc, UnrpcExecutionErrorError, UnrpcNotImplementationError } from '../src';

export type Definition = {
parent2child: {
syn: {
pid: string;
};
getInfo__callback: {
user: string;
};
getChildInfo: {
name: string;
}
};
child2parent: {
getInfo: {
id: string;
};
getChildInfo__callback: {
clientKey: string;
};
ack: {
pid: string;
};
};
};

describe('Unrpc', () => {
let childPort: Unport<Definition, 'child'>;
let parentPort: Unport<Definition, 'parent'>;
let child: Unrpc<Definition, 'child'>;
let parent: Unrpc<Definition, 'parent'>;

beforeEach(() => {
const messageChannel = new MessageChannel();
if (childPort) childPort.destroy();
childPort = new Unport();
childPort.implementChannel({
send(message) {
messageChannel.port1.postMessage(message);
},
accept(pipe) {
messageChannel.port1.onmessage = (message: MessageEvent<ChannelMessage>) => pipe(message.data);
},
destroy() {
messageChannel.port1.close();
},
});
child = new Unrpc(childPort);

parentPort = new Unport();
parentPort.implementChannel({
send(message) {
console.log(message);
messageChannel.port2.postMessage(message);
},
accept(pipe) {
messageChannel.port2.onmessage = (message: MessageEvent<ChannelMessage>) => pipe(message.data);
},
destroy() {
messageChannel.port2.close();
},
});

parent = new Unrpc(parentPort);
});

it('implemented method - asynchronous implementation', async () => {
parent.implement('getInfo', async ({ id }) => ({ user: id }));
const response = child.call('getInfo', { id: 'name' });
expect(response).resolves.toMatchObject({ user: 'name' });
});

it('implemented method - synchronous implementation', async () => {
parent.implement('getInfo', ({ id }) => ({ user: id }));
const response = child.call('getInfo', { id: 'name' });
expect(response).resolves.toMatchObject({ user: 'name' });
});

it('Error: UnrpcNotImplementationError', async () => {
expect(child.call('getInfo', { id: 'name' })).rejects.toMatchObject(
new UnrpcNotImplementationError('Method getInfo is not implemented'),
);
});

it('Error: UnrpcExecutionErrorError - script error - asynchronous implementation', async () => {
parent.implement('getInfo', async () => {
// @ts-expect-error mock execution error here.
const result = foo;
return result;
});
expect(child.call('getInfo', { id: 'name' })).rejects.toMatchObject(
new UnrpcExecutionErrorError('foo is not defined'),
);
});

it('Error: UnrpcExecutionErrorError - script error - synchronous implementation', async () => {
parent.implement('getInfo', () => {
// @ts-expect-error mock execution error here.
const result = foo;
return result;
});
expect(child.call('getInfo', { id: 'name' })).rejects.toMatchObject(
new UnrpcExecutionErrorError('foo is not defined'),
);
});

it('Error: UnrpcExecutionErrorError - user throws error', async () => {
parent.implement('getInfo', () => {
throw new Error('mock error');
});
expect(child.call('getInfo', { id: 'name' })).rejects.toMatchObject(
new UnrpcExecutionErrorError('mock error'),
);
});

it('complicated case', async () => {
parent.implement('getInfo', async ({ id }) => ({ user: id }));
child.implement('getChildInfo', async ({ name }) => ({ clientKey: name }));

let finishHandshake: (value?: unknown) => void;
const handshakePromise = new Promise(resolve => {
finishHandshake = resolve;
});

/**
* Simulates a handshake
*/
parent.port.postMessage('syn', { pid: 'parent' });
parent.port.onMessage('ack', async payload => {
expect(payload.pid).toBe('child');
finishHandshake();
});
child.port.onMessage('syn', async payload => {
expect(payload.pid).toBe('parent');
child.port.postMessage('ack', { pid: 'child' });
});

/**
* Wait handshake finished
*/
await handshakePromise;

const [response1, response2] = await Promise.all([
child.call('getInfo', { id: 'child' }),
parent.call('getChildInfo', { name: 'parent' }),
]);
expect(response1).toMatchObject({ user: 'child' });
expect(response2).toMatchObject({ clientKey: 'parent' });
});
});
29 changes: 29 additions & 0 deletions examples/child-process-rpc/child.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Unport, Unrpc, ChannelMessage } from '../../lib';
import { ChildPort } from './port';

// 1. Initialize a port
const childPort: ChildPort = new Unport();

// 2. Implement a Channel based on underlying IPC capabilities
childPort.implementChannel({
send(message) {
process.send && process.send(message);
},
accept(pipe) {
process.on('message', (message: ChannelMessage) => {
pipe(message);
});
},
});

// 3. Initialize a rpc client
const childRpcClient = new Unrpc(childPort);
childRpcClient.implement('getChildInfo', request => ({
clientKey: `[child] ${request.name}`,
}));
childRpcClient.port.onMessage('syn', async payload => {
console.log('[child] [event] [syn] [result]', payload);
const response = await childRpcClient.call('getInfo', { id: '<child>' });
console.log('[child] [rpc] [getInfo] [response]', response);
childPort.postMessage('ack', { pid: 'child' });
});
43 changes: 43 additions & 0 deletions examples/child-process-rpc/parent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { join } from 'path';
import { fork } from 'child_process';
import { Unport, Unrpc, ChannelMessage } from '../../lib';
import { ParentPort } from './port';

// 1. Initialize a port
const parentPort: ParentPort = new Unport();

// 2. Implement a Channel based on underlying IPC capabilities
const childProcess = fork(join(__dirname, './child.js'));
parentPort.implementChannel({
send(message) {
childProcess.send(message);
},
accept(pipe) {
childProcess.on('message', (message: ChannelMessage) => {
pipe(message);
});
},
destroy() {
childProcess.removeAllListeners('message');
childProcess.kill();
},
});

// 3. Initialize a rpc client from port.
const parentRpcClient = new Unrpc(parentPort);

parentRpcClient.implement('getInfo', request => ({
user: `parent (${request.id})`,
}));
parentRpcClient.port.postMessage('syn', { pid: 'parent' });
parentRpcClient.port.onMessage('ack', async payload => {
console.log('[parent] [event] [ack] [result]', payload);
const response = await parentRpcClient.call('getChildInfo', {
name: 'parent',
});
console.log('[parent] [rpc] [getChildInfo] [response]', response);
setTimeout(() => {
console.log('destroy');
parentPort.destroy();
}, 1000);
});
Loading