Skip to content

Commit 247ca37

Browse files
committed
Better handle snapshot changes with the new persistent task run process,
Also don’t re-import the task file if the task is already in the resourceCatalog
1 parent 809507e commit 247ca37

File tree

5 files changed

+198
-119
lines changed

5 files changed

+198
-119
lines changed

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 61 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -389,64 +389,74 @@ const zodIpc = new ZodIpcConnection({
389389
return;
390390
}
391391

392-
try {
393-
await runTimelineMetrics.measureMetric(
394-
"trigger.dev/start",
395-
"import",
396-
{
397-
entryPoint: taskManifest.entryPoint,
398-
file: taskManifest.filePath,
399-
},
400-
async () => {
401-
const beforeImport = performance.now();
402-
resourceCatalog.setCurrentFileContext(taskManifest.entryPoint, taskManifest.filePath);
403-
404-
// Load init file if it exists
405-
if (workerManifest.initEntryPoint) {
406-
try {
407-
await import(normalizeImportPath(workerManifest.initEntryPoint));
408-
log(`Loaded init file from ${workerManifest.initEntryPoint}`);
409-
} catch (err) {
410-
logError(`Failed to load init file`, err);
411-
throw err;
392+
// First attempt to get the task from the resource catalog
393+
let task = resourceCatalog.getTask(execution.task.id);
394+
395+
if (!task) {
396+
log(`Could not find task ${execution.task.id} in resource catalog, importing...`);
397+
398+
try {
399+
await runTimelineMetrics.measureMetric(
400+
"trigger.dev/start",
401+
"import",
402+
{
403+
entryPoint: taskManifest.entryPoint,
404+
file: taskManifest.filePath,
405+
},
406+
async () => {
407+
const beforeImport = performance.now();
408+
resourceCatalog.setCurrentFileContext(
409+
taskManifest.entryPoint,
410+
taskManifest.filePath
411+
);
412+
413+
// Load init file if it exists
414+
if (workerManifest.initEntryPoint) {
415+
try {
416+
await import(normalizeImportPath(workerManifest.initEntryPoint));
417+
log(`Loaded init file from ${workerManifest.initEntryPoint}`);
418+
} catch (err) {
419+
logError(`Failed to load init file`, err);
420+
throw err;
421+
}
412422
}
413-
}
414423

415-
await import(normalizeImportPath(taskManifest.entryPoint));
416-
resourceCatalog.clearCurrentFileContext();
417-
const durationMs = performance.now() - beforeImport;
424+
await import(normalizeImportPath(taskManifest.entryPoint));
425+
resourceCatalog.clearCurrentFileContext();
426+
const durationMs = performance.now() - beforeImport;
418427

419-
log(
420-
`Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms`
421-
);
422-
}
423-
);
424-
} catch (err) {
425-
logError(`Failed to import task ${execution.task.id}`, err);
428+
log(
429+
`Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms`
430+
);
431+
}
432+
);
433+
} catch (err) {
434+
logError(`Failed to import task ${execution.task.id}`, err);
426435

427-
await sender.send("TASK_RUN_COMPLETED", {
428-
execution,
429-
result: {
430-
ok: false,
431-
id: execution.run.id,
432-
error: {
433-
type: "INTERNAL_ERROR",
434-
code: TaskRunErrorCodes.COULD_NOT_IMPORT_TASK,
435-
message: err instanceof Error ? err.message : String(err),
436-
stackTrace: err instanceof Error ? err.stack : undefined,
437-
},
438-
usage: {
439-
durationMs: 0,
436+
await sender.send("TASK_RUN_COMPLETED", {
437+
execution,
438+
result: {
439+
ok: false,
440+
id: execution.run.id,
441+
error: {
442+
type: "INTERNAL_ERROR",
443+
code: TaskRunErrorCodes.COULD_NOT_IMPORT_TASK,
444+
message: err instanceof Error ? err.message : String(err),
445+
stackTrace: err instanceof Error ? err.stack : undefined,
446+
},
447+
usage: {
448+
durationMs: 0,
449+
},
450+
metadata: runMetadataManager.stopAndReturnLastFlush(),
440451
},
441-
metadata: runMetadataManager.stopAndReturnLastFlush(),
442-
},
443-
});
452+
});
444453

445-
return;
446-
}
454+
return;
455+
}
447456

448-
// Import the task module
449-
const task = resourceCatalog.getTask(execution.task.id);
457+
// Now try and get the task again
458+
task = resourceCatalog.getTask(execution.task.id);
459+
}
450460

451461
if (!task) {
452462
logError(`Could not find task ${execution.task.id}`);

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 60 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -388,66 +388,74 @@ const zodIpc = new ZodIpcConnection({
388388
return;
389389
}
390390

391-
try {
392-
await runTimelineMetrics.measureMetric(
393-
"trigger.dev/start",
394-
"import",
395-
{
396-
entryPoint: taskManifest.entryPoint,
397-
file: taskManifest.filePath,
398-
},
399-
async () => {
400-
const beforeImport = performance.now();
401-
resourceCatalog.setCurrentFileContext(taskManifest.entryPoint, taskManifest.filePath);
402-
403-
// Load init file if it exists
404-
if (workerManifest.initEntryPoint) {
405-
try {
406-
await import(normalizeImportPath(workerManifest.initEntryPoint));
407-
console.log(`Loaded init file from ${workerManifest.initEntryPoint}`);
408-
} catch (err) {
409-
console.error(`Failed to load init file`, err);
410-
throw err;
391+
// Import the task module
392+
let task = resourceCatalog.getTask(execution.task.id);
393+
394+
if (!task) {
395+
try {
396+
await runTimelineMetrics.measureMetric(
397+
"trigger.dev/start",
398+
"import",
399+
{
400+
entryPoint: taskManifest.entryPoint,
401+
file: taskManifest.filePath,
402+
},
403+
async () => {
404+
const beforeImport = performance.now();
405+
resourceCatalog.setCurrentFileContext(
406+
taskManifest.entryPoint,
407+
taskManifest.filePath
408+
);
409+
410+
// Load init file if it exists
411+
if (workerManifest.initEntryPoint) {
412+
try {
413+
await import(normalizeImportPath(workerManifest.initEntryPoint));
414+
console.log(`Loaded init file from ${workerManifest.initEntryPoint}`);
415+
} catch (err) {
416+
console.error(`Failed to load init file`, err);
417+
throw err;
418+
}
411419
}
412-
}
413420

414-
await import(normalizeImportPath(taskManifest.entryPoint));
415-
resourceCatalog.clearCurrentFileContext();
416-
const durationMs = performance.now() - beforeImport;
421+
await import(normalizeImportPath(taskManifest.entryPoint));
422+
resourceCatalog.clearCurrentFileContext();
423+
const durationMs = performance.now() - beforeImport;
417424

418-
console.log(
419-
`Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms`
420-
);
421-
}
422-
);
423-
} catch (err) {
424-
console.error(`Failed to import task ${execution.task.id}`, err);
425+
console.log(
426+
`Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms`
427+
);
428+
}
429+
);
430+
} catch (err) {
431+
console.error(`Failed to import task ${execution.task.id}`, err);
425432

426-
await sender.send("TASK_RUN_COMPLETED", {
427-
execution,
428-
result: {
429-
ok: false,
430-
id: execution.run.id,
431-
error: {
432-
type: "INTERNAL_ERROR",
433-
code: TaskRunErrorCodes.COULD_NOT_IMPORT_TASK,
434-
message: err instanceof Error ? err.message : String(err),
435-
stackTrace: err instanceof Error ? err.stack : undefined,
436-
},
437-
usage: {
438-
durationMs: 0,
433+
await sender.send("TASK_RUN_COMPLETED", {
434+
execution,
435+
result: {
436+
ok: false,
437+
id: execution.run.id,
438+
error: {
439+
type: "INTERNAL_ERROR",
440+
code: TaskRunErrorCodes.COULD_NOT_IMPORT_TASK,
441+
message: err instanceof Error ? err.message : String(err),
442+
stackTrace: err instanceof Error ? err.stack : undefined,
443+
},
444+
usage: {
445+
durationMs: 0,
446+
},
447+
metadata: runMetadataManager.stopAndReturnLastFlush(),
439448
},
440-
metadata: runMetadataManager.stopAndReturnLastFlush(),
441-
},
442-
});
449+
});
443450

444-
return;
445-
}
451+
return;
452+
}
446453

447-
process.title = `trigger-dev-worker: ${execution.task.id} ${execution.run.id}`;
454+
process.title = `trigger-dev-worker: ${execution.task.id} ${execution.run.id}`;
448455

449-
// Import the task module
450-
const task = resourceCatalog.getTask(execution.task.id);
456+
// Now try and get the task again
457+
task = resourceCatalog.getTask(execution.task.id);
458+
}
451459

452460
if (!task) {
453461
console.error(`Could not find task ${execution.task.id}`);

packages/cli-v3/src/entryPoints/managed/execution.ts

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,7 @@ export class RunExecution {
144144
}
145145

146146
private attachTaskRunProcessHandlers(taskRunProcess: TaskRunProcess): void {
147-
taskRunProcess.onTaskRunHeartbeat.detach();
148-
taskRunProcess.onSendDebugLog.detach();
149-
taskRunProcess.onSetSuspendable.detach();
147+
taskRunProcess.unsafeDetachEvtHandlers();
150148

151149
taskRunProcess.onTaskRunHeartbeat.attach(async (runId) => {
152150
if (!this.runFriendlyId) {
@@ -234,7 +232,11 @@ export class RunExecution {
234232
if (this.currentAttemptNumber && this.currentAttemptNumber !== run.attemptNumber) {
235233
this.sendDebugLog("error: attempt number mismatch", snapshotMetadata);
236234
// This is a rogue execution, a new one will already have been created elsewhere
237-
await this.exitTaskRunProcessWithoutFailingRun({ flush: false });
235+
// TODO: keep this one, kill the process even if it's a keep-alive one
236+
await this.exitTaskRunProcessWithoutFailingRun({
237+
flush: false,
238+
reason: "attempt number mismatch",
239+
});
238240
return;
239241
}
240242

@@ -248,7 +250,11 @@ export class RunExecution {
248250
if (deprecated) {
249251
this.sendDebugLog("run execution is deprecated", { incomingSnapshot: snapshot });
250252

251-
await this.exitTaskRunProcessWithoutFailingRun({ flush: false });
253+
// TODO: keep this one, kill the process even if it's a keep-alive one
254+
await this.exitTaskRunProcessWithoutFailingRun({
255+
flush: false,
256+
reason: "deprecated execution",
257+
});
252258
return;
253259
}
254260

@@ -271,13 +277,13 @@ export class RunExecution {
271277
case "QUEUED": {
272278
this.sendDebugLog("run was re-queued", snapshotMetadata);
273279

274-
await this.exitTaskRunProcessWithoutFailingRun({ flush: true });
280+
await this.exitTaskRunProcessWithoutFailingRun({ flush: true, reason: "re-queued" });
275281
return;
276282
}
277283
case "FINISHED": {
278284
this.sendDebugLog("run is finished", snapshotMetadata);
279285

280-
await this.exitTaskRunProcessWithoutFailingRun({ flush: true });
286+
// This can sometimes be called before the handleCompletionResult, so we don't need to do anything here
281287
return;
282288
}
283289
case "QUEUED_EXECUTING":
@@ -292,7 +298,7 @@ export class RunExecution {
292298

293299
// This will kill the process and fail the execution with a SuspendedProcessError
294300
// We don't flush because we already did before suspending
295-
await this.exitTaskRunProcessWithoutFailingRun({ flush: false });
301+
await this.exitTaskRunProcessWithoutFailingRun({ flush: false, reason: "suspended" });
296302
return;
297303
}
298304
case "PENDING_EXECUTING": {
@@ -825,11 +831,17 @@ export class RunExecution {
825831
this.restoreCount++;
826832
}
827833

828-
private async exitTaskRunProcessWithoutFailingRun({ flush }: { flush: boolean }) {
829-
await this.taskRunProcess?.suspend({ flush });
834+
private async exitTaskRunProcessWithoutFailingRun({
835+
flush,
836+
reason,
837+
}: {
838+
flush: boolean;
839+
reason: string;
840+
}) {
841+
await this.taskRunProcessProvider.suspendProcess(flush, this.taskRunProcess);
830842

831843
// No services should be left running after this line - let's make sure of it
832-
this.shutdown("exitTaskRunProcessWithoutFailingRun");
844+
this.shutdown(`exitTaskRunProcessWithoutFailingRun: ${reason}`);
833845
}
834846

835847
/**

0 commit comments

Comments
 (0)