Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions src/component/complete.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -238,9 +238,7 @@ describe("complete", () => {
});

it("should call onComplete handler for successful jobs", async () => {
// Create a spy on runMutation
// const runMutationSpy = vi.fn();
const schedulerSpy = vi.fn();
const runMutationSpy = vi.fn();

// Enqueue a work item with onComplete handler
const workId = await t.mutation(api.lib.enqueue, {
Expand All @@ -264,11 +262,7 @@ describe("complete", () => {
// Create a modified context with a spy on runMutation
const spyCtx = {
...ctx,
// runMutation: runMutationSpy,
scheduler: {
...ctx.scheduler,
runAfter: schedulerSpy,
},
runMutation: runMutationSpy,
};

await completeHandler(spyCtx, {
Expand All @@ -281,9 +275,8 @@ describe("complete", () => {
],
});

// Verify onComplete was called with the right arguments
expect(schedulerSpy).toHaveBeenCalledWith(
0,
// Verify onComplete was called transactionally via runMutation
expect(runMutationSpy).toHaveBeenCalledWith(
"testOnComplete",
expect.objectContaining({
workId,
Expand Down
112 changes: 47 additions & 65 deletions src/component/complete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,78 +117,60 @@ export async function completeHandler(
job.runResult.kind === "failed" &&
!!maxAttempts &&
work.attempts < maxAttempts;
if (!retry) {
let scheduledId = undefined;
if (work.onComplete) {
try {
// Retrieve large context if stored separately
let context = work.onComplete.context;
if (context === undefined && work.payloadId) {
const payload = await ctx.db.get(work.payloadId);
if (payload) {
context = payload.context;
}
}

const handle = work.onComplete.fnHandle as FunctionHandle<
"mutation",
OnCompleteArgs,
void
>;
const onCompleteArgs = {
workId: work._id,
context,
result: job.runResult,
};
if (job.runOnCompleteInline) {
try {
await ctx.runMutation(handle, onCompleteArgs);
} catch (e) {
console.error(
`[complete] caught error while running onComplete inline for ${job.workId}, scheduling instead: ${e}`,
);
scheduledId = await ctx.scheduler.runAfter(
0,
handle,
onCompleteArgs,
);
}
} else {
scheduledId = await ctx.scheduler.runAfter(
0,
handle,
onCompleteArgs,
);
console.debug(
`[complete] onComplete for ${job.workId} scheduled`,
);
}
} catch (e) {
console.error(
`[complete] error running onComplete for ${job.workId}`,
e,
);
// TODO: store failures in a table for later debugging
}
}
recordCompleted(console, work, job.runResult.kind, scheduledId);

// Clean up any large data that was stored separately.
// TODO: consider async deletion in the future to avoid bandwidth limits.
if (work.payloadId) {
await ctx.db.delete(work.payloadId);
}

// This is the terminating state for work.
await ctx.db.delete(job.workId);
}
if (job.runResult.kind !== "canceled") {
pendingCompletions.push({
runResult: stripResult(job.runResult),
workId: job.workId,
retry,
});
}
if (retry) {
return;
}
if (work.onComplete) {
// Retrieve large context if stored separately
let context = work.onComplete.context;
if (context === undefined && work.payloadId) {
const payload = await ctx.db.get(work.payloadId);
if (payload) {
context = payload.context;
}
}

const handle = work.onComplete.fnHandle as FunctionHandle<
"mutation",
OnCompleteArgs,
void
>;
const onCompleteArgs = {
workId: work._id,
context,
result: job.runResult,
};
try {
await ctx.runMutation(handle, onCompleteArgs);
} catch (e) {
console.error(
`[complete] error running onComplete for ${job.workId}`,
e,
);
await ctx.db.insert("failedOnComplete", {
workId: job.workId,
context,
runResult: stripResult(job.runResult),
});
}
}
recordCompleted(console, work, job.runResult.kind);

// Clean up any large data that was stored separately.
// TODO: consider async deletion in the future to avoid bandwidth limits.
if (work.payloadId) {
await ctx.db.delete(work.payloadId);
}

// This is the terminating state for work.
await ctx.db.delete(job.workId);
}),
);
if (pendingCompletions.length > 0) {
Expand Down
2 changes: 1 addition & 1 deletion src/component/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ async function handleCompletions(
const retried = await rescheduleJob(ctx, work, console);
if (retried) {
state.report.retries++;
recordCompleted(console, work, "retrying", undefined);
recordCompleted(console, work, "retrying");
} else {
// We don't retry if it's been canceled in the mean time.
state.report.canceled++;
Expand Down
6 changes: 6 additions & 0 deletions src/component/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,10 @@ export default defineSchema({
args: v.optional(v.record(v.string(), v.any())),
context: v.optional(v.any()),
}),

failedOnComplete: defineTable({
workId: v.id("work"),
context: v.optional(v.any()),
runResult: vResult,
}),
});
2 changes: 0 additions & 2 deletions src/component/stats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,11 @@ export function recordCompleted(
console: Logger,
work: Doc<"work">,
status: "success" | "failed" | "canceled" | "retrying",
onCompleteScheduledFunctionId: Id<"_scheduled_functions"> | undefined,
) {
console.event("completed", {
workId: work._id,
fnName: work.fnName,
completedAt: Date.now(),
onCompleteScheduledFunctionId,
attempts: work.attempts,
status,
});
Expand Down
18 changes: 8 additions & 10 deletions src/component/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,18 @@
fnArgs = payload.args;
}

let runResult: RunResult = { kind: "failed", error: "unknown" };

Check failure on line 39 in src/component/worker.ts

View workflow job for this annotation

GitHub Actions / Test and lint

This assigned value is not used in subsequent statements
try {
const returnValue = await (args.fnType === "query"
? ctx.runQuery(args.fnHandle as FunctionHandle<"query">, fnArgs)
: ctx.runMutation(args.fnHandle as FunctionHandle<"mutation">, fnArgs));
// NOTE: we could run the `saveResult` handler here, or call `ctx.runMutation`,
// but we want the mutation to be a separate transaction to reduce the window for OCCs.
await ctx.scheduler.runAfter(0, internal.complete.complete, {
jobs: [
{ workId, runResult: { kind: "success", returnValue }, attempt },
],
});
runResult = { kind: "success", returnValue };
} catch (e: unknown) {
console.error(e);
const runResult = { kind: "failed" as const, error: formatError(e) };
runResult = { kind: "failed" as const, error: formatError(e) };
} finally {
await ctx.scheduler.runAfter(0, internal.complete.complete, {
jobs: [{ workId, runResult, attempt }],
});
Expand Down Expand Up @@ -92,17 +90,17 @@
// and `ctx.scheduler.runAfter` won't OCC.
const runResult: RunResult = { kind: "success", returnValue };
try {
// Attempt to run complete inline and onComplete inline
// Attempt to run complete inline
await ctx.runMutation(internal.complete.complete, {
jobs: [{ workId, runResult, attempt, runOnCompleteInline: true }],
jobs: [{ workId, runResult, attempt }],
});
console.info("[runActionWrapper] onComplete succeeded");
console.debug("[runActionWrapper] completed inline");
return;
} catch (e) {
console.error(
`[runActionWrapper] caught error while attempting to run complete inline, scheduling instead: ${e}`,
);
// Fall through and schedule complete instead (without running onComplete inline)
// Fall through and schedule complete instead
}
await ctx.scheduler.runAfter(0, internal.complete.complete, {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should save the scheduled job id somewhere so that recovery can look it up if complete failed for an un-catchable reason (e.g. too many reads).

jobs: [{ workId, runResult, attempt }],
Expand Down
Loading