Skip to content

Commit

Permalink
refactor: remove paused key on moveToActive
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Sep 13, 2024
1 parent 87447a8 commit c3acbb9
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 29 deletions.
12 changes: 6 additions & 6 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"getState": self.redisClient.register_script(self.getScript("getState-8.lua")),
"getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")),
"isJobInList": self.redisClient.register_script(self.getScript("isJobInList-1.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-9.lua")),
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-11.lua")),
"moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")),
"moveToActive": self.redisClient.register_script(self.getScript("moveToActive-10.lua")),
"moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
"moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")),
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-5.lua")),
Expand All @@ -62,7 +62,7 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"removeJob": self.redisClient.register_script(self.getScript("removeJob-2.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-7.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-10.lua")),
"moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-8.lua")),
"moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-7.lua")),
"saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")),
"updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")),
"updateProgress": self.redisClient.register_script(self.getScript("updateProgress-3.lua")),
Expand Down Expand Up @@ -446,7 +446,7 @@ async def obliterate(self, count: int, force: bool = False):

def moveJobsToWaitArgs(self, state: str, count: int, timestamp: int) -> int:
keys = self.getKeys(
['', 'events', state, 'wait', 'paused', 'meta', 'active', 'marker'])
['', 'events', state, 'wait', 'meta', 'active', 'marker'])

args = [count or 1000, timestamp or round(time.time()*1000), state]
return (keys, args)
Expand Down Expand Up @@ -479,7 +479,7 @@ async def moveToActive(self, token: str, opts: dict) -> list[Any]:
limiter = opts.get("limiter", None)

keys = self.getKeys(['wait', 'active', 'prioritized', 'events',
'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc', 'marker'])
'stalled', 'limiter', 'delayed', 'meta', 'pc', 'marker'])
packedOpts = msgpack.packb(
{"token": token, "lockDuration": lockDuration, "limiter": limiter}, use_bin_type=True)
args = [self.keys[''], timestamp, packedOpts]
Expand Down Expand Up @@ -576,7 +576,7 @@ def extendLock(self, jobId: str, token: str, duration: int, client: Redis = None

def moveStalledJobsToWait(self, maxStalledCount: int, stalledInterval: int):
keys = self.getKeys(['stalled', 'wait', 'active', 'failed',
'stalled-check', 'meta', 'paused', 'marker', 'events'])
'stalled-check', 'meta', 'marker', 'events'])
args = [maxStalledCount, self.keys[''], round(
time.time() * 1000), stalledInterval]
return self.commands["moveStalledJobsToWait"](keys, args)
Expand Down
3 changes: 0 additions & 3 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1105,7 +1105,6 @@ export class Scripts {
this.queue.keys.events,
this.queue.toKey(state),
this.queue.toKey('wait'),
this.queue.toKey('paused'),
this.queue.keys.meta,
this.queue.keys.active,
this.queue.keys.marker,
Expand Down Expand Up @@ -1199,7 +1198,6 @@ export class Scripts {
queueKeys.stalled,
queueKeys.limiter,
queueKeys.delayed,
queueKeys.paused,
queueKeys.meta,
queueKeys.pc,
queueKeys.marker,
Expand Down Expand Up @@ -1259,7 +1257,6 @@ export class Scripts {
this.queue.keys.failed,
this.queue.keys['stalled-check'],
this.queue.keys.meta,
this.queue.keys.paused,
this.queue.keys.marker,
this.queue.keys.events,
];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@
KEYS[2] events stream
KEYS[3] state key (failed, completed, delayed)
KEYS[4] 'wait'
KEYS[5] 'paused' // TODO remove
KEYS[6] 'meta'
KEYS[7] 'active'
KEYS[8] 'marker'
KEYS[5] 'meta'
KEYS[6] 'active'
KEYS[7] 'marker'
ARGV[1] count
ARGV[2] timestamp
Expand All @@ -32,8 +31,8 @@ local rcall = redis.call;
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/isQueuePausedOrMaxed"

local metaKey = KEYS[6]
local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, KEYS[7])
local metaKey = KEYS[5]
local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, KEYS[6])

local jobs = rcall('ZRANGEBYSCORE', KEYS[3], 0, timestamp, 'LIMIT', 0, maxCount)
if (#jobs > 0) then
Expand Down Expand Up @@ -63,7 +62,7 @@ if (#jobs > 0) then
rcall("LPUSH", KEYS[4], unpack(jobs, from, to))
end

addBaseMarkerIfNeeded(KEYS[8], isPausedOrMaxed)
addBaseMarkerIfNeeded(KEYS[7], isPausedOrMaxed)
end

maxCount = maxCount - #jobs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
KEYS[4] 'failed', (ZSET)
KEYS[5] 'stalled-check', (KEY)
KEYS[6] 'meta', (KEY)
KEYS[7] 'paused', (LIST) // TODO remove
KEYS[8] 'marker'
KEYS[9] 'event stream' (STREAM)
KEYS[7] 'marker'
KEYS[8] 'event stream' (STREAM)
ARGV[1] Max stalled job count
ARGV[2] queue.toKey('')
Expand Down Expand Up @@ -40,8 +39,8 @@ local activeKey = KEYS[3]
local failedKey = KEYS[4]
local stalledCheckKey = KEYS[5]
local metaKey = KEYS[6]
local markerKey = KEYS[8]
local eventStreamKey = KEYS[9]
local markerKey = KEYS[7]
local eventStreamKey = KEYS[8]
local maxStalledJobCount = ARGV[1]
local queueKeyPrefix = ARGV[2]
local timestamp = ARGV[3]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
KEYS[7] delayed key
-- Delayed jobs
KEYS[8] paused key // TODO remove
KEYS[9] meta key
KEYS[10] pc priority counter
KEYS[8] meta key
KEYS[9] pc priority counter
-- Marker
KEYS[11] marker key
KEYS[10] marker key
-- Arguments
ARGV[1] key prefix
Expand All @@ -50,12 +49,12 @@ local opts = cmsgpack.unpack(ARGV[3])
--- @include "includes/prepareJobForProcessing"
--- @include "includes/promoteDelayedJobs"

local isPausedOrMaxed = isQueuePausedOrMaxed(KEYS[9], activeKey)
local isPausedOrMaxed = isQueuePausedOrMaxed(KEYS[8], activeKey)

-- Check if there are delayed jobs that we can move to wait.
local markerKey = KEYS[11]
local markerKey = KEYS[10]
promoteDelayedJobs(delayedKey, markerKey, KEYS[1], KEYS[3], eventStreamKey, ARGV[1],
ARGV[2], KEYS[10], isPausedOrMaxed)
ARGV[2], KEYS[9], isPausedOrMaxed)

local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max'])
local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey)
Expand All @@ -73,7 +72,7 @@ if jobId then
return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
maxJobs, opts)
else
jobId = moveJobFromPriorityToActive(KEYS[3], activeKey, KEYS[10])
jobId = moveJobFromPriorityToActive(KEYS[3], activeKey, KEYS[9])
if jobId then
return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2],
maxJobs, opts)
Expand Down

0 comments on commit c3acbb9

Please sign in to comment.