From 5a93e7485b2718b44a59e595c5b1e896fe9802cb Mon Sep 17 00:00:00 2001 From: Michal Piechowiak Date: Fri, 2 Jul 2021 08:24:07 +0200 Subject: [PATCH] feat(gatsby-worker): add messaging api (#32159) --- .eslintignore | 1 + .prettierignore | 2 + packages/gatsby-worker/README.md | 128 +++++++++++++++++- .../src/__tests__/fixtures/test-child.ts | 121 +++++++++++++++-- .../src/__tests__/integration.ts | 74 ++++++++-- packages/gatsby-worker/src/child.ts | 41 +++++- packages/gatsby-worker/src/index.ts | 30 +++- packages/gatsby-worker/src/types.ts | 7 +- 8 files changed, 373 insertions(+), 31 deletions(-) diff --git a/.eslintignore b/.eslintignore index f46bbe00027a5..0cf3940554934 100644 --- a/.eslintignore +++ b/.eslintignore @@ -35,3 +35,4 @@ packages/gatsby-plugin-preact/fast-refresh packages/gatsby-source-wordpress/test-site/** !packages/gatsby-source-wordpress/test-site/__tests__ !packages/gatsby-source-wordpress/test-site/test-utils +!packages/gatsby-worker/src/__tests__/fixtures/** diff --git a/.prettierignore b/.prettierignore index 74c609d5c08fe..b3de8fe7d9187 100644 --- a/.prettierignore +++ b/.prettierignore @@ -29,5 +29,7 @@ packages/gatsby-source-wordpress/test-site/** **/__testfixtures__/** **/__tests__/fixtures/** +!packages/gatsby-worker/src/__tests__/fixtures/** + # coverage coverage diff --git a/packages/gatsby-worker/README.md b/packages/gatsby-worker/README.md index aa8bb57c1b44f..0298beae27fc7 100644 --- a/packages/gatsby-worker/README.md +++ b/packages/gatsby-worker/README.md @@ -17,7 +17,7 @@ export async function setupStep(param: string): Promise { } ``` -File `parent.ts` +File `parent.ts`: ```ts import { WorkerPool } from "gatsby-worker" @@ -99,6 +99,132 @@ if (isWorker) { } ``` +### Messaging + +`gatsby-worker` allows sending messages from worker to main and from main to worker at any time. + +#### Sending messages from worker + +File `message-types.ts`: + +```ts +// `gatsby-worker` supports message types. Creating common module that centralize possible messages +// that is shared by worker and parent will ensure messages type safety. +interface IPingMessage { + type: `PING` +} + +interface IAnotherMessageFromChild { + type: `OTHER_MESSAGE_FROM_CHILD` + payload: { + foo: string + } +} + +export type MessagesFromChild = IPingMessage | IAnotherMessageFromChild + +interface IPongMessage { + type: `PONG` +} + +interface IAnotherMessageFromParent { + type: `OTHER_MESSAGE_FROM_PARENT` + payload: { + foo: string + } +} + +export type MessagesFromParent = IPongMessage | IAnotherMessageFromParent +``` + +File `worker.ts`: + +```ts +import { getMessenger } from "gatsby-worker" + +import { MessagesFromParent, MessagesFromChild } from "./message-types" + +const messenger = getMessenger() +// messenger might be `undefined` if `getMessenger` +// is called NOT in worker context +if (messenger) { + // send a message to a parent + messenger.send({ type: `PING` }) + messenger.send({ + type: `OTHER_MESSAGE_FROM_CHILD`, + payload: { + foo: `bar`, + }, + }) + + // following would cause type error as message like that is + // not part of MessagesFromChild type union + // messenger.send({ type: `NOT_PART_OF_TYPES` }) + + // start listening to messages from parent + messenger.onMessage(msg => { + switch (msg.type) { + case `PONG`: { + // handle PONG message + break + } + case `OTHER_MESSAGE_FROM_PARENT`: { + // msg.payload.foo will be typed as `string` here + // handle + break + } + + // following would cause type error as there is no msg with + // given type as part of MessagesFromParent type union + // case `NOT_PART_OF_TYPES`: {} + } + }) +} +``` + +File `parent.ts`: + +```ts +import { getMessenger } from "gatsby-worker" + +import { MessagesFromParent, MessagesFromChild } from "./message-types" + +const workerPool = new WorkerPool< + typeof import("./worker"), + MessagesFromParent, + MessagesFromChild +>( + workerPath: require.resolve(`./worker`) +) + +// `sendMessage` on WorkerPool instance requires second parameter +// `workerId` to specify to which worker to send message to +// (`workerId` starts at 1 for first worker). +workerPool.sendMessage( + { + type: `OTHER_MESSAGE_FROM_PARENT`, + payload: { + foo: `baz` + } + }, + 1 +) + +// start listening to messages from child +// `onMessage` callback will be called with message sent from worker +// and `workerId` (to identify which worker send this message) +workerPool.onMessage((msg: MessagesFromChild, workerId: number): void => { + switch(msg.type) { + case: `PING`: { + // send message back making sure we send it back to same worker + // that sent `PING` message + workerPool.sendMessage({ type: `PONG` }, workerId) + break + } + } +}) +``` + ## Usage with unit tests If you are working with source files that need transpilation, you will need to make it possible to load untranspiled modules in child processes. diff --git a/packages/gatsby-worker/src/__tests__/fixtures/test-child.ts b/packages/gatsby-worker/src/__tests__/fixtures/test-child.ts index 0b3c5219b04af..dfaa806254bc2 100644 --- a/packages/gatsby-worker/src/__tests__/fixtures/test-child.ts +++ b/packages/gatsby-worker/src/__tests__/fixtures/test-child.ts @@ -1,9 +1,18 @@ +import { getMessenger } from "../../" + export function sync(a: string, opts?: { addWorkerId?: boolean }): string { - return `foo ${a}${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}` + return `foo ${a}${ + opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : `` + }` } -export async function async(a: string, opts?: { addWorkerId?: boolean }): Promise { - return `foo ${a}${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}` +export async function async( + a: string, + opts?: { addWorkerId?: boolean } +): Promise { + return `foo ${a}${ + opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : `` + }` } export function neverEnding(): Promise { @@ -12,23 +21,107 @@ export function neverEnding(): Promise { export const notAFunction = `string` -export function syncThrow(a: string, opts?: { addWorkerId?: boolean, throwOnWorker?: number }): string { - if (!opts?.throwOnWorker || opts?.throwOnWorker?.toString() === process.env.GATSBY_WORKER_ID) { - throw new Error(`sync throw${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}`) +export function syncThrow( + a: string, + opts?: { addWorkerId?: boolean; throwOnWorker?: number } +): string { + if ( + !opts?.throwOnWorker || + opts?.throwOnWorker?.toString() === process.env.GATSBY_WORKER_ID + ) { + throw new Error( + `sync throw${ + opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : `` + }` + ) } - return `foo ${a}${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}` + return `foo ${a}${ + opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : `` + }` } -export async function asyncThrow(a: string, opts?: { addWorkerId?: boolean, throwOnWorker?: number }): Promise { - if (!opts?.throwOnWorker || opts?.throwOnWorker?.toString() === process.env.GATSBY_WORKER_ID) { - throw new Error(`async throw${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}`) +export async function asyncThrow( + a: string, + opts?: { addWorkerId?: boolean; throwOnWorker?: number } +): Promise { + if ( + !opts?.throwOnWorker || + opts?.throwOnWorker?.toString() === process.env.GATSBY_WORKER_ID + ) { + throw new Error( + `async throw${ + opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : `` + }` + ) } - return `foo ${a}${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}` + return `foo ${a}${ + opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : `` + }` } // used in task queue as previous functions would be too often too fast -export async function async100ms(taskId: number, opts?: { addWorkerId?: boolean }): Promise<{taskId: number, workerId: string}> { - return new Promise(resolve => setTimeout(resolve, 100, {taskId, workerId: opts?.addWorkerId ? process.env.GATSBY_WORKER_ID : undefined})) -} \ No newline at end of file +export async function async100ms( + taskId: number, + opts?: { addWorkerId?: boolean } +): Promise<{ taskId: number; workerId: string }> { + return new Promise(resolve => + setTimeout(resolve, 100, { + taskId, + workerId: opts?.addWorkerId ? process.env.GATSBY_WORKER_ID : undefined, + }) + ) +} + +interface IPingMessage { + type: `PING` +} + +export type MessagesFromChild = IPingMessage + +interface IPongMessage { + type: `PONG` +} + +export type MessagesFromParent = IPongMessage + +let setupPingPongMessages = function (): Promise { + throw new Error(`gatsby-worker messenger not available`) +} +let getWasPonged = function (): boolean { + throw new Error(`gatsby-worker messenger not available`) +} + +const messenger = getMessenger() +if (messenger) { + let wasPonged = false + setupPingPongMessages = function (): Promise { + if (messenger.messagingVersion === 1) { + const pongPromise = new Promise(resolve => { + messenger.onMessage(msg => { + if (msg.type === `PONG`) { + wasPonged = true + resolve() + } + }) + }) + + messenger.sendMessage({ type: `PING` }) + + return pongPromise + } + + return Promise.reject( + new Error( + `Not supported messaging version: "${messenger.messagingVersion}"` + ) + ) + } + + getWasPonged = function getWasPonged(): boolean { + return wasPonged + } +} + +export { setupPingPongMessages, getWasPonged } diff --git a/packages/gatsby-worker/src/__tests__/integration.ts b/packages/gatsby-worker/src/__tests__/integration.ts index 4cdf990f96cbf..92e1817773ab3 100644 --- a/packages/gatsby-worker/src/__tests__/integration.ts +++ b/packages/gatsby-worker/src/__tests__/integration.ts @@ -1,9 +1,16 @@ import "jest-extended" import { WorkerPool } from "../" import { isPromise } from "../utils" +import { MessagesFromChild, MessagesFromParent } from "./fixtures/test-child" describe(`gatsby-worker`, () => { - let workerPool: WorkerPool | undefined + let workerPool: + | WorkerPool< + typeof import("./fixtures/test-child"), + MessagesFromParent, + MessagesFromChild + > + | undefined const numWorkers = 2 async function endWorkerPool(): Promise { @@ -14,17 +21,12 @@ describe(`gatsby-worker`, () => { } beforeEach(() => { - workerPool = new WorkerPool( - require.resolve(`./fixtures/test-child`), - { - numWorkers, - env: { - NODE_OPTIONS: `--require ${require.resolve( - `./fixtures/ts-register` - )}`, - }, - } - ) + workerPool = new WorkerPool(require.resolve(`./fixtures/test-child`), { + numWorkers, + env: { + NODE_OPTIONS: `--require ${require.resolve(`./fixtures/ts-register`)}`, + }, + }) }) afterEach(endWorkerPool) @@ -46,6 +48,8 @@ describe(`gatsby-worker`, () => { "syncThrow", "asyncThrow", "async100ms", + "setupPingPongMessages", + "getWasPonged", ] `) // .all and .single should have same methods @@ -282,4 +286,50 @@ describe(`gatsby-worker`, () => { ) }) }) + + describe(`messaging`, () => { + it(`worker can receive and send messages`, async () => { + if (!workerPool) { + fail(`worker pool not created`) + } + + workerPool.onMessage((msg, workerId) => { + if (msg.type === `PING`) { + if (!workerPool) { + fail(`worker pool not created`) + } + workerPool.sendMessage({ type: `PONG` }, workerId) + } + }) + + // baseline - workers shouldn't be PONGed yet + expect(await Promise.all(workerPool.all.getWasPonged())) + .toMatchInlineSnapshot(` + Array [ + false, + false, + ] + `) + + await Promise.all(workerPool.all.setupPingPongMessages()) + + expect(await Promise.all(workerPool.all.getWasPonged())) + .toMatchInlineSnapshot(` + Array [ + true, + true, + ] + `) + }) + + it(`sending message to worker that doesn't exist throws error`, async () => { + expect(() => { + if (!workerPool) { + fail(`worker pool not created`) + } + + workerPool.sendMessage({ type: `PONG` }, 9001) + }).toThrowError(`There is no worker with "9001" id.`) + }) + }) }) diff --git a/packages/gatsby-worker/src/child.ts b/packages/gatsby-worker/src/child.ts index 3619108caa6b9..9947e2bfc655b 100644 --- a/packages/gatsby-worker/src/child.ts +++ b/packages/gatsby-worker/src/child.ts @@ -5,16 +5,33 @@ import { END, ERROR, RESULT, + CUSTOM_MESSAGE, } from "./types" import { isPromise } from "./utils" +export interface IGatsbyWorkerMessenger< + MessagesFromParent = unknown, + MessagesFromChild = MessagesFromParent +> { + onMessage: (listener: (msg: MessagesFromParent) => void) => void + sendMessage: (msg: MessagesFromChild) => void + messagingVersion: 1 +} + /** * Used to check wether current context is executed in worker process */ let isWorker = false +let getMessenger = function < + MessagesFromParent = unknown, + MessagesFromChild = MessagesFromParent +>(): IGatsbyWorkerMessenger | undefined { + return undefined +} if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) { isWorker = true + const listeners: Array<(msg: any) => void> = [] const ensuredSendToMain = process.send.bind(process) as ( msg: ChildMessageUnion ) => void @@ -40,6 +57,24 @@ if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) { ensuredSendToMain(msg) } + const MESSAGING_VERSION = 1 + + getMessenger = function < + MessagesFromParent = unknown, + MessagesFromChild = MessagesFromParent + >(): IGatsbyWorkerMessenger { + return { + onMessage(listener: (msg: MessagesFromParent) => void): void { + listeners.push(listener) + }, + sendMessage(msg: MessagesFromChild): void { + const poolMsg: ChildMessageUnion = [CUSTOM_MESSAGE, msg] + ensuredSendToMain(poolMsg) + }, + messagingVersion: MESSAGING_VERSION, + } + } + const child = require(process.env.GATSBY_WORKER_MODULE_PATH) function messageHandler(msg: ParentMessageUnion): void { @@ -59,10 +94,14 @@ if (process.send && process.env.GATSBY_WORKER_MODULE_PATH) { } } else if (msg[0] === END) { process.off(`message`, messageHandler) + } else if (msg[0] === CUSTOM_MESSAGE) { + for (const listener of listeners) { + listener(msg[1]) + } } } process.on(`message`, messageHandler) } -export { isWorker } +export { isWorker, getMessenger } diff --git a/packages/gatsby-worker/src/index.ts b/packages/gatsby-worker/src/index.ts index f2bc96b2beef5..cc53a3aa915e7 100644 --- a/packages/gatsby-worker/src/index.ts +++ b/packages/gatsby-worker/src/index.ts @@ -6,6 +6,7 @@ import { END, ERROR, RESULT, + CUSTOM_MESSAGE, ParentMessageUnion, ChildMessageUnion, } from "./types" @@ -93,7 +94,11 @@ interface IWorkerInfo { * Worker pool allows queueing execution of a function on all workers (via `.all` property) * as well as distributing execution across workers (via `.single` property) */ -export class WorkerPool> { +export class WorkerPool< + WorkerModuleExports = Record, + MessagesFromParent = unknown, + MessagesFromChild = MessagesFromParent +> { /** * Schedule task execution on all workers. Useful for setting up workers */ @@ -115,6 +120,9 @@ export class WorkerPool> { private workers: Array> = [] private taskQueue = new TaskQueue>() private idleWorkers: Set> = new Set() + private listeners: Array< + (msg: MessagesFromChild, workerId: number) => void + > = [] constructor(workerPath: string, options?: IWorkerOptions) { const single: Partial["single"]> = {} @@ -226,6 +234,10 @@ export class WorkerPool> { workerInfo.currentTask = undefined this.checkForWork(workerInfo) task.reject(error) + } else if (msg[0] === CUSTOM_MESSAGE) { + for (const listener of this.listeners) { + listener(msg[1] as MessagesFromChild, workerId) + } } }) @@ -342,6 +354,22 @@ export class WorkerPool> { ) ) } + + onMessage( + listener: (msg: MessagesFromChild, workerId: number) => void + ): void { + this.listeners.push(listener) + } + + sendMessage(msg: MessagesFromParent, workerId: number): void { + const worker = this.workers[workerId - 1] + if (!worker) { + throw new Error(`There is no worker with "${workerId}" id.`) + } + + const poolMsg = [CUSTOM_MESSAGE, msg] + worker.worker.send(poolMsg) + } } export * from "./child" diff --git a/packages/gatsby-worker/src/types.ts b/packages/gatsby-worker/src/types.ts index 9e74cc66f5638..5a066c7c3af4c 100644 --- a/packages/gatsby-worker/src/types.ts +++ b/packages/gatsby-worker/src/types.ts @@ -2,6 +2,9 @@ export const EXECUTE = 0b01 export const ERROR = 0b10 export const RESULT = 0b11 export const END = 0b00 +export const CUSTOM_MESSAGE = 0b100 + +type CustomMessage = [typeof CUSTOM_MESSAGE, unknown] type FunctionName = string | number | symbol type FunctionArgs = Array @@ -9,7 +12,7 @@ type FunctionArgs = Array type ExecuteMessage = [typeof EXECUTE, FunctionName, FunctionArgs] type EndMessage = [typeof END] -export type ParentMessageUnion = ExecuteMessage | EndMessage +export type ParentMessageUnion = ExecuteMessage | EndMessage | CustomMessage type ErrorType = string type ErrorMessage = string @@ -27,4 +30,4 @@ type ResultType = unknown type TaskResult = [typeof RESULT, ResultType] -export type ChildMessageUnion = TaskError | TaskResult +export type ChildMessageUnion = TaskError | TaskResult | CustomMessage