Skip to content

Commit

Permalink
feat(gatsby-worker): add messaging api (gatsbyjs#32159)
Browse files Browse the repository at this point in the history
  • Loading branch information
pieh authored Jul 2, 2021
1 parent dfef2fb commit 5a93e74
Show file tree
Hide file tree
Showing 8 changed files with 373 additions and 31 deletions.
1 change: 1 addition & 0 deletions .eslintignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ packages/gatsby-plugin-preact/fast-refresh
packages/gatsby-source-wordpress/test-site/**
!packages/gatsby-source-wordpress/test-site/__tests__
!packages/gatsby-source-wordpress/test-site/test-utils
!packages/gatsby-worker/src/__tests__/fixtures/**
2 changes: 2 additions & 0 deletions .prettierignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,7 @@ packages/gatsby-source-wordpress/test-site/**
**/__testfixtures__/**
**/__tests__/fixtures/**

!packages/gatsby-worker/src/__tests__/fixtures/**

# coverage
coverage
128 changes: 127 additions & 1 deletion packages/gatsby-worker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export async function setupStep(param: string): Promise<void> {
}
```

File `parent.ts`
File `parent.ts`:

```ts
import { WorkerPool } from "gatsby-worker"
Expand Down Expand Up @@ -99,6 +99,132 @@ if (isWorker) {
}
```

### Messaging

`gatsby-worker` allows sending messages from worker to main and from main to worker at any time.

#### Sending messages from worker

File `message-types.ts`:

```ts
// `gatsby-worker` supports message types. Creating common module that centralize possible messages
// that is shared by worker and parent will ensure messages type safety.
interface IPingMessage {
type: `PING`
}

interface IAnotherMessageFromChild {
type: `OTHER_MESSAGE_FROM_CHILD`
payload: {
foo: string
}
}

export type MessagesFromChild = IPingMessage | IAnotherMessageFromChild

interface IPongMessage {
type: `PONG`
}

interface IAnotherMessageFromParent {
type: `OTHER_MESSAGE_FROM_PARENT`
payload: {
foo: string
}
}

export type MessagesFromParent = IPongMessage | IAnotherMessageFromParent
```
File `worker.ts`:
```ts
import { getMessenger } from "gatsby-worker"

import { MessagesFromParent, MessagesFromChild } from "./message-types"

const messenger = getMessenger<MessagesFromParent, MessagesFromChild>()
// messenger might be `undefined` if `getMessenger`
// is called NOT in worker context
if (messenger) {
// send a message to a parent
messenger.send({ type: `PING` })
messenger.send({
type: `OTHER_MESSAGE_FROM_CHILD`,
payload: {
foo: `bar`,
},
})

// following would cause type error as message like that is
// not part of MessagesFromChild type union
// messenger.send({ type: `NOT_PART_OF_TYPES` })

// start listening to messages from parent
messenger.onMessage(msg => {
switch (msg.type) {
case `PONG`: {
// handle PONG message
break
}
case `OTHER_MESSAGE_FROM_PARENT`: {
// msg.payload.foo will be typed as `string` here
// handle
break
}

// following would cause type error as there is no msg with
// given type as part of MessagesFromParent type union
// case `NOT_PART_OF_TYPES`: {}
}
})
}
```

File `parent.ts`:

```ts
import { getMessenger } from "gatsby-worker"

import { MessagesFromParent, MessagesFromChild } from "./message-types"

const workerPool = new WorkerPool<
typeof import("./worker"),
MessagesFromParent,
MessagesFromChild
>(
workerPath: require.resolve(`./worker`)
)

// `sendMessage` on WorkerPool instance requires second parameter
// `workerId` to specify to which worker to send message to
// (`workerId` starts at 1 for first worker).
workerPool.sendMessage(
{
type: `OTHER_MESSAGE_FROM_PARENT`,
payload: {
foo: `baz`
}
},
1
)

// start listening to messages from child
// `onMessage` callback will be called with message sent from worker
// and `workerId` (to identify which worker send this message)
workerPool.onMessage((msg: MessagesFromChild, workerId: number): void => {
switch(msg.type) {
case: `PING`: {
// send message back making sure we send it back to same worker
// that sent `PING` message
workerPool.sendMessage({ type: `PONG` }, workerId)
break
}
}
})
```

## Usage with unit tests

If you are working with source files that need transpilation, you will need to make it possible to load untranspiled modules in child processes.
Expand Down
121 changes: 107 additions & 14 deletions packages/gatsby-worker/src/__tests__/fixtures/test-child.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
import { getMessenger } from "../../"

export function sync(a: string, opts?: { addWorkerId?: boolean }): string {
return `foo ${a}${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}`
return `foo ${a}${
opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``
}`
}

export async function async(a: string, opts?: { addWorkerId?: boolean }): Promise<string> {
return `foo ${a}${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}`
export async function async(
a: string,
opts?: { addWorkerId?: boolean }
): Promise<string> {
return `foo ${a}${
opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``
}`
}

export function neverEnding(): Promise<string> {
Expand All @@ -12,23 +21,107 @@ export function neverEnding(): Promise<string> {

export const notAFunction = `string`

export function syncThrow(a: string, opts?: { addWorkerId?: boolean, throwOnWorker?: number }): string {
if (!opts?.throwOnWorker || opts?.throwOnWorker?.toString() === process.env.GATSBY_WORKER_ID) {
throw new Error(`sync throw${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}`)
export function syncThrow(
a: string,
opts?: { addWorkerId?: boolean; throwOnWorker?: number }
): string {
if (
!opts?.throwOnWorker ||
opts?.throwOnWorker?.toString() === process.env.GATSBY_WORKER_ID
) {
throw new Error(
`sync throw${
opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``
}`
)
}

return `foo ${a}${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}`
return `foo ${a}${
opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``
}`
}

export async function asyncThrow(a: string, opts?: { addWorkerId?: boolean, throwOnWorker?: number }): Promise<string> {
if (!opts?.throwOnWorker || opts?.throwOnWorker?.toString() === process.env.GATSBY_WORKER_ID) {
throw new Error(`async throw${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}`)
export async function asyncThrow(
a: string,
opts?: { addWorkerId?: boolean; throwOnWorker?: number }
): Promise<string> {
if (
!opts?.throwOnWorker ||
opts?.throwOnWorker?.toString() === process.env.GATSBY_WORKER_ID
) {
throw new Error(
`async throw${
opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``
}`
)
}

return `foo ${a}${opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``}`
return `foo ${a}${
opts?.addWorkerId ? ` (worker #${process.env.GATSBY_WORKER_ID})` : ``
}`
}

// used in task queue as previous functions would be too often too fast
export async function async100ms(taskId: number, opts?: { addWorkerId?: boolean }): Promise<{taskId: number, workerId: string}> {
return new Promise(resolve => setTimeout(resolve, 100, {taskId, workerId: opts?.addWorkerId ? process.env.GATSBY_WORKER_ID : undefined}))
}
export async function async100ms(
taskId: number,
opts?: { addWorkerId?: boolean }
): Promise<{ taskId: number; workerId: string }> {
return new Promise(resolve =>
setTimeout(resolve, 100, {
taskId,
workerId: opts?.addWorkerId ? process.env.GATSBY_WORKER_ID : undefined,
})
)
}

interface IPingMessage {
type: `PING`
}

export type MessagesFromChild = IPingMessage

interface IPongMessage {
type: `PONG`
}

export type MessagesFromParent = IPongMessage

let setupPingPongMessages = function (): Promise<void> {
throw new Error(`gatsby-worker messenger not available`)
}
let getWasPonged = function (): boolean {
throw new Error(`gatsby-worker messenger not available`)
}

const messenger = getMessenger<MessagesFromParent, MessagesFromChild>()
if (messenger) {
let wasPonged = false
setupPingPongMessages = function (): Promise<void> {
if (messenger.messagingVersion === 1) {
const pongPromise = new Promise<void>(resolve => {
messenger.onMessage(msg => {
if (msg.type === `PONG`) {
wasPonged = true
resolve()
}
})
})

messenger.sendMessage({ type: `PING` })

return pongPromise
}

return Promise.reject(
new Error(
`Not supported messaging version: "${messenger.messagingVersion}"`
)
)
}

getWasPonged = function getWasPonged(): boolean {
return wasPonged
}
}

export { setupPingPongMessages, getWasPonged }
74 changes: 62 additions & 12 deletions packages/gatsby-worker/src/__tests__/integration.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import "jest-extended"
import { WorkerPool } from "../"
import { isPromise } from "../utils"
import { MessagesFromChild, MessagesFromParent } from "./fixtures/test-child"

describe(`gatsby-worker`, () => {
let workerPool: WorkerPool<typeof import("./fixtures/test-child")> | undefined
let workerPool:
| WorkerPool<
typeof import("./fixtures/test-child"),
MessagesFromParent,
MessagesFromChild
>
| undefined
const numWorkers = 2

async function endWorkerPool(): Promise<void> {
Expand All @@ -14,17 +21,12 @@ describe(`gatsby-worker`, () => {
}

beforeEach(() => {
workerPool = new WorkerPool<typeof import("./fixtures/test-child")>(
require.resolve(`./fixtures/test-child`),
{
numWorkers,
env: {
NODE_OPTIONS: `--require ${require.resolve(
`./fixtures/ts-register`
)}`,
},
}
)
workerPool = new WorkerPool(require.resolve(`./fixtures/test-child`), {
numWorkers,
env: {
NODE_OPTIONS: `--require ${require.resolve(`./fixtures/ts-register`)}`,
},
})
})

afterEach(endWorkerPool)
Expand All @@ -46,6 +48,8 @@ describe(`gatsby-worker`, () => {
"syncThrow",
"asyncThrow",
"async100ms",
"setupPingPongMessages",
"getWasPonged",
]
`)
// .all and .single should have same methods
Expand Down Expand Up @@ -282,4 +286,50 @@ describe(`gatsby-worker`, () => {
)
})
})

describe(`messaging`, () => {
it(`worker can receive and send messages`, async () => {
if (!workerPool) {
fail(`worker pool not created`)
}

workerPool.onMessage((msg, workerId) => {
if (msg.type === `PING`) {
if (!workerPool) {
fail(`worker pool not created`)
}
workerPool.sendMessage({ type: `PONG` }, workerId)
}
})

// baseline - workers shouldn't be PONGed yet
expect(await Promise.all(workerPool.all.getWasPonged()))
.toMatchInlineSnapshot(`
Array [
false,
false,
]
`)

await Promise.all(workerPool.all.setupPingPongMessages())

expect(await Promise.all(workerPool.all.getWasPonged()))
.toMatchInlineSnapshot(`
Array [
true,
true,
]
`)
})

it(`sending message to worker that doesn't exist throws error`, async () => {
expect(() => {
if (!workerPool) {
fail(`worker pool not created`)
}

workerPool.sendMessage({ type: `PONG` }, 9001)
}).toThrowError(`There is no worker with "9001" id.`)
})
})
})
Loading

0 comments on commit 5a93e74

Please sign in to comment.