Skip to content

Commit

Permalink
task retry ts workflow (#2047)
Browse files Browse the repository at this point in the history
  • Loading branch information
DatGuyJonathan authored Feb 18, 2025
1 parent 44b7425 commit 0076b67
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 4 deletions.
15 changes: 15 additions & 0 deletions packages/ts-moose-lib/src/scripts/activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,21 @@ export const activities = {
throw new Error(errorMsg);
}
},

async getActivityRetry(filePath: string): Promise<number> {
try {
const scriptModule = await require(filePath);
const execResult = await scriptModule.default();
const retriesConfig = execResult?.config?.retries;
const retries = typeof retriesConfig === "number" ? retriesConfig : 3;
logger.info(`Using retries in ${filePath}: ${retries}`);
return retries;
} catch (error) {
const errorMsg = `Failed to get task retry for ${filePath}: ${error}`;
logger.error(errorMsg);
throw new Error(errorMsg);
}
},
};

// Helper function to create activity for a specific script
Expand Down
32 changes: 28 additions & 4 deletions packages/ts-moose-lib/src/scripts/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ import { log as logger, proxyActivities } from "@temporalio/workflow";
import { WorkflowState } from "./types";
import { mooseJsonEncode } from "./serialization";

const { executeScript, readDirectory } = proxyActivities({
startToCloseTimeout: "10 minutes",
const { getActivityRetry, readDirectory } = proxyActivities({
startToCloseTimeout: "1 minutes",
retry: {
// TODO: Use user's retry config
maximumAttempts: 3,
maximumAttempts: 1,
},
});

Expand Down Expand Up @@ -34,6 +33,15 @@ export async function ScriptWorkflow(
currentData = JSON.parse(mooseJsonEncode({ data: currentData }));

if (path.endsWith(".ts")) {
const maximumAttempts = await getActivityRetry(path);

const { executeScript } = proxyActivities({
startToCloseTimeout: "10 minutes",
retry: {
maximumAttempts,
},
});

const result = await executeScript({
scriptPath: path,
inputData: currentData,
Expand All @@ -47,6 +55,14 @@ export async function ScriptWorkflow(
const fullPath = `${path}/${item}`;

if (item.endsWith(".ts")) {
const maximumAttempts = await getActivityRetry(fullPath);
const { executeScript } = proxyActivities({
startToCloseTimeout: "10 minutes",
retry: {
maximumAttempts,
},
});

const result = await executeScript({
scriptPath: fullPath,
inputData: currentData,
Expand Down Expand Up @@ -87,6 +103,14 @@ async function handleParallelExecution(
for (const scriptFile of parallelFiles.sort()) {
if (scriptFile.endsWith(".ts")) {
const scriptPath = `${path}/${item}/${scriptFile}`;
const maximumAttempts = await getActivityRetry(scriptPath);

const { executeScript } = proxyActivities({
startToCloseTimeout: "10 minutes",
retry: {
maximumAttempts,
},
});
parallelTasks.push(
executeScript({
scriptPath,
Expand Down

0 comments on commit 0076b67

Please sign in to comment.