Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support pending state #2687

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import type { QueueEvents } from './queue-events';
const logger = debuglog('bull');

const optsDecodeMap = {
pen: 'pending',
de: 'debounce',
fpof: 'failParentOnFailure',
idof: 'ignoreDependencyOnFailure',
Expand Down
1 change: 1 addition & 0 deletions src/classes/queue-keys.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export class QueueKeys {
[
'',
'active',
'pending',
'wait',
'waiting-children',
'paused',
Expand Down
20 changes: 19 additions & 1 deletion src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ export class Scripts {
undefined,
undefined,
undefined,
undefined,
];
}

Expand Down Expand Up @@ -111,6 +112,7 @@ export class Scripts {
queueKeys.active,
queueKeys.events,
queueKeys.pc,
queueKeys.pending,
];

keys.push(pack(args), job.data, encodedOpts);
Expand Down Expand Up @@ -153,6 +155,7 @@ export class Scripts {
queueKeys.active,
queueKeys.events,
queueKeys.marker,
queueKeys.pending,
];

keys.push(pack(args), job.data, encodedOpts);
Expand Down Expand Up @@ -467,6 +470,7 @@ export class Scripts {
keys[11] = this.queue.toKey(job.id ?? '');
keys[12] = metricsKey;
keys[13] = this.queue.keys.marker;
keys[14] = this.queue.keys.pending;

const keepJobs = this.getKeepJobs(shouldRemove, workerKeepJobs);

Expand Down Expand Up @@ -843,6 +847,7 @@ export class Scripts {
this.queue.keys.active,
this.queue.keys.pc,
this.queue.keys.marker,
this.queue.keys.pending,
];

return keys.concat([
Expand Down Expand Up @@ -870,6 +875,7 @@ export class Scripts {
queueKeys.events,
queueKeys.meta,
queueKeys.stalled,
queueKeys.pending,
];

return keys.concat([
Expand Down Expand Up @@ -907,6 +913,7 @@ export class Scripts {
'waiting-children',
jobId,
'stalled',
'pending',
].map(name => {
return this.queue.toKey(name);
});
Expand All @@ -921,7 +928,11 @@ export class Scripts {

isMaxedArgs(): string[] {
const queueKeys = this.queue.keys;
const keys: string[] = [queueKeys.meta, queueKeys.active];
const keys: string[] = [
queueKeys.meta,
queueKeys.active,
queueKeys.pending,
];

return keys;
}
Expand Down Expand Up @@ -1042,6 +1053,7 @@ export class Scripts {
this.queue.keys.pc,
this.queue.keys.marker,
this.queue.keys.stalled,
this.queue.keys.pending,
];

const pushCmd = (lifo ? 'R' : 'L') + 'PUSH';
Expand All @@ -1068,6 +1080,7 @@ export class Scripts {
this.queue.toKey('paused'),
this.queue.keys.meta,
this.queue.keys.active,
this.queue.keys.pending,
this.queue.keys.marker,
];

Expand Down Expand Up @@ -1124,6 +1137,7 @@ export class Scripts {
this.queue.keys.paused,
this.queue.keys.active,
this.queue.keys.marker,
this.queue.keys.pending,
];

const args = [
Expand Down Expand Up @@ -1164,6 +1178,7 @@ export class Scripts {
queueKeys.meta,
queueKeys.pc,
queueKeys.marker,
queueKeys.pending,
];

const args: (string | number | boolean | Buffer)[] = [
Expand Down Expand Up @@ -1196,6 +1211,7 @@ export class Scripts {
this.queue.keys.active,
this.queue.keys.pc,
this.queue.keys.events,
this.queue.keys.pending,
this.queue.keys.marker,
];

Expand All @@ -1222,6 +1238,7 @@ export class Scripts {
this.queue.keys['stalled-check'],
this.queue.keys.meta,
this.queue.keys.paused,
this.queue.keys.pending,
this.queue.keys.marker,
this.queue.keys.events,
];
Expand Down Expand Up @@ -1275,6 +1292,7 @@ export class Scripts {
this.queue.keys.limiter,
this.queue.keys.prioritized,
this.queue.keys.marker,
this.queue.keys.pending,
this.queue.keys.events,
];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
KEYS[6] 'active'
KEYS[7] events stream key
KEYS[8] 'pc' priority counter
KEYS[9] 'pending'

ARGV[1] msgpacked arguments array
[1] key prefix,
Expand Down Expand Up @@ -102,7 +103,7 @@ local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
repeatJobKey)

-- Add the job to the prioritized set
local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey)
local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey, KEYS[9])
addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed)

-- Emit waiting event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
KEYS[6] 'active'
KEYS[7] events stream key
KEYS[8] marker key
KEYS[9] pending key

ARGV[1] msgpacked arguments array
[1] key prefix,
Expand Down Expand Up @@ -104,7 +105,7 @@ end
storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,
parentKey, parentData, repeatJobKey)

local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KEYS[2])
local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KEYS[2], KEYS[9])

-- LIFO or FIFO
local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
KEYS[5] 'active'
KEYS[6] 'pc' priority counter
KEYS[7] 'marker'
KEYS[8] 'pending'

ARGV[1] priority value
ARGV[2] job key
Expand Down Expand Up @@ -46,11 +47,11 @@ end

if rcall("EXISTS", jobKey) == 1 then
local metaKey = KEYS[3]
local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[5], KEYS[1], KEYS[2])
local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[5], KEYS[1], KEYS[2], KEYS[8])
local prioritizedKey = KEYS[4]
local priorityCounterKey = KEYS[6]
local markerKey = KEYS[7]

-- Re-add with the new priority
if rcall("ZREM", KEYS[4], jobId) > 0 then
reAddJobWithNewPriority( prioritizedKey, markerKey, target,
Expand Down
9 changes: 9 additions & 0 deletions src/commands/includes/addPendingJobIfNeeded.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
--[[
Function to add job into pending state.
]]

local function addPendingJobIfNeeded(isPending, pendingKey, jobId, timestamp)
if isPending then
rcall("ZADD", pendingKey, timestamp, jobId)
end
end
5 changes: 3 additions & 2 deletions src/commands/includes/getTargetQueueList.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
(since an empty list and !EXISTS are not really the same).
]]

local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey)
local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey, pendingKey)
local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency")

if queueAttributes[1] then
return pausedKey, true
else
if queueAttributes[2] then
local activeCount = rcall("LLEN", activeKey)
if activeCount >= tonumber(queueAttributes[2]) then
local pendingCount = rcall("ZCARD", pendingKey)
if (activeCount + pendingCount) >= tonumber(queueAttributes[2]) then
return waitKey, true
else
return waitKey, false
Expand Down
5 changes: 3 additions & 2 deletions src/commands/includes/isQueueMaxed.lua
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
--[[
Function to check if queue is maxed or not.
]]
local function isQueueMaxed(queueMetaKey, activeKey)
local function isQueueMaxed(queueMetaKey, activeKey, pendingKey)
local maxConcurrency = rcall("HGET", queueMetaKey, "concurrency")

if maxConcurrency then
local activeCount = rcall("LLEN", activeKey)
if activeCount >= tonumber(maxConcurrency) then
local pendingCount = rcall("ZCARD", pendingKey)
if (activeCount + pendingCount) >= tonumber(maxConcurrency) then
return true
end
end
Expand Down
5 changes: 3 additions & 2 deletions src/commands/includes/isQueuePausedOrMaxed.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
(since an empty list and !EXISTS are not really the same).
]]

local function isQueuePausedOrMaxed(queueMetaKey, activeKey)
local function isQueuePausedOrMaxed(queueMetaKey, activeKey, pendingKey)
local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency")

if queueAttributes[1] then
return true
else
if queueAttributes[2] then
local activeCount = rcall("LLEN", activeKey)
return activeCount >= tonumber(queueAttributes[2])
local pendingCount = rcall("ZCARD", pendingKey)
return (activeCount + pendingCount) >= tonumber(queueAttributes[2])
end
end
return false
Expand Down
28 changes: 17 additions & 11 deletions src/commands/includes/moveParentToWaitIfNeeded.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey,
if rcall("SCARD", parentDependenciesKey) == 0 and isParentActive then
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId)
local parentWaitKey = parentQueueKey .. ":wait"
local parentPendingKey = parentQueueKey .. ":pending"
local parentPausedKey = parentQueueKey .. ":paused"
local parentActiveKey = parentQueueKey .. ":active"
local parentMetaKey = parentQueueKey .. ":meta"

local parentMarkerKey = parentQueueKey .. ":marker"
local jobAttributes = rcall("HMGET", parentKey, "priority", "delay")
local jobAttributes = rcall("HMGET", parentKey, "priority", "delay", "pen")
local priority = tonumber(jobAttributes[1]) or 0
local delay = tonumber(jobAttributes[2]) or 0

Expand All @@ -35,17 +36,22 @@ local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey,

addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey)
else
if priority == 0 then
local parentTarget, isParentPausedOrMaxed =
getTargetQueueList(parentMetaKey, parentActiveKey, parentWaitKey,
parentPausedKey)
addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed,
parentId)
if jobAttributes[3] then
rcall("ZREM", parentPendingKey, parentId)
addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed, parentId)
else
local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey)
addJobWithPriority(parentMarkerKey,
parentQueueKey .. ":prioritized", priority,
parentId, parentQueueKey .. ":pc", isPausedOrMaxed)
if priority == 0 then
local parentTarget, isParentPausedOrMaxed =
getTargetQueueList(parentMetaKey, parentActiveKey, parentWaitKey,
parentPausedKey, parentPendingKey)
addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed,
parentId)
else
local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey, parentPendingKey)
addJobWithPriority(parentMarkerKey,
parentQueueKey .. ":prioritized", priority,
parentId, parentQueueKey .. ":pc", isPausedOrMaxed)
end
end

rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting",
Expand Down
20 changes: 13 additions & 7 deletions src/commands/includes/promoteDelayedJobs.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,29 @@

-- Try to get as much as 1000 jobs at once
local function promoteDelayedJobs(delayedKey, markerKey, targetKey, prioritizedKey,
eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused)
eventStreamKey, prefix, timestamp, priorityCounterKey, isPaused, pendingKey)
local jobs = rcall("ZRANGEBYSCORE", delayedKey, 0, (timestamp + 1) * 0x1000 - 1, "LIMIT", 0, 1000)

if (#jobs > 0) then
rcall("ZREM", delayedKey, unpack(jobs))

for _, jobId in ipairs(jobs) do
local jobKey = prefix .. jobId
local jobAttributes = rcall("HMGET", jobKey, "priority", "pen")
local priority =
tonumber(rcall("HGET", jobKey, "priority")) or 0
tonumber(jobAttributes[1]) or 0

if priority == 0 then
-- LIFO or FIFO
addJobInTargetList(targetKey, markerKey, "LPUSH", isPaused, jobId)
if jobAttributes[2] then
rcall("ZREM", pendingKey, jobId)
addJobInTargetList(targetKey, markerKey, "RPUSH", isPaused, jobId)
else
addJobWithPriority(markerKey, prioritizedKey, priority,
jobId, priorityCounterKey, isPaused)
if priority == 0 then
-- LIFO or FIFO
addJobInTargetList(targetKey, markerKey, "LPUSH", isPaused, jobId)
else
addJobWithPriority(markerKey, prioritizedKey, priority,
jobId, priorityCounterKey, isPaused)
end
end

-- Emit waiting event
Expand Down
2 changes: 1 addition & 1 deletion src/commands/includes/removeParentDependencyKey.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

local function moveParentToWait(parentPrefix, parentId, emitEvent)
local parentTarget, isPausedOrMaxed = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "active",
parentPrefix .. "wait", parentPrefix .. "paused")
parentPrefix .. "wait", parentPrefix .. "paused", parentPrefix .. "pending")
addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId)

if emitEvent then
Expand Down
8 changes: 7 additions & 1 deletion src/commands/includes/storeJob.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
local delay = opts['delay'] or 0
local priority = opts['priority'] or 0
local debounceId = opts['de'] and opts['de']['id']

local pending = opts['pen']

local optionalValues = {}
if parentKey ~= nil then
table.insert(optionalValues, "parentKey")
Expand All @@ -26,6 +27,11 @@ local function storeJob(eventsKey, jobIdKey, jobId, name, data, opts, timestamp,
table.insert(optionalValues, debounceId)
end

if pending then
table.insert(optionalValues, "pen")
table.insert(optionalValues, 1)
end

rcall("HMSET", jobIdKey, "name", name, "data", data, "opts", jsonOpts,
"timestamp", timestamp, "delay", delay, "priority", priority,
unpack(optionalValues))
Expand Down
3 changes: 2 additions & 1 deletion src/commands/isMaxed-2.lua → src/commands/isMaxed-3.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
Input:
KEYS[1] meta key
KEYS[2] active key
KEYS[3] pending key

Output:
1 if element found in the list.
Expand All @@ -14,4 +15,4 @@ local rcall = redis.call
-- Includes
--- @include "includes/isQueueMaxed"

return isQueueMaxed(KEYS[1], KEYS[2])
return isQueueMaxed(KEYS[1], KEYS[2], KEYS[3])
Loading
Loading