Skip to content

Eager Workflow Start #1757

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/client/src/interceptors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ export interface WorkflowStartUpdateInput {
readonly options: WorkflowUpdateOptions;
}

/** Output for WorkflowClientInterceptor.startWithDetails */
export interface WorkflowStartOutput {
readonly runId: string;
readonly eagerlyStarted: boolean;
}

/** Output for WorkflowClientInterceptor.startUpdate */
export interface WorkflowStartUpdateOutput {
readonly updateId: string;
Expand Down Expand Up @@ -120,6 +126,18 @@ export interface WorkflowClientInterceptor {
* {@link signalWithStart} most likely needs to be implemented too
*/
start?: (input: WorkflowStartInput, next: Next<this, 'start'>) => Promise<string /* runId */>;

/**
* Intercept a service call to startWorkflowExecution
*
* Successor to {@link start}. Unlike {@link start}, this method returns
* start details via {@link WorkflowStartOutput}.
*
* If you implement this method,
* {@link signalWithStart} most likely needs to be implemented too
*/
startWithDetails?: (input: WorkflowStartInput, next: Next<this, 'startWithDetails'>) => Promise<WorkflowStartOutput>;

/**
* Intercept a service call to updateWorkflowExecution
*/
Expand Down
6 changes: 6 additions & 0 deletions packages/client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,12 @@ export interface ConnectionLike {
* @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal
*/
withAbortSignal<R>(abortSignal: AbortSignal, fn: () => Promise<R>): Promise<R>;

/**
* Capability flag that determines whether the connection supports eager workflow start.
* This will only be true if the underlying connection is a {@link NativeConnection}.
*/
readonly supportsEagerStart?: boolean;
}

export const QueryRejectCondition = {
Expand Down
92 changes: 78 additions & 14 deletions packages/client/src/workflow-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import {
WorkflowStartUpdateOutput,
WorkflowStartUpdateWithStartInput,
WorkflowStartUpdateWithStartOutput,
WorkflowStartOutput,
} from './interceptors';
import {
CountWorkflowExecution,
Expand Down Expand Up @@ -262,6 +263,15 @@ export interface WorkflowHandleWithFirstExecutionRunId<T extends Workflow = Work
readonly firstExecutionRunId: string;
}

/**
* This interface is exactly the same as {@link WorkflowHandleWithFirstExecutionRunId} except it
* includes the `eagerlyStarted` returned from {@link WorkflowClient.start}.
*/
export interface WorkflowHandleWithStartDetails<T extends Workflow = Workflow>
extends WorkflowHandleWithFirstExecutionRunId<T> {
readonly eagerlyStarted: boolean;
}

/**
* This interface is exactly the same as {@link WorkflowHandle} except it
* includes the `signaledRunId` returned from `signalWithStart`.
Expand Down Expand Up @@ -513,14 +523,53 @@ export class WorkflowClient extends BaseClient {
workflowTypeOrFunc: string | T,
options: WorkflowStartOptions<T>,
interceptors: WorkflowClientInterceptor[]
): Promise<string> {
): Promise<WorkflowStartOutput> {
const workflowType = extractWorkflowType(workflowTypeOrFunc);
assertRequiredWorkflowOptions(options);
const compiledOptions = compileWorkflowOptions(ensureArgs(options));

const start = composeInterceptors(interceptors, 'start', this._startWorkflowHandler.bind(this));
// Adapt legacy `start` interceptors to the new `startWithDetails` interface.
const adaptedInterceptors: WorkflowClientInterceptor[] = interceptors.map((i) => {
// If it already has the new method, or doesn't have the legacy one, no adaptation is needed.
if (i.startWithDetails || !i.start) {
return i;
}

// This interceptor has a legacy `start` but not `startWithDetails`. We'll adapt it.
return {
...i,
startWithDetails: async (input, next): Promise<WorkflowStartOutput> => {
let downstreamOut: WorkflowStartOutput | undefined;

// Patched `next` for legacy `start` interceptors.
// Captures the full `WorkflowStartOutput` while returning `runId` as a string.
const patchedNext = async (patchedInput: WorkflowStartInput): Promise<string> => {
downstreamOut = await next(patchedInput);
return downstreamOut.runId;
};

const runIdFromLegacyInterceptor = await i.start!(input, patchedNext);

// If the interceptor short-circuited (didn't call `next`), `downstreamOut` will be undefined.
// In that case, we can't have an eager start.
if (downstreamOut === undefined) {
return { runId: runIdFromLegacyInterceptor, eagerlyStarted: false };
}

// If `next` was called, honor the `runId` from the legacy interceptor but preserve
// the `eagerlyStarted` status from the actual downstream call.
return { ...downstreamOut, runId: runIdFromLegacyInterceptor };
},
};
});

const startWithDetails = composeInterceptors(
adaptedInterceptors,
'startWithDetails',
this._startWorkflowHandler.bind(this)
);

return start({
return startWithDetails({
options: compiledOptions,
headers: {},
workflowType,
Expand All @@ -536,7 +585,6 @@ export class WorkflowClient extends BaseClient {
const { signal, signalArgs, ...rest } = options;
assertRequiredWorkflowOptions(rest);
const compiledOptions = compileWorkflowOptions(ensureArgs(rest));

const signalWithStart = composeInterceptors(
interceptors,
'signalWithStart',
Expand All @@ -560,22 +608,25 @@ export class WorkflowClient extends BaseClient {
public async start<T extends Workflow>(
workflowTypeOrFunc: string | T,
options: WorkflowStartOptions<T>
): Promise<WorkflowHandleWithFirstExecutionRunId<T>> {
): Promise<WorkflowHandleWithStartDetails<T>> {
const { workflowId } = options;
const interceptors = this.getOrMakeInterceptors(workflowId);
const runId = await this._start(workflowTypeOrFunc, { ...options, workflowId }, interceptors);
const wfStartOutput = await this._start(workflowTypeOrFunc, { ...options, workflowId }, interceptors);
// runId is not used in handles created with `start*` calls because these
// handles should allow interacting with the workflow if it continues as new.
const handle = this._createWorkflowHandle({
const baseHandle = this._createWorkflowHandle({
workflowId,
runId: undefined,
firstExecutionRunId: runId,
runIdForResult: runId,
firstExecutionRunId: wfStartOutput.runId,
runIdForResult: wfStartOutput.runId,
interceptors,
followRuns: options.followRuns ?? true,
}) as WorkflowHandleWithFirstExecutionRunId<T>; // Cast is safe because we know we add the firstExecutionRunId below
(handle as any) /* readonly */.firstExecutionRunId = runId;
return handle;
});
return {
...baseHandle,
firstExecutionRunId: wfStartOutput.runId,
eagerlyStarted: wfStartOutput.eagerlyStarted,
};
}

/**
Expand Down Expand Up @@ -1245,11 +1296,15 @@ export class WorkflowClient extends BaseClient {
*
* Used as the final function of the start interceptor chain
*/
protected async _startWorkflowHandler(input: WorkflowStartInput): Promise<string> {
protected async _startWorkflowHandler(input: WorkflowStartInput): Promise<WorkflowStartOutput> {
const req = await this.createStartWorkflowRequest(input);
const { options: opts, workflowType } = input;
try {
return (await this.workflowService.startWorkflowExecution(req)).runId;
const resp = await this.workflowService.startWorkflowExecution(req);
return {
runId: resp.runId,
eagerlyStarted: resp.eagerWorkflowTask != null,
};
} catch (err: any) {
if (err.code === grpcStatus.ALREADY_EXISTS) {
throw new WorkflowExecutionAlreadyStartedError(
Expand All @@ -1265,6 +1320,14 @@ export class WorkflowClient extends BaseClient {
protected async createStartWorkflowRequest(input: WorkflowStartInput): Promise<StartWorkflowExecutionRequest> {
const { options: opts, workflowType, headers } = input;
const { identity, namespace } = this.options;

if (opts.requestEagerStart && !('supportsEagerStart' in this.connection && this.connection.supportsEagerStart)) {
throw new Error(
'Eager workflow start requires a NativeConnection shared between client and worker. ' +
'Pass a NativeConnection via ClientOptions.connection, or disable requestEagerStart.'
);
}

return {
namespace,
identity,
Expand Down Expand Up @@ -1295,6 +1358,7 @@ export class WorkflowClient extends BaseClient {
userMetadata: await encodeUserMetadata(this.dataConverter, opts.staticSummary, opts.staticDetails),
priority: opts.priority ? compilePriority(opts.priority) : undefined,
versioningOverride: opts.versioningOverride ?? undefined,
requestEagerExecution: opts.requestEagerStart,
};
}

Expand Down
12 changes: 10 additions & 2 deletions packages/client/src/workflow-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ export interface WorkflowOptions extends CommonWorkflowOptions {
* @experimental Deployment based versioning is experimental and may change in the future.
*/
versioningOverride?: VersioningOverride;

/**
* Potentially reduce the latency to start this workflow by encouraging the server to
* start it on a local worker running with this same client.
*/
requestEagerStart?: boolean;
}

export type WithCompiledWorkflowOptions<T extends WorkflowOptions> = Replace<
Expand Down Expand Up @@ -97,7 +103,8 @@ export type WorkflowSignalWithStartOptions<SignalArgs extends any[] = []> = Sign
? WorkflowSignalWithStartOptionsWithArgs<SignalArgs>
: WorkflowSignalWithStartOptionsWithoutArgs<SignalArgs>;

export interface WorkflowSignalWithStartOptionsWithoutArgs<SignalArgs extends any[]> extends WorkflowOptions {
export interface WorkflowSignalWithStartOptionsWithoutArgs<SignalArgs extends any[]>
extends Omit<WorkflowOptions, 'requestEagerStart'> {
/**
* SignalDefinition or name of signal
*/
Expand All @@ -109,7 +116,8 @@ export interface WorkflowSignalWithStartOptionsWithoutArgs<SignalArgs extends an
signalArgs?: SignalArgs;
}

export interface WorkflowSignalWithStartOptionsWithArgs<SignalArgs extends any[]> extends WorkflowOptions {
export interface WorkflowSignalWithStartOptionsWithArgs<SignalArgs extends any[]>
extends Omit<WorkflowOptions, 'requestEagerStart'> {
/**
* SignalDefinition or name of signal
*/
Expand Down
48 changes: 47 additions & 1 deletion packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { randomUUID } from 'crypto';
import asyncRetry from 'async-retry';
import { ExecutionContext } from 'ava';
import { firstValueFrom, Subject } from 'rxjs';
import { WorkflowFailedError, WorkflowHandle } from '@temporalio/client';
import { Client, Connection, WorkflowClient, WorkflowFailedError, WorkflowHandle } from '@temporalio/client';
import * as activity from '@temporalio/activity';
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
import { TestWorkflowEnvironment } from '@temporalio/testing';
Expand Down Expand Up @@ -1731,3 +1731,49 @@ test('Default handlers fail given reserved prefix', async (t) => {
await handle.terminate();
});
});

export async function helloWorkflow(name: string): Promise<string> {
return `Hello, ${name}!`;
}

test('Workflow can be started eagerly with shared NativeConnection', async (t) => {
const { createWorker, taskQueue } = helpers(t);
const client = new Client({
connection: t.context.env.nativeConnection,
namespace: t.context.env.client.options.namespace,
});

const worker = await createWorker();
await worker.runUntil(async () => {
const handle = await client.workflow.start(helloWorkflow, {
args: ['Temporal'],
workflowId: `eager-workflow-${randomUUID()}`,
taskQueue,
requestEagerStart: true,
workflowTaskTimeout: '1h', // hang if retry needed
});

t.true(handle.eagerlyStarted);

const result = await handle.result();
t.is(result, 'Hello, Temporal!');
});
});

test('Error thrown when requestEagerStart is used with regular Connection', async (t) => {
const { taskQueue } = helpers(t);

const client = new WorkflowClient({ connection: t.context.env.connection });

await t.throwsAsync(
client.start(helloWorkflow, {
args: ['Temporal'],
workflowId: `eager-workflow-error-${randomUUID()}`,
taskQueue,
requestEagerStart: true,
}),
{
message: /Eager workflow start requires a NativeConnection/,
}
);
});
1 change: 1 addition & 0 deletions packages/worker/src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { Runtime } from './runtime';
* This class can be used to power `@temporalio/client`'s Client objects.
*/
export class NativeConnection implements ConnectionLike {
public readonly supportsEagerStart = true;
/**
* referenceHolders is used internally by the framework, it can be accessed with `extractReferenceHolders` (below)
*/
Expand Down
Loading