diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index df3f722c28..a96ad91df4 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -170,7 +170,7 @@ export class Scripts { const queueKeys = this.queue.keys; const parent: Record = job.parent - ? { ...job.parent, fpof: opts.fpof, rdof: opts.rdof } + ? { ...job.parent, fpof: opts.fpof, rdof: opts.rdof, idof: opts.idof } : null; const args = [ diff --git a/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua b/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua index da10722d7c..c8e0597b92 100644 --- a/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua +++ b/src/commands/includes/moveParentFromWaitingChildrenToFailed.lua @@ -28,12 +28,16 @@ local function moveParentFromWaitingChildrenToFailed( parentQueueKey, parentKey, parentKey, timestamp ) - elseif parentData['rdof'] then + elseif parentData['idof'] or parentData['rdof'] then local grandParentKey = parentData['queueKey'] .. ':' .. parentData['id'] local grandParentDependenciesSet = grandParentKey .. ":dependencies" if rcall("SREM", grandParentDependenciesSet, parentKey) == 1 then moveParentToWaitIfNeeded(parentData['queueKey'], grandParentDependenciesSet, grandParentKey, parentData['id'], timestamp) + if parentData['idof'] then + local grandParentFailedSet = grandParentKey .. ":failed" + rcall("HSET", grandParentFailedSet, parentKey, failedReason) + end end end end diff --git a/tests/test_flow.ts b/tests/test_flow.ts index c976889128..8df615fe24 100644 --- a/tests/test_flow.ts +++ b/tests/test_flow.ts @@ -2285,6 +2285,156 @@ describe('flows', () => { ); }).timeout(8000); }); + + describe('when ignoreDependencyOnFailure is provided', async () => { + it('moves parent to wait after children fail', async () => { + const name = 'child-job'; + + const parentQueueName = `parent-queue-${v4()}`; + const grandChildrenQueueName = `grand-children-queue-${v4()}`; + + const parentQueue = new Queue(parentQueueName, { + connection, + prefix, + }); + const grandChildrenQueue = new Queue(grandChildrenQueueName, { + connection, + prefix, + }); + const queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queueEvents.waitUntilReady(); + + let grandChildrenProcessor, + processedGrandChildren = 0; + const processingChildren = new Promise(resolve => { + grandChildrenProcessor = async job => { + processedGrandChildren++; + + if (processedGrandChildren === 2) { + return resolve(); + } + + if (job.data.foo === 'bar') { + throw new Error('failed'); + } + }; + }); + + const grandChildrenWorker = new Worker( + grandChildrenQueueName, + grandChildrenProcessor, + { connection, prefix }, + ); + + await grandChildrenWorker.waitUntilReady(); + + const flow = new FlowProducer({ connection, prefix }); + const tree = await flow.add({ + name: 'parent-job', + queueName: parentQueueName, + data: {}, + children: [ + { + name, + data: { foo: 'qux' }, + queueName, + opts: { ignoreDependencyOnFailure: true }, + children: [ + { + name, + data: { foo: 'bar' }, + queueName: grandChildrenQueueName, + opts: { failParentOnFailure: true }, + }, + { + name, + data: { foo: 'baz' }, + queueName: grandChildrenQueueName, + }, + ], + }, + ], + }); + + const failed = new Promise((resolve, reject) => { + queueEvents.on('failed', async ({ jobId, failedReason, prev }) => { + try { + if (jobId === tree!.children![0].job.id) { + expect(prev).to.be.equal('waiting-children'); + expect(failedReason).to.be.equal( + `child ${prefix}:${grandChildrenQueueName}:${ + tree!.children![0].children![0].job.id + } failed`, + ); + resolve(); + } else { + reject( + new Error( + `wrong job (${jobId}) failed instead of ${ + tree!.children![0].job.id + }`, + ), + ); + } + } catch (err) { + reject(err); + } + }); + }); + + expect(tree).to.have.property('job'); + expect(tree).to.have.property('children'); + + const { children, job } = tree; + const parentState = await job.getState(); + + expect(parentState).to.be.eql('waiting-children'); + + await processingChildren; + await failed; + + const { children: grandChildren } = children[0]; + const updatedGrandchildJob = await grandChildrenQueue.getJob( + grandChildren[0].job.id, + ); + const grandChildState = await updatedGrandchildJob.getState(); + expect(grandChildState).to.be.eql('failed'); + expect(updatedGrandchildJob.failedReason).to.be.eql('failed'); + + const updatedParentJob = await queue.getJob(children[0].job.id); + const updatedParentState = await updatedParentJob.getState(); + + expect(updatedParentState).to.be.eql('failed'); + expect(updatedParentJob.failedReason).to.be.eql( + `child ${prefix}:${grandChildrenQueueName}:${updatedGrandchildJob.id} failed`, + ); + + const updatedGrandparentJob = await parentQueue.getJob(job.id); + const updatedGrandparentState = await updatedGrandparentJob.getState(); + + expect(updatedGrandparentState).to.be.eql('waiting'); + + const failedChildrenValues = + await updatedGrandparentJob.getFailedChildrenValues(); + + const failedReason = `child ${prefix}:${grandChildrenQueueName}:${updatedGrandchildJob.id} failed`; + expect(failedChildrenValues).to.deep.equal({ + [`${queue.qualifiedName}:${children[0].job.id}`]: failedReason, + }); + + await parentQueue.close(); + await grandChildrenQueue.close(); + await grandChildrenWorker.close(); + await flow.close(); + await queueEvents.close(); + + await removeAllQueueData(new IORedis(redisHost), parentQueueName); + await removeAllQueueData( + new IORedis(redisHost), + grandChildrenQueueName, + ); + }).timeout(8000); + }); }); it('should get paginated processed dependencies keys', async () => {