diff --git a/src/classes/job.ts b/src/classes/job.ts index 06035eefd9..c941bed83a 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -38,6 +38,7 @@ import type { QueueEvents } from './queue-events'; const logger = debuglog('bull'); const optsDecodeMap = { + pen: 'pending', de: 'debounce', fpof: 'failParentOnFailure', idof: 'ignoreDependencyOnFailure', diff --git a/src/classes/queue-keys.ts b/src/classes/queue-keys.ts index 9109ae8dba..97ed84566d 100644 --- a/src/classes/queue-keys.ts +++ b/src/classes/queue-keys.ts @@ -8,6 +8,7 @@ export class QueueKeys { [ '', 'active', + 'pending', 'wait', 'waiting-children', 'paused', diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index df3f722c28..295e96ea88 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -60,6 +60,7 @@ export class Scripts { undefined, undefined, undefined, + undefined, ]; } @@ -111,6 +112,7 @@ export class Scripts { queueKeys.active, queueKeys.events, queueKeys.pc, + queueKeys.pending, ]; keys.push(pack(args), job.data, encodedOpts); @@ -153,6 +155,7 @@ export class Scripts { queueKeys.active, queueKeys.events, queueKeys.marker, + queueKeys.pending, ]; keys.push(pack(args), job.data, encodedOpts); @@ -467,6 +470,7 @@ export class Scripts { keys[11] = this.queue.toKey(job.id ?? ''); keys[12] = metricsKey; keys[13] = this.queue.keys.marker; + keys[14] = this.queue.keys.pending; const keepJobs = this.getKeepJobs(shouldRemove, workerKeepJobs); @@ -843,6 +847,7 @@ export class Scripts { this.queue.keys.active, this.queue.keys.pc, this.queue.keys.marker, + this.queue.keys.pending, ]; return keys.concat([ @@ -870,6 +875,7 @@ export class Scripts { queueKeys.events, queueKeys.meta, queueKeys.stalled, + queueKeys.pending, ]; return keys.concat([ @@ -907,6 +913,7 @@ export class Scripts { 'waiting-children', jobId, 'stalled', + 'pending', ].map(name => { return this.queue.toKey(name); }); @@ -921,7 +928,11 @@ export class Scripts { isMaxedArgs(): string[] { const queueKeys = this.queue.keys; - const keys: string[] = [queueKeys.meta, queueKeys.active]; + const keys: string[] = [ + queueKeys.meta, + queueKeys.active, + queueKeys.pending, + ]; return keys; } @@ -1042,6 +1053,7 @@ export class Scripts { this.queue.keys.pc, this.queue.keys.marker, this.queue.keys.stalled, + this.queue.keys.pending, ]; const pushCmd = (lifo ? 'R' : 'L') + 'PUSH'; @@ -1068,6 +1080,7 @@ export class Scripts { this.queue.toKey('paused'), this.queue.keys.meta, this.queue.keys.active, + this.queue.keys.pending, this.queue.keys.marker, ]; @@ -1124,6 +1137,7 @@ export class Scripts { this.queue.keys.paused, this.queue.keys.active, this.queue.keys.marker, + this.queue.keys.pending, ]; const args = [ @@ -1164,6 +1178,7 @@ export class Scripts { queueKeys.meta, queueKeys.pc, queueKeys.marker, + queueKeys.pending, ]; const args: (string | number | boolean | Buffer)[] = [ @@ -1196,6 +1211,7 @@ export class Scripts { this.queue.keys.active, this.queue.keys.pc, this.queue.keys.events, + this.queue.keys.pending, this.queue.keys.marker, ]; @@ -1222,6 +1238,7 @@ export class Scripts { this.queue.keys['stalled-check'], this.queue.keys.meta, this.queue.keys.paused, + this.queue.keys.pending, this.queue.keys.marker, this.queue.keys.events, ]; @@ -1275,6 +1292,7 @@ export class Scripts { this.queue.keys.limiter, this.queue.keys.prioritized, this.queue.keys.marker, + this.queue.keys.pending, this.queue.keys.events, ]; diff --git a/src/commands/addPrioritizedJob-8.lua b/src/commands/addPrioritizedJob-9.lua similarity index 97% rename from src/commands/addPrioritizedJob-8.lua rename to src/commands/addPrioritizedJob-9.lua index 288e779154..dc020933c3 100644 --- a/src/commands/addPrioritizedJob-8.lua +++ b/src/commands/addPrioritizedJob-9.lua @@ -13,6 +13,7 @@ KEYS[6] 'active' KEYS[7] events stream key KEYS[8] 'pc' priority counter + KEYS[9] 'pending' ARGV[1] msgpacked arguments array [1] key prefix, @@ -102,7 +103,7 @@ local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], repeatJobKey) -- Add the job to the prioritized set -local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey) +local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey, KEYS[9]) addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed) -- Emit waiting event diff --git a/src/commands/addStandardJob-8.lua b/src/commands/addStandardJob-9.lua similarity index 98% rename from src/commands/addStandardJob-8.lua rename to src/commands/addStandardJob-9.lua index 54fb363d73..3b3176c134 100644 --- a/src/commands/addStandardJob-8.lua +++ b/src/commands/addStandardJob-9.lua @@ -23,6 +23,7 @@ KEYS[6] 'active' KEYS[7] events stream key KEYS[8] marker key + KEYS[9] pending key ARGV[1] msgpacked arguments array [1] key prefix, @@ -104,7 +105,7 @@ end storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, parentKey, parentData, repeatJobKey) -local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KEYS[2]) +local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KEYS[2], KEYS[9]) -- LIFO or FIFO local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH' diff --git a/src/commands/changePriority-7.lua b/src/commands/changePriority-8.lua similarity index 97% rename from src/commands/changePriority-7.lua rename to src/commands/changePriority-8.lua index d23461edb1..3cc186ba2d 100644 --- a/src/commands/changePriority-7.lua +++ b/src/commands/changePriority-8.lua @@ -8,6 +8,7 @@ KEYS[5] 'active' KEYS[6] 'pc' priority counter KEYS[7] 'marker' + KEYS[8] 'pending' ARGV[1] priority value ARGV[2] job key @@ -46,11 +47,11 @@ end if rcall("EXISTS", jobKey) == 1 then local metaKey = KEYS[3] - local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[5], KEYS[1], KEYS[2]) + local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[5], KEYS[1], KEYS[2], KEYS[8]) local prioritizedKey = KEYS[4] local priorityCounterKey = KEYS[6] local markerKey = KEYS[7] - + -- Re-add with the new priority if rcall("ZREM", KEYS[4], jobId) > 0 then reAddJobWithNewPriority( prioritizedKey, markerKey, target, diff --git a/src/commands/includes/addPendingJobIfNeeded.lua b/src/commands/includes/addPendingJobIfNeeded.lua new file mode 100644 index 0000000000..d3f60d80c7 --- /dev/null +++ b/src/commands/includes/addPendingJobIfNeeded.lua @@ -0,0 +1,9 @@ +--[[ + Function to add job into pending state. +]] + +local function addPendingJobIfNeeded(isPending, pendingKey, jobId, timestamp) + if isPending then + rcall("ZADD", pendingKey, timestamp, jobId) + end +end diff --git a/src/commands/includes/getTargetQueueList.lua b/src/commands/includes/getTargetQueueList.lua index 2a7b03571a..02f4c2130e 100644 --- a/src/commands/includes/getTargetQueueList.lua +++ b/src/commands/includes/getTargetQueueList.lua @@ -3,7 +3,7 @@ (since an empty list and !EXISTS are not really the same). ]] -local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey) +local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey, pendingKey) local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency") if queueAttributes[1] then @@ -11,7 +11,8 @@ local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey) else if queueAttributes[2] then local activeCount = rcall("LLEN", activeKey) - if activeCount >= tonumber(queueAttributes[2]) then + local pendingCount = rcall("ZCARD", pendingKey) + if (activeCount + pendingCount) >= tonumber(queueAttributes[2]) then return waitKey, true else return waitKey, false diff --git a/src/commands/includes/isQueueMaxed.lua b/src/commands/includes/isQueueMaxed.lua index d0a81aedd5..619d7d7d09 100644 --- a/src/commands/includes/isQueueMaxed.lua +++ b/src/commands/includes/isQueueMaxed.lua @@ -1,12 +1,13 @@ --[[ Function to check if queue is maxed or not. ]] -local function isQueueMaxed(queueMetaKey, activeKey) +local function isQueueMaxed(queueMetaKey, activeKey, pendingKey) local maxConcurrency = rcall("HGET", queueMetaKey, "concurrency") if maxConcurrency then local activeCount = rcall("LLEN", activeKey) - if activeCount >= tonumber(maxConcurrency) then + local pendingCount = rcall("ZCARD", pendingKey) + if (activeCount + pendingCount) >= tonumber(maxConcurrency) then return true end end diff --git a/src/commands/includes/isQueuePausedOrMaxed.lua b/src/commands/includes/isQueuePausedOrMaxed.lua index 89d8a0f92c..36c528ba2a 100644 --- a/src/commands/includes/isQueuePausedOrMaxed.lua +++ b/src/commands/includes/isQueuePausedOrMaxed.lua @@ -3,7 +3,7 @@ (since an empty list and !EXISTS are not really the same). ]] -local function isQueuePausedOrMaxed(queueMetaKey, activeKey) +local function isQueuePausedOrMaxed(queueMetaKey, activeKey, pendingKey) local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency") if queueAttributes[1] then @@ -11,7 +11,8 @@ local function isQueuePausedOrMaxed(queueMetaKey, activeKey) else if queueAttributes[2] then local activeCount = rcall("LLEN", activeKey) - return activeCount >= tonumber(queueAttributes[2]) + local pendingCount = rcall("ZCARD", pendingKey) + return (activeCount + pendingCount) >= tonumber(queueAttributes[2]) end end return false diff --git a/src/commands/includes/moveParentToWaitIfNeeded.lua b/src/commands/includes/moveParentToWaitIfNeeded.lua index 33d9bcb031..b57b5d492f 100644 --- a/src/commands/includes/moveParentToWaitIfNeeded.lua +++ b/src/commands/includes/moveParentToWaitIfNeeded.lua @@ -16,12 +16,13 @@ local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, if rcall("SCARD", parentDependenciesKey) == 0 and isParentActive then rcall("ZREM", parentQueueKey .. ":waiting-children", parentId) local parentWaitKey = parentQueueKey .. ":wait" + local parentPendingKey = parentQueueKey .. ":pending" local parentPausedKey = parentQueueKey .. ":paused" local parentActiveKey = parentQueueKey .. ":active" local parentMetaKey = parentQueueKey .. ":meta" local parentMarkerKey = parentQueueKey .. ":marker" - local jobAttributes = rcall("HMGET", parentKey, "priority", "delay") + local jobAttributes = rcall("HMGET", parentKey, "priority", "delay", "pen") local priority = tonumber(jobAttributes[1]) or 0 local delay = tonumber(jobAttributes[2]) or 0 @@ -35,17 +36,22 @@ local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey) else - if priority == 0 then - local parentTarget, isParentPausedOrMaxed = - getTargetQueueList(parentMetaKey, parentActiveKey, parentWaitKey, - parentPausedKey) - addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed, - parentId) + if jobAttributes[3] then + rcall("ZREM", parentPendingKey, parentId) + addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed, parentId) else - local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey) - addJobWithPriority(parentMarkerKey, - parentQueueKey .. ":prioritized", priority, - parentId, parentQueueKey .. ":pc", isPausedOrMaxed) + if priority == 0 then + local parentTarget, isParentPausedOrMaxed = + getTargetQueueList(parentMetaKey, parentActiveKey, parentWaitKey, + parentPausedKey, parentPendingKey) + addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed, + parentId) + else + local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey, parentPendingKey) + addJobWithPriority(parentMarkerKey, + parentQueueKey .. ":prioritized", priority, + parentId, parentQueueKey .. ":pc", isPausedOrMaxed) + end end rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", diff --git a/src/commands/includes/promoteDelayedJobs.lua b/src/commands/includes/promoteDelayedJobs.lua index 56cb209c61..e49f08126a 100644 --- a/src/commands/includes/promoteDelayedJobs.lua +++ b/src/commands/includes/promoteDelayedJobs.lua @@ -12,7 +12,7 @@ -- Try to get as much as 1000 jobs at once local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedKey, - eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused) + eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused, pendingKey) local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, (timestamp + 1) * 0x1000 - 1, "LIMIT", 0, 1000) if (#jobs > 0) then @@ -20,15 +20,21 @@ local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedK for _, jobId in ipairs(jobs) do local jobKey = prefix .. jobId + local jobAttributes = rcall("HMGET", jobKey, "priority", "pen") local priority = - tonumber(rcall("HGET", jobKey, "priority")) or 0 + tonumber(jobAttributes[1]) or 0 - if priority == 0 then - -- LIFO or FIFO - addJobInTargetList(targetKey, markerKey, "LPUSH", isPaused, jobId) + if jobAttributes[2] then + rcall("ZREM", pendingKey, jobId) + addJobInTargetList(targetKey, markerKey, "RPUSH", isPaused, jobId) else - addJobWithPriority(markerKey, prioritizedKey, priority, - jobId, priorityCounterKey, isPaused) + if priority == 0 then + -- LIFO or FIFO + addJobInTargetList(targetKey, markerKey, "LPUSH", isPaused, jobId) + else + addJobWithPriority(markerKey, prioritizedKey, priority, + jobId, priorityCounterKey, isPaused) + end end -- Emit waiting event diff --git a/src/commands/includes/removeParentDependencyKey.lua b/src/commands/includes/removeParentDependencyKey.lua index 87262850c9..c63506b34a 100644 --- a/src/commands/includes/removeParentDependencyKey.lua +++ b/src/commands/includes/removeParentDependencyKey.lua @@ -12,7 +12,7 @@ local function moveParentToWait(parentPrefix, parentId, emitEvent) local parentTarget, isPausedOrMaxed = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "active", - parentPrefix .. "wait", parentPrefix .. "paused") + parentPrefix .. "wait", parentPrefix .. "paused", parentPrefix .. "pending") addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId) if emitEvent then diff --git a/src/commands/includes/storeJob.lua b/src/commands/includes/storeJob.lua index ef53c9cba1..da2a7f60b0 100644 --- a/src/commands/includes/storeJob.lua +++ b/src/commands/includes/storeJob.lua @@ -7,7 +7,8 @@ local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp, local delay = opts['delay'] or 0 local priority = opts['priority'] or 0 local debounceId = opts['de'] and opts['de']['id'] - + local pending = opts['pen'] + local optionalValues = {} if parentKey ~= nil then table.insert(optionalValues, "parentKey") @@ -26,6 +27,11 @@ local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp, table.insert(optionalValues, debounceId) end + if pending then + table.insert(optionalValues, "pen") + table.insert(optionalValues, 1) + end + rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts, "timestamp", timestamp, "delay", delay, "priority", priority, unpack(optionalValues)) diff --git a/src/commands/isMaxed-2.lua b/src/commands/isMaxed-3.lua similarity index 75% rename from src/commands/isMaxed-2.lua rename to src/commands/isMaxed-3.lua index b01e79139a..b59ea3fcaa 100644 --- a/src/commands/isMaxed-2.lua +++ b/src/commands/isMaxed-3.lua @@ -4,6 +4,7 @@ Input: KEYS[1] meta key KEYS[2] active key + KEYS[3] pending key Output: 1 if element found in the list. @@ -14,4 +15,4 @@ local rcall = redis.call -- Includes --- @include "includes/isQueueMaxed" -return isQueueMaxed(KEYS[1], KEYS[2]) +return isQueueMaxed(KEYS[1], KEYS[2], KEYS[3]) diff --git a/src/commands/moveJobFromActiveToWait-10.lua b/src/commands/moveJobFromActiveToWait-11.lua similarity index 90% rename from src/commands/moveJobFromActiveToWait-10.lua rename to src/commands/moveJobFromActiveToWait-11.lua index e90d6d2d10..4b4a0a52a4 100644 --- a/src/commands/moveJobFromActiveToWait-10.lua +++ b/src/commands/moveJobFromActiveToWait-11.lua @@ -11,7 +11,8 @@ KEYS[7] limiter key KEYS[8] prioritized key KEYS[9] marker key - KEYS[10] event key + KEYS[10] pending key + KEYS[11] event key ARGV[1] job id ARGV[2] lock token @@ -35,7 +36,7 @@ if lockToken == token then local metaKey = KEYS[6] local removed = rcall("LREM", KEYS[1], 1, jobId) if removed > 0 then - local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[1], KEYS[2], KEYS[5]) + local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[1], KEYS[2], KEYS[5], KEYS[10]) rcall("SREM", KEYS[3], jobId) @@ -52,7 +53,7 @@ if lockToken == token then local maxEvents = getOrSetMaxEvents(metaKey) -- Emit waiting event - rcall("XADD", KEYS[10], "MAXLEN", "~", maxEvents, "*", "event", "waiting", + rcall("XADD", KEYS[11], "MAXLEN", "~", maxEvents, "*", "event", "waiting", "jobId", jobId) end end diff --git a/src/commands/moveJobsToWait-8.lua b/src/commands/moveJobsToWait-9.lua similarity index 93% rename from src/commands/moveJobsToWait-8.lua rename to src/commands/moveJobsToWait-9.lua index 15e99c6295..63a135efbe 100644 --- a/src/commands/moveJobsToWait-8.lua +++ b/src/commands/moveJobsToWait-9.lua @@ -11,7 +11,8 @@ KEYS[5] 'paused' KEYS[6] 'meta' KEYS[7] 'active' - KEYS[8] 'marker' + KEYS[8] 'pendings' + KEYS[9] 'marker' ARGV[1] count ARGV[2] timestamp @@ -33,7 +34,7 @@ local rcall = redis.call; --- @include "includes/getTargetQueueList" local metaKey = KEYS[6] -local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[7], KEYS[4], KEYS[5]) +local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[7], KEYS[4], KEYS[5], KEYS[8]) local jobs = rcall('ZRANGEBYSCORE', KEYS[3], 0, timestamp, 'LIMIT', 0, maxCount) if (#jobs > 0) then @@ -63,7 +64,7 @@ if (#jobs > 0) then rcall("LPUSH", target, unpack(jobs, from, to)) end - addBaseMarkerIfNeeded(KEYS[8], isPausedOrMaxed) + addBaseMarkerIfNeeded(KEYS[9], isPausedOrMaxed) end maxCount = maxCount - #jobs diff --git a/src/commands/moveStalledJobsToWait-9.lua b/src/commands/moveStalledJobsToWait-10.lua similarity index 94% rename from src/commands/moveStalledJobsToWait-9.lua rename to src/commands/moveStalledJobsToWait-10.lua index fe39587ebc..aa005a6663 100644 --- a/src/commands/moveStalledJobsToWait-9.lua +++ b/src/commands/moveStalledJobsToWait-10.lua @@ -2,15 +2,16 @@ Move stalled jobs to wait. Input: - KEYS[1] 'stalled' (SET) - KEYS[2] 'wait', (LIST) - KEYS[3] 'active', (LIST) - KEYS[4] 'failed', (ZSET) - KEYS[5] 'stalled-check', (KEY) - KEYS[6] 'meta', (KEY) - KEYS[7] 'paused', (LIST) - KEYS[8] 'marker' - KEYS[9] 'event stream' (STREAM) + KEYS[1] 'stalled' (SET) + KEYS[2] 'wait', (LIST) + KEYS[3] 'active', (LIST) + KEYS[4] 'failed', (ZSET) + KEYS[5] 'stalled-check', (KEY) + KEYS[6] 'meta', (KEY) + KEYS[7] 'paused', (LIST) + KEYS[8] 'pending' (ZSET) + KEYS[9] 'marker' + KEYS[10] 'event stream' (STREAM) ARGV[1] Max stalled job count ARGV[2] queue.toKey('') @@ -42,8 +43,8 @@ local failedKey = KEYS[4] local stalledCheckKey = KEYS[5] local metaKey = KEYS[6] local pausedKey = KEYS[7] -local markerKey = KEYS[8] -local eventStreamKey = KEYS[9] +local markerKey = KEYS[9] +local eventStreamKey = KEYS[10] local maxStalledJobCount = ARGV[1] local queueKeyPrefix = ARGV[2] local timestamp = ARGV[3] @@ -147,7 +148,7 @@ if (#stalling > 0) then table.insert(failed, jobId) else local target, isPausedOrMaxed= - getTargetQueueList(metaKey, activeKey, waitKey, pausedKey) + getTargetQueueList(metaKey, activeKey, waitKey, pausedKey, KEYS[8]) -- Move the job back to the wait queue, to immediately be picked up by a waiting worker. addJobInTargetList(target, markerKey, "RPUSH", isPausedOrMaxed, jobId) diff --git a/src/commands/moveToActive-11.lua b/src/commands/moveToActive-12.lua similarity index 87% rename from src/commands/moveToActive-11.lua rename to src/commands/moveToActive-12.lua index 946091e84a..26e216adaa 100644 --- a/src/commands/moveToActive-11.lua +++ b/src/commands/moveToActive-12.lua @@ -7,23 +7,24 @@ so that no other worker picks this job again. Input: - KEYS[1] wait key - KEYS[2] active key - KEYS[3] prioritized key - KEYS[4] stream events key - KEYS[5] stalled key + KEYS[1] wait key + KEYS[2] active key + KEYS[3] prioritized key + KEYS[4] stream events key + KEYS[5] stalled key -- Rate limiting - KEYS[6] rate limiter key - KEYS[7] delayed key + KEYS[6] rate limiter key + KEYS[7] delayed key -- Delayed jobs - KEYS[8] paused key - KEYS[9] meta key + KEYS[8] paused key + KEYS[9] meta key KEYS[10] pc priority counter -- Marker KEYS[11] marker key + KEYS[12] pending key -- Arguments ARGV[1] key prefix @@ -50,12 +51,12 @@ local opts = cmsgpack.unpack(ARGV[3]) --- @include "includes/prepareJobForProcessing" --- @include "includes/promoteDelayedJobs" -local target, isPausedOrMaxed = getTargetQueueList(KEYS[9], activeKey, waitKey, KEYS[8]) +local target, isPausedOrMaxed = getTargetQueueList(KEYS[9], activeKey, waitKey, KEYS[8], KEYS[12]) -- Check if there are delayed jobs that we can move to wait. local markerKey = KEYS[11] promoteDelayedJobs(delayedKey, markerKey, target, KEYS[3], eventStreamKey, ARGV[1], - ARGV[2], KEYS[10], isPausedOrMaxed) + ARGV[2], KEYS[10], isPausedOrMaxed, KEYS[12]) local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max']) local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey) diff --git a/src/commands/moveToDelayed-8.lua b/src/commands/moveToDelayed-9.lua similarity index 90% rename from src/commands/moveToDelayed-8.lua rename to src/commands/moveToDelayed-9.lua index e48d3f235d..5889b374b9 100644 --- a/src/commands/moveToDelayed-8.lua +++ b/src/commands/moveToDelayed-9.lua @@ -10,6 +10,7 @@ KEYS[6] events stream KEYS[7] meta key KEYS[8] stalled key + KEYS[9] pending key ARGV[1] key prefix ARGV[2] timestamp @@ -30,6 +31,7 @@ local rcall = redis.call -- Includes --- @include "includes/addDelayMarkerIfNeeded" +--- @include "includes/addPendingJobIfNeeded" --- @include "includes/getDelayedScore" --- @include "includes/getOrSetMaxEvents" --- @include "includes/removeLock" @@ -51,6 +53,9 @@ if rcall("EXISTS", jobKey) == 1 then local numRemovedElements = rcall("LREM", KEYS[2], -1, jobId) if numRemovedElements < 1 then return -3 end + local isPending = rcall("HGET", jobKey, "pen") + addPendingJobIfNeeded(isPending, KEYS[9], jobId, ARGV[2]) + if ARGV[6] == "0" then rcall("HINCRBY", jobKey, "atm", 1) end diff --git a/src/commands/moveToFinished-14.lua b/src/commands/moveToFinished-15.lua similarity index 98% rename from src/commands/moveToFinished-14.lua rename to src/commands/moveToFinished-15.lua index dc396da706..3819d69a60 100644 --- a/src/commands/moveToFinished-14.lua +++ b/src/commands/moveToFinished-15.lua @@ -23,6 +23,7 @@ KEYS[12] jobId key KEYS[13] metrics key KEYS[14] marker key + KEYS[15] pending key ARGV[1] jobId ARGV[2] timestamp @@ -205,11 +206,11 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists -- and not rate limited. if (ARGV[6] == "1") then - local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[2], KEYS[1], KEYS[8]) + local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[2], KEYS[1], KEYS[8], KEYS[15]) -- Check if there are delayed jobs that can be promoted promoteDelayedJobs(KEYS[7], KEYS[14], target, KEYS[3], eventStreamKey, prefix, - timestamp, KEYS[10], isPausedOrMaxed) + timestamp, KEYS[10], isPausedOrMaxed, KEYS[15]) local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max']) -- Check if we are rate limited first. diff --git a/src/commands/moveToWaitingChildren-5.lua b/src/commands/moveToWaitingChildren-6.lua similarity index 58% rename from src/commands/moveToWaitingChildren-5.lua rename to src/commands/moveToWaitingChildren-6.lua index 4ee8f29a25..34d9e6e6da 100644 --- a/src/commands/moveToWaitingChildren-5.lua +++ b/src/commands/moveToWaitingChildren-6.lua @@ -7,6 +7,7 @@ KEYS[3] waitChildrenKey key KEYS[4] job key KEYS[5] stalled key + KEYS[6] pending key ARGV[1] token ARGV[2] child key @@ -22,12 +23,14 @@ ]] local rcall = redis.call local stalledKey = KEYS[5] +local jobKey = KEYS[4] --- Includes +--- @include "includes/addPendingJobIfNeeded" --- @include "includes/removeLock" -local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, - timestamp) +local function moveToWaitingChildren (activeKey, waitingChildrenKey, pendingKey, + jobKey, jobId, timestamp) local score = tonumber(timestamp) local numRemovedElements = rcall("LREM", activeKey, -1, jobId) @@ -36,29 +39,32 @@ local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, return -3 end + local isPending = rcall("HGET", jobKey, "pen") + addPendingJobIfNeeded(isPending, pendingKey, jobId, timestamp) + rcall("ZADD", waitingChildrenKey, score, jobId) return 0 end -if rcall("EXISTS", KEYS[4]) == 1 then +if rcall("EXISTS", jobKey) == 1 then if ARGV[2] ~= "" then - if rcall("SISMEMBER", KEYS[4] .. ":dependencies", ARGV[2]) ~= 0 then - local errorCode = removeLock(KEYS[4], stalledKey, ARGV[1], ARGV[4]) + if rcall("SISMEMBER", jobKey .. ":dependencies", ARGV[2]) ~= 0 then + local errorCode = removeLock(jobKey, stalledKey, ARGV[1], ARGV[4]) if errorCode < 0 then return errorCode end - return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3]) + return moveToWaitingChildren(KEYS[2], KEYS[3], KEYS[6], jobKey, ARGV[4], ARGV[3]) end return 1 else - if rcall("SCARD", KEYS[4] .. ":dependencies") ~= 0 then - local errorCode = removeLock(KEYS[4], stalledKey, ARGV[1], ARGV[4]) + if rcall("SCARD", jobKey .. ":dependencies") ~= 0 then + local errorCode = removeLock(jobKey, stalledKey, ARGV[1], ARGV[4]) if errorCode < 0 then return errorCode end - return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3]) + return moveToWaitingChildren(KEYS[2], KEYS[3], KEYS[6], jobKey, ARGV[4], ARGV[3]) end return 1 diff --git a/src/commands/promote-9.lua b/src/commands/promote-10.lua similarity index 55% rename from src/commands/promote-9.lua rename to src/commands/promote-10.lua index 2143e52aa0..fc0558635f 100644 --- a/src/commands/promote-9.lua +++ b/src/commands/promote-10.lua @@ -2,15 +2,16 @@ Promotes a job that is currently "delayed" to the "waiting" state Input: - KEYS[1] 'delayed' - KEYS[2] 'wait' - KEYS[3] 'paused' - KEYS[4] 'meta' - KEYS[5] 'prioritized' - KEYS[6] 'active' - KEYS[7] 'pc' priority counter - KEYS[8] 'event stream' - KEYS[9] 'marker' + KEYS[1] 'delayed' + KEYS[2] 'wait' + KEYS[3] 'paused' + KEYS[4] 'meta' + KEYS[5] 'prioritized' + KEYS[6] 'active' + KEYS[7] 'pc' priority counter + KEYS[8] 'event stream' + KEYS[9] 'pending' + KEYS[10] 'marker' ARGV[1] queue.toKey('') ARGV[2] jobId @@ -32,22 +33,28 @@ local jobId = ARGV[2] if rcall("ZREM", KEYS[1], jobId) == 1 then local jobKey = ARGV[1] .. jobId - local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0 + local jobAttributes = rcall("HMGET", jobKey, "priority", "pen") + local priority = tonumber(jobAttributes[1]) or 0 local metaKey = KEYS[4] - local markerKey = KEYS[9] + local markerKey = KEYS[10] - -- Remove delayed "marker" from the wait list if there is any. + -- TODO: Remove delayed "marker" from the wait list if there is any. -- Since we are adding a job we do not need the marker anymore. -- Markers in waitlist DEPRECATED in v5: Remove in v6. local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[2], KEYS[3]) local marker = rcall("LINDEX", target, 0) if marker and string.sub(marker, 1, 2) == "0:" then rcall("LPOP", target) end - if priority == 0 then - -- LIFO or FIFO - addJobInTargetList(target, markerKey, "LPUSH", isPausedOrMaxed, jobId) + if jobAttributes[2] then + rcall("ZREM", KEYS[9], jobId) + addJobInTargetList(target, markerKey, "RPUSH", isPausedOrMaxed, jobId) else - addJobWithPriority(markerKey, KEYS[5], priority, jobId, KEYS[7], isPausedOrMaxed) + if priority == 0 then + -- LIFO or FIFO + addJobInTargetList(target, markerKey, "LPUSH", isPausedOrMaxed, jobId) + else + addJobWithPriority(markerKey, KEYS[5], priority, jobId, KEYS[7], isPausedOrMaxed) + end end -- Emit waiting event (wait..ing@token) diff --git a/src/commands/reprocessJob-8.lua b/src/commands/reprocessJob-9.lua similarity index 95% rename from src/commands/reprocessJob-8.lua rename to src/commands/reprocessJob-9.lua index 300ab6a1e8..e08a292f96 100644 --- a/src/commands/reprocessJob-8.lua +++ b/src/commands/reprocessJob-9.lua @@ -10,6 +10,7 @@ KEYS[6] paused key KEYS[7] active key KEYS[8] marker key + KEYS[9] pending key ARGV[1] job.id ARGV[2] (job.opts.lifo ? 'R' : 'L') + 'PUSH' @@ -33,7 +34,7 @@ if rcall("EXISTS", KEYS[1]) == 1 then if (rcall("ZREM", KEYS[3], jobId) == 1) then rcall("HDEL", KEYS[1], "finishedOn", "processedOn", ARGV[3]) - local target, isPausedOrMaxed = getTargetQueueList(KEYS[5], KEYS[7], KEYS[4], KEYS[6]) + local target, isPausedOrMaxed = getTargetQueueList(KEYS[5], KEYS[7], KEYS[4], KEYS[6], KEYS[9]) addJobInTargetList(target, KEYS[8], ARGV[2], isPausedOrMaxed, jobId) local maxEvents = getOrSetMaxEvents(KEYS[5]) diff --git a/src/commands/retryJob-11.lua b/src/commands/retryJob-12.lua similarity index 74% rename from src/commands/retryJob-11.lua rename to src/commands/retryJob-12.lua index 33d1f7a85a..3c01d06b20 100644 --- a/src/commands/retryJob-11.lua +++ b/src/commands/retryJob-12.lua @@ -13,6 +13,7 @@ KEYS[9] 'pc' priority counter KEYS[10] 'marker' KEYS[11] 'stalled' + KEYS[12] 'pending' ARGV[1] key prefix ARGV[2] timestamp @@ -40,12 +41,12 @@ local rcall = redis.call --- @include "includes/removeLock" --- @include "includes/isQueuePausedOrMaxed" -local target, isPausedOrMaxed = getTargetQueueList(KEYS[5], KEYS[1], KEYS[2], KEYS[3]) +local target, isPausedOrMaxed = getTargetQueueList(KEYS[5], KEYS[1], KEYS[2], KEYS[3], KEYS[12]) local markerKey = KEYS[10] -- Check if there are delayed jobs that we can move to wait. -- test example: when there are delayed jobs between retries -promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[8], KEYS[6], ARGV[1], ARGV[2], KEYS[9], isPausedOrMaxed) +promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[8], KEYS[6], ARGV[1], ARGV[2], KEYS[9], isPausedOrMaxed, KEYS[12]) if rcall("EXISTS", KEYS[4]) == 1 then local errorCode = removeLock(KEYS[4], KEYS[11], ARGV[5], ARGV[4]) @@ -56,16 +57,21 @@ if rcall("EXISTS", KEYS[4]) == 1 then local numRemovedElements = rcall("LREM", KEYS[1], -1, ARGV[4]) if (numRemovedElements < 1) then return -3 end - local priority = tonumber(rcall("HGET", KEYS[4], "priority")) or 0 + local jobAttributes = rcall("HMGET", KEYS[4], "priority", "pen") + local priority = tonumber(jobAttributes[1]) or 0 --need to re-evaluate after removing job from active - isPausedOrMaxed = isQueuePausedOrMaxed(KEYS[5], KEYS[1]) + isPausedOrMaxed = isQueuePausedOrMaxed(KEYS[5], KEYS[1], KEYS[12]) - -- Standard or priority add - if priority == 0 then - addJobInTargetList(target, markerKey, ARGV[3], isPausedOrMaxed, ARGV[4]) + if jobAttributes[2] then + addJobInTargetList(target, markerKey, "RPUSH", isPausedOrMaxed, ARGV[4]) else - addJobWithPriority(markerKey, KEYS[8], priority, ARGV[4], KEYS[9], isPausedOrMaxed) + -- Standard or priority add + if priority == 0 then + addJobInTargetList(target, markerKey, ARGV[3], isPausedOrMaxed, ARGV[4]) + else + addJobWithPriority(markerKey, KEYS[8], priority, ARGV[4], KEYS[9], isPausedOrMaxed) + end end rcall("HINCRBY", KEYS[4], "atm", 1) diff --git a/src/types/job-options.ts b/src/types/job-options.ts index 4b0eea7b78..1c20a0b46d 100644 --- a/src/types/job-options.ts +++ b/src/types/job-options.ts @@ -16,6 +16,11 @@ export type JobsOptions = BaseJobOptions & { */ ignoreDependencyOnFailure?: boolean; + /** + * Consider job as pending since it's move to active for the first time. + */ + pending?: boolean; + /** * If true, removes the job from its parent dependencies when it fails after all attempts. */ @@ -46,6 +51,11 @@ export type RedisJobOptions = BaseJobOptions & { */ kl?: number; + /** + * Consider job as pending since it's move to active for the first time. + */ + pen?: string; + /** * If true, removes the job from its parent dependencies when it fails after all attempts. */ diff --git a/tests/test_delay.ts b/tests/test_delay.ts index fe670e7690..fa13ef0dc5 100644 --- a/tests/test_delay.ts +++ b/tests/test_delay.ts @@ -454,6 +454,59 @@ describe('Delayed jobs', function () { await worker.close(); }); + describe('when pending option is provided', function () { + it('should process delayed jobs waiting to be finished in correct order', async function () { + this.timeout(4000); + const numJobs = 12; + + const worker = new Worker(queueName, async (job: Job) => {}, { + autorun: false, + connection, + prefix, + }); + + worker.on('failed', function (job, err) {}); + + const orderList: number[] = []; + let count = 0; + const completed = new Promise((resolve, reject) => { + worker.on('completed', async function (job) { + try { + count++; + orderList.push(job.data.order as number); + if (count == numJobs) { + resolve(); + } + } catch (err) { + reject(err); + } + }); + }); + + const jobs = Array.from(Array(numJobs).keys()).map(index => ({ + name: 'test', + data: { order: numJobs - index }, + opts: { + pending: true, + delay: (numJobs - index) * 150, + attempts: 1, + backoff: { type: 'fixed', delay: 200 }, + }, + })); + const expectedOrder = Array.from(Array(numJobs).keys()).map( + index => index + 1, + ); + + await queue.addBulk(jobs); + worker.run(); + await completed; + + expect(orderList).to.eql(expectedOrder); + + await worker.close(); + }); + }); + describe('when failed jobs are retried and moved to delayed', function () { it('processes jobs without getting stuck', async () => { const countJobs = 2;