Skip to content

Activity pause/unpause #1729

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Aug 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions packages/activity/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
*
* 1. `await` on {@link Context.cancelled | `Context.current().cancelled`} or
* {@link Context.sleep | `Context.current().sleep()`}, which each throw a {@link CancelledFailure}.
* 1. Pass the context's {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} at
* 2. Pass the context's {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} at
* {@link Context.cancellationSignal | `Context.current().cancellationSignal`} to a library that supports it.
*
* ### Examples
Expand All @@ -70,9 +70,18 @@
*/

import { AsyncLocalStorage } from 'node:async_hooks';
import { Logger, Duration, LogLevel, LogMetadata, MetricMeter, Priority } from '@temporalio/common';
import {
Logger,
Duration,
LogLevel,
LogMetadata,
MetricMeter,
Priority,
ActivityCancellationDetails,
} from '@temporalio/common';
import { msToNumber } from '@temporalio/common/lib/time';
import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers';
import { ActivityCancellationDetailsHolder } from '@temporalio/common/lib/activity-cancellation-details';

export {
ActivityFunction,
Expand Down Expand Up @@ -289,6 +298,11 @@ export class Context {
*/
public readonly metricMeter: MetricMeter;

/**
* Holder object for activity cancellation details
*/
private readonly _cancellationDetails: ActivityCancellationDetailsHolder;

/**
* **Not** meant to instantiated by Activity code, used by the worker.
*
Expand All @@ -300,14 +314,16 @@ export class Context {
cancellationSignal: AbortSignal,
heartbeat: (details?: any) => void,
log: Logger,
metricMeter: MetricMeter
metricMeter: MetricMeter,
details: ActivityCancellationDetailsHolder
) {
this.info = info;
this.cancelled = cancelled;
this.cancellationSignal = cancellationSignal;
this.heartbeatFn = heartbeat;
this.log = log;
this.metricMeter = metricMeter;
this._cancellationDetails = details;
}

/**
Expand Down Expand Up @@ -347,6 +363,16 @@ export class Context {
});
return Promise.race([this.cancelled.finally(() => clearTimeout(handle)), timer]);
};

/**
* Return the cancellation details for this activity, if any.
* @returns an object with boolean properties that describes the reason for cancellation, or undefined if not cancelled.
*
* @experimental Activity cancellation details include usage of experimental features such as activity pause, and may be subject to change.
*/
public get cancellationDetails(): ActivityCancellationDetails | undefined {
return this._cancellationDetails.details;
}
}

/**
Expand Down Expand Up @@ -427,6 +453,16 @@ export function cancelled(): Promise<never> {
return Context.current().cancelled;
}

/**
* Return the cancellation details for this activity, if any.
* @returns an object with boolean properties that describes the reason for cancellation, or undefined if not cancelled.
*
* @experimental Activity cancellation details include usage of experimental features such as activity pause, and may be subject to change.
*/
export function cancellationDetails(): ActivityCancellationDetails | undefined {
return Context.current().cancellationDetails;
}

/**
* Return an {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} that can be used to
* react to Activity cancellation.
Expand Down
13 changes: 13 additions & 0 deletions packages/client/src/async-completion-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ export class ActivityCompletionError extends Error {}
@SymbolBasedInstanceOfError('ActivityCancelledError')
export class ActivityCancelledError extends Error {}

/**
* Thrown by {@link AsyncCompletionClient.heartbeat} when the reporting Activity
* has been paused.
*/
@SymbolBasedInstanceOfError('ActivityPausedError')
export class ActivityPausedError extends Error {}

/**
* Options used to configure {@link AsyncCompletionClient}
*/
Expand Down Expand Up @@ -211,6 +218,7 @@ export class AsyncCompletionClient extends BaseClient {
async heartbeat(taskTokenOrFullActivityId: Uint8Array | FullActivityId, details?: unknown): Promise<void> {
const payloads = await encodeToPayloads(this.dataConverter, details);
let cancelRequested = false;
let paused = false;
try {
if (taskTokenOrFullActivityId instanceof Uint8Array) {
const response = await this.workflowService.recordActivityTaskHeartbeat({
Expand All @@ -220,6 +228,7 @@ export class AsyncCompletionClient extends BaseClient {
details: { payloads },
});
cancelRequested = !!response.cancelRequested;
paused = !!response.activityPaused;
} else {
const response = await this.workflowService.recordActivityTaskHeartbeatById({
identity: this.options.identity,
Expand All @@ -228,12 +237,16 @@ export class AsyncCompletionClient extends BaseClient {
details: { payloads },
});
cancelRequested = !!response.cancelRequested;
paused = !!response.activityPaused;
}
} catch (err) {
this.handleError(err);
}
if (cancelRequested) {
throw new ActivityCancelledError('cancelled');
}
if (paused) {
throw new ActivityPausedError('paused');
}
}
}
52 changes: 52 additions & 0 deletions packages/common/src/activity-cancellation-details.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import type { coresdk } from '@temporalio/proto';

// ts-prune-ignore-next
export interface ActivityCancellationDetailsHolder {
details?: ActivityCancellationDetails;
}

export interface ActivityCancellationDetailsOptions {
notFound?: boolean;
cancelRequested?: boolean;
paused?: boolean;
timedOut?: boolean;
workerShutdown?: boolean;
reset?: boolean;
}

/**
* Provides the reasons for the activity's cancellation. Cancellation details are set once and do not change once set.
*/
export class ActivityCancellationDetails {
readonly notFound: boolean;
readonly cancelRequested: boolean;
readonly paused: boolean;
readonly timedOut: boolean;
readonly workerShutdown: boolean;
readonly reset: boolean;

public constructor(options: ActivityCancellationDetailsOptions = {}) {
this.notFound = options.notFound ?? false;
this.cancelRequested = options.cancelRequested ?? false;
this.paused = options.paused ?? false;
this.timedOut = options.timedOut ?? false;
this.workerShutdown = options.workerShutdown ?? false;
this.reset = options.reset ?? false;
}

static fromProto(
proto: coresdk.activity_task.IActivityCancellationDetails | null | undefined
): ActivityCancellationDetails {
if (proto == null) {
return new ActivityCancellationDetails();
}
return new ActivityCancellationDetails({
notFound: proto.isNotFound ?? false,
cancelRequested: proto.isCancelled ?? false,
paused: proto.isPaused ?? false,
timedOut: proto.isTimedOut ?? false,
workerShutdown: proto.isWorkerShutdown ?? false,
reset: proto.isReset ?? false,
});
}
}
1 change: 1 addition & 0 deletions packages/common/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import * as encoding from './encoding';
import * as helpers from './type-helpers';

export * from './activity-options';
export { ActivityCancellationDetailsOptions, ActivityCancellationDetails } from './activity-cancellation-details';
export * from './converter/data-converter';
export * from './converter/failure-converter';
export * from './converter/payload-codec';
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Payload } from '../interfaces';
import { arrayFromPayloads, fromPayloadsAtIndex, PayloadConverter, toPayloads } from '../converter/payload-converter';
import { arrayFromPayloads, fromPayloadsAtIndex, toPayloads } from '../converter/payload-converter';
import { PayloadConverterError } from '../errors';
import { PayloadCodec } from '../converter/payload-codec';
import { ProtoFailure } from '../failure';
Expand Down
24 changes: 24 additions & 0 deletions packages/test/src/activities/heartbeat-cancellation-details.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { ActivityCancellationDetails } from '@temporalio/common';
import * as activity from '@temporalio/activity';

export async function heartbeatCancellationDetailsActivity(
catchErr: boolean
): Promise<ActivityCancellationDetails | undefined> {
// Exit early if we've already run this activity.
if (activity.activityInfo().heartbeatDetails === 'finally-complete') {
return activity.cancellationDetails();
}
// eslint-disable-next-line no-constant-condition
while (true) {
try {
activity.heartbeat('heartbeated');
await activity.sleep(300);
} catch (err) {
if (err instanceof activity.CancelledFailure && catchErr) {
return activity.cancellationDetails();
}
activity.heartbeat('finally-complete');
throw err;
}
}
}
54 changes: 53 additions & 1 deletion packages/test/src/helpers-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import * as workflow from '@temporalio/workflow';
import { temporal } from '@temporalio/proto';
import { defineSearchAttributeKey, SearchAttributeType } from '@temporalio/common/lib/search-attributes';
import { ConnectionInjectorInterceptor } from './activities/interceptors';
import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions } from './helpers';
import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions, waitUntil } from './helpers';

export interface Context {
env: TestWorkflowEnvironment;
workflowBundle: WorkflowBundle;
}

const defaultDynamicConfigOptions = [
'frontend.activityAPIsEnabled=true',
'frontend.enableExecuteMultiOperation=true',
'frontend.workerVersioningDataAPIs=true',
'frontend.workerVersioningWorkflowAPIs=true',
Expand Down Expand Up @@ -284,6 +285,57 @@ export function configurableHelpers<T>(
};
}

export async function setActivityPauseState(handle: WorkflowHandle, activityId: string, pause: boolean): Promise<void> {
const desc = await handle.describe();
const req = {
namespace: handle.client.options.namespace,
execution: {
workflowId: desc.raw.workflowExecutionInfo?.execution?.workflowId,
runId: desc.raw.workflowExecutionInfo?.execution?.runId,
},
id: activityId,
};
if (pause) {
await handle.client.workflowService.pauseActivity(req);
} else {
await handle.client.workflowService.unpauseActivity(req);
}
await waitUntil(async () => {
const { raw } = await handle.describe();
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId);
// If we are pausing: success when either
// • paused flag is true OR
// • the activity vanished (it completed / retried)
if (pause) return activityInfo ? activityInfo.paused ?? false : true;
// If we are unpausing: success when either
// • paused flag is false OR
// • the activity vanished (already completed)
return activityInfo ? !activityInfo.paused : true;
}, 15000);
}

// Helper function to check if an activity has heartbeated
export async function hasActivityHeartbeat(
handle: WorkflowHandle,
activityId: string,
expectedContent?: string
): Promise<boolean> {
const { raw } = await handle.describe();
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId);
const heartbeatData = activityInfo?.heartbeatDetails?.payloads?.[0]?.data;
if (!heartbeatData) return false;

// If no expected content specified, just check that heartbeat data exists
if (!expectedContent) return true;

try {
const decoded = Buffer.from(heartbeatData).toString();
return decoded.includes(expectedContent);
} catch {
return false;
}
}

export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvironment = t.context.env): Helpers {
return configurableHelpers(t, t.context.workflowBundle, testEnv);
}
Loading
Loading