Skip to content

Commit

Permalink
fix(flow): recursive ignoreDependencyOnFailure option (#2712)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Aug 17, 2024
1 parent 0e3436c commit 53bc9eb
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ export class Scripts {
const queueKeys = this.queue.keys;

const parent: Record<string, any> = job.parent
? { ...job.parent, fpof: opts.fpof, rdof: opts.rdof }
? { ...job.parent, fpof: opts.fpof, rdof: opts.rdof, idof: opts.idof }
: null;

const args = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@ local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey,
parentKey,
timestamp
)
elseif parentData['rdof'] then
elseif parentData['idof'] or parentData['rdof'] then
local grandParentKey = parentData['queueKey'] .. ':' .. parentData['id']
local grandParentDependenciesSet = grandParentKey .. ":dependencies"
if rcall("SREM", grandParentDependenciesSet, parentKey) == 1 then
moveParentToWaitIfNeeded(parentData['queueKey'], grandParentDependenciesSet,
grandParentKey, parentData['id'], timestamp)
if parentData['idof'] then
local grandParentFailedSet = grandParentKey .. ":failed"
rcall("HSET", grandParentFailedSet, parentKey, failedReason)
end
end
end
end
Expand Down
150 changes: 150 additions & 0 deletions tests/test_flow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2285,6 +2285,156 @@ describe('flows', () => {
);
}).timeout(8000);
});

describe('when ignoreDependencyOnFailure is provided', async () => {
it('moves parent to wait after children fail', async () => {
const name = 'child-job';

const parentQueueName = `parent-queue-${v4()}`;
const grandChildrenQueueName = `grand-children-queue-${v4()}`;

const parentQueue = new Queue(parentQueueName, {
connection,
prefix,
});
const grandChildrenQueue = new Queue(grandChildrenQueueName, {
connection,
prefix,
});
const queueEvents = new QueueEvents(queueName, { connection, prefix });
await queueEvents.waitUntilReady();

let grandChildrenProcessor,
processedGrandChildren = 0;
const processingChildren = new Promise<void>(resolve => {
grandChildrenProcessor = async job => {
processedGrandChildren++;

if (processedGrandChildren === 2) {
return resolve();
}

if (job.data.foo === 'bar') {
throw new Error('failed');
}
};
});

const grandChildrenWorker = new Worker(
grandChildrenQueueName,
grandChildrenProcessor,
{ connection, prefix },
);

await grandChildrenWorker.waitUntilReady();

const flow = new FlowProducer({ connection, prefix });
const tree = await flow.add({
name: 'parent-job',
queueName: parentQueueName,
data: {},
children: [
{
name,
data: { foo: 'qux' },
queueName,
opts: { ignoreDependencyOnFailure: true },
children: [
{
name,
data: { foo: 'bar' },
queueName: grandChildrenQueueName,
opts: { failParentOnFailure: true },
},
{
name,
data: { foo: 'baz' },
queueName: grandChildrenQueueName,
},
],
},
],
});

const failed = new Promise<void>((resolve, reject) => {
queueEvents.on('failed', async ({ jobId, failedReason, prev }) => {
try {
if (jobId === tree!.children![0].job.id) {
expect(prev).to.be.equal('waiting-children');
expect(failedReason).to.be.equal(
`child ${prefix}:${grandChildrenQueueName}:${
tree!.children![0].children![0].job.id
} failed`,
);
resolve();
} else {
reject(
new Error(
`wrong job (${jobId}) failed instead of ${
tree!.children![0].job.id
}`,
),
);
}
} catch (err) {
reject(err);
}
});
});

expect(tree).to.have.property('job');
expect(tree).to.have.property('children');

const { children, job } = tree;
const parentState = await job.getState();

expect(parentState).to.be.eql('waiting-children');

await processingChildren;
await failed;

const { children: grandChildren } = children[0];
const updatedGrandchildJob = await grandChildrenQueue.getJob(
grandChildren[0].job.id,
);
const grandChildState = await updatedGrandchildJob.getState();
expect(grandChildState).to.be.eql('failed');
expect(updatedGrandchildJob.failedReason).to.be.eql('failed');

const updatedParentJob = await queue.getJob(children[0].job.id);
const updatedParentState = await updatedParentJob.getState();

expect(updatedParentState).to.be.eql('failed');
expect(updatedParentJob.failedReason).to.be.eql(
`child ${prefix}:${grandChildrenQueueName}:${updatedGrandchildJob.id} failed`,
);

const updatedGrandparentJob = await parentQueue.getJob(job.id);
const updatedGrandparentState = await updatedGrandparentJob.getState();

expect(updatedGrandparentState).to.be.eql('waiting');

const failedChildrenValues =
await updatedGrandparentJob.getFailedChildrenValues();

const failedReason = `child ${prefix}:${grandChildrenQueueName}:${updatedGrandchildJob.id} failed`;
expect(failedChildrenValues).to.deep.equal({
[`${queue.qualifiedName}:${children[0].job.id}`]: failedReason,
});

await parentQueue.close();
await grandChildrenQueue.close();
await grandChildrenWorker.close();
await flow.close();
await queueEvents.close();

await removeAllQueueData(new IORedis(redisHost), parentQueueName);
await removeAllQueueData(
new IORedis(redisHost),
grandChildrenQueueName,
);
}).timeout(8000);
});
});

it('should get paginated processed dependencies keys', async () => {
Expand Down

0 comments on commit 53bc9eb

Please sign in to comment.