Skip to content

Commit

Permalink
fix(repeat): replace delayed job when updating repeat key
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Sep 3, 2024
1 parent 6410673 commit 88029bb
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 57 deletions.
4 changes: 2 additions & 2 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,11 @@ export class Queue<
}
}

return (await this.repeat).addNextRepeatableJob<
return (await this.repeat).updateRepeatableJob<
DataType,
ResultType,
NameType
>(name, data, { ...this.jobsOpts, ...opts }, true);
>(name, data, { ...this.jobsOpts, ...opts }, { override: true });
} else {
const jobId = opts?.jobId;

Expand Down
70 changes: 38 additions & 32 deletions src/classes/repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,44 +29,41 @@ export class Repeat extends QueueBase {
(opts.settings && opts.settings.repeatKeyHashAlgorithm) || 'md5';
}

async addNextRepeatableJob<T = any, R = any, N extends string = string>(
async updateRepeatableJob<T = any, R = any, N extends string = string>(
name: N,
data: T,
opts: JobsOptions,
skipCheckExists?: boolean,
{ override }: { override: boolean },
): Promise<Job<T, R, N> | undefined> {
// HACK: This is a temporary fix to enable easy migration from bullmq <3.0.0
// to >= 3.0.0. TODO: It should be removed when moving to 4.x.
// Backwards compatibility for repeatable jobs for versions <= 3.0.0
const repeatOpts: RepeatOptions & { cron?: string } = { ...opts.repeat };
repeatOpts.pattern ??= repeatOpts.cron;
delete repeatOpts.cron;

const prevMillis = opts.prevMillis || 0;
const currentCount = repeatOpts.count ? repeatOpts.count + 1 : 1;

// Check if we reached the limit of the repeatable job's iterations
const iterationCount = repeatOpts.count ? repeatOpts.count + 1 : 1;
if (
typeof repeatOpts.limit !== 'undefined' &&
currentCount > repeatOpts.limit
iterationCount > repeatOpts.limit
) {
return;
}

// Check if we reached the end date of the repeatable job
let now = Date.now();

if (
!(typeof repeatOpts.endDate === undefined) &&
now > new Date(repeatOpts.endDate!).getTime()
) {
const { endDate } = repeatOpts;
if (!(typeof endDate === undefined) && now > new Date(endDate!).getTime()) {
return;
}

const prevMillis = opts.prevMillis || 0;
now = prevMillis < now ? now : prevMillis;

const nextMillis = await this.repeatStrategy(now, repeatOpts, name);
const pattern = repeatOpts.pattern;
const { every, pattern } = repeatOpts;

const hasImmediately = Boolean(
(repeatOpts.every || pattern) && repeatOpts.immediately,
(every || pattern) && repeatOpts.immediately,
);
const offset = hasImmediately ? now - nextMillis : undefined;
if (nextMillis) {
Expand All @@ -77,32 +74,41 @@ export class Repeat extends QueueBase {

const repeatConcatOptions = getRepeatConcatOptions(name, repeatOpts);

const repeatJobKey = await this.scripts.addRepeatableJob(
opts.repeat.key ?? this.hash(repeatConcatOptions),
nextMillis,
{
name,
endDate: repeatOpts.endDate
? new Date(repeatOpts.endDate).getTime()
: undefined,
tz: repeatOpts.tz,
pattern: repeatOpts.pattern,
every: repeatOpts.every,
},
repeatConcatOptions,
skipCheckExists,
);
let repeatJobKey;
if (override) {
repeatJobKey = await this.scripts.addRepeatableJob(
opts.repeat.key ?? this.hash(repeatConcatOptions),
nextMillis,
{
name,
endDate: endDate ? new Date(endDate).getTime() : undefined,
tz: repeatOpts.tz,
pattern,
every,
},
repeatConcatOptions,
);
} else {
const client = await this.client;

const { immediately, ...filteredRepeatOpts } = repeatOpts;
repeatJobKey = await this.scripts.updateRepeatableJobMillis(
client,
opts.repeat.key ?? this.hash(repeatConcatOptions),
nextMillis,
repeatConcatOptions,
);
}

if (repeatJobKey) {
const { immediately, ...filteredRepeatOpts } = repeatOpts;

return this.createNextJob<T, R, N>(
name,
nextMillis,
repeatJobKey,
{ ...opts, repeat: { offset, ...filteredRepeatOpts } },
data,
currentCount,
iterationCount,
hasImmediately,
);
}
Expand Down
26 changes: 21 additions & 5 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -266,16 +266,19 @@ export class Scripts {
nextMillis: number,
opts: RepeatableOptions,
legacyCustomKey: string,
skipCheckExists: boolean,
): (string | number | Buffer)[] {
const keys: (string | number | Buffer)[] = [this.queue.keys.repeat];
const queueKeys = this.queue.keys;
const keys: (string | number | Buffer)[] = [
queueKeys.repeat,
queueKeys.delayed,
];

const args = [
nextMillis,
pack(opts),
legacyCustomKey,
customKey,
skipCheckExists ? '1' : '0',
queueKeys[''],
];

return keys.concat(args);
Expand All @@ -286,7 +289,6 @@ export class Scripts {
nextMillis: number,
opts: RepeatableOptions,
legacyCustomKey: string,
skipCheckExists: boolean,
): Promise<string> {
const client = await this.queue.client;

Expand All @@ -295,12 +297,26 @@ export class Scripts {
nextMillis,
opts,
legacyCustomKey,
skipCheckExists,
);

return (<any>client).addRepeatableJob(args);
}

async updateRepeatableJobMillis(
client: RedisClient,
customKey: string,
nextMillis: number,
legacyCustomKey: string,
): Promise<string> {
const args = [
this.queue.keys.repeat,
nextMillis,
customKey,
legacyCustomKey,
];
return (<any>client).updateRepeatableJobMillis(args);
}

private removeRepeatableArgs(
legacyRepeatJobId: string,
repeatConcatOptions: string,
Expand Down
4 changes: 3 additions & 1 deletion src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,9 @@ will never work with more accuracy than 1ms. */
job.token = token;
if (job.opts.repeat) {
const repeat = await this.repeat;
await repeat.addNextRepeatableJob(job.name, job.data, job.opts);
await repeat.updateRepeatableJob(job.name, job.data, job.opts, {
override: false,
});
}
return job;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
Input:
KEYS[1] 'repeat' key
KEYS[2] 'delayed' key
ARGV[1] next milliseconds
ARGV[2] msgpacked options
[1] name
Expand All @@ -13,19 +14,25 @@
[5] every?
ARGV[3] legacy custom key TODO: remove this logic in next breaking change
ARGV[4] custom key
ARGV[5] skipCheckExists
ARGV[5] prefix key
Output:
repeatableKey - OK
]]
local rcall = redis.call
local repeatKey = KEYS[1]
local nextMilli = ARGV[1]
local delayedKey = KEYS[2]

local nextMillis = ARGV[1]
local legacyCustomKey = ARGV[3]
local customKey = ARGV[4]
local prefixKey = ARGV[5]

-- Includes
--- @include "includes/removeJob"

local function storeRepeatableJob(repeatKey, customKey, nextMilli, rawOpts)
rcall("ZADD", repeatKey, nextMilli, customKey)
local function storeRepeatableJob(repeatKey, customKey, nextMillis, rawOpts)
rcall("ZADD", repeatKey, nextMillis, customKey)
local opts = cmsgpack.unpack(rawOpts)

local optionalValues = {}
Expand Down Expand Up @@ -55,17 +62,21 @@ local function storeRepeatableJob(repeatKey, customKey, nextMilli, rawOpts)
return customKey
end

local legacyRepeatableJobExists = rcall("ZSCORE", repeatKey, legacyCustomKey)
-- If we are overriding a repeatable job we must delete the delayed job for
-- the next iteration.
local prevMillis = rcall("ZSCORE", repeatKey, customKey)
if prevMillis ~= false then
local delayedJobId = "repeat:" .. customKey .. ":" .. prevMillis

if ARGV[5] == '0' or legacyRepeatableJobExists ~= false then
if legacyRepeatableJobExists ~= false then
rcall("ZADD", repeatKey, nextMilli, legacyCustomKey)
return legacyCustomKey
elseif rcall("ZSCORE", repeatKey, customKey) ~= false then
return storeRepeatableJob(repeatKey, customKey, nextMilli, ARGV[2])
if rcall("ZSCORE", delayedKey, delayedJobId) ~= false then
removeJob(delayedJobId, true, prefixKey, true --[[remove debounce key]])
rcall("ZREM", delayedKey, delayedJobId)
end
else
return storeRepeatableJob(repeatKey, customKey, nextMilli, ARGV[2])
end

return ''
-- Keep backwards compatibility with old repeatable jobs (<= 3.0.0)
if rcall("ZSCORE", repeatKey, legacyCustomKey) ~= false then
return storeRepeatableJob(repeatKey, legacyCustomKey, nextMillis, ARGV[2])
end

return storeRepeatableJob(repeatKey, customKey, nextMillis, ARGV[2])
28 changes: 28 additions & 0 deletions src/commands/updateRepeatableJobMillis-1.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
--[[
Adds a repeatable job
Input:
KEYS[1] 'repeat' key
ARGV[1] next milliseconds
ARGV[2] custom key
ARGV[3] legacy custom key TODO: remove this logic in next breaking change
Output:
repeatableKey - OK
]]
local rcall = redis.call
local repeatKey = KEYS[1]
local nextMillis = ARGV[1]
local customKey = ARGV[2]
local legacyCustomKey = ARGV[3]

if rcall("ZSCORE", repeatKey, customKey) ~= false then
rcall("ZADD", repeatKey, nextMillis, customKey)
return customKey
elseif rcall("ZSCORE", repeatKey, legacyCustomKey) ~= false then
rcall("ZADD", repeatKey, nextMillis, legacyCustomKey)
return legacyCustomKey
end

return ''
49 changes: 47 additions & 2 deletions tests/test_repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1508,6 +1508,51 @@ describe('repeat', function () {
await processing;
delayStub.restore();
});

it('should keep only one delayed job if adding a new repeatable job with the same key', async function () {
const date = new Date('2017-02-07 9:24:00');
const key = 'mykey';

this.clock.setSystemTime(date);

const nextTick = 2 * ONE_SECOND;

await queue.add(
'test',
{ foo: 'bar' },
{
repeat: {
every: 10_000,
key,
},
},
);

this.clock.tick(nextTick);

let jobs = await queue.getRepeatableJobs();
expect(jobs).to.have.length(1);

let delayedJobs = await queue.getDelayed();
expect(delayedJobs).to.have.length(1);

await queue.add(
'test2',
{ qux: 'baz' },
{
repeat: {
every: 35_160,
key,
},
},
);

jobs = await queue.getRepeatableJobs();
expect(jobs).to.have.length(1);

delayedJobs = await queue.getDelayed();
expect(delayedJobs).to.have.length(1);
});
});

// This test is flaky and too complex we need something simpler that tests the same thing
Expand All @@ -1518,7 +1563,7 @@ describe('repeat', function () {
const jobId = 'xxxx';
const date = new Date('2017-02-07 9:24:00');
const nextTick = 2 * ONE_SECOND + 100;
const addNextRepeatableJob = repeat.addNextRepeatableJob;
const addNextRepeatableJob = repeat.updateRepeatableJob;
this.clock.setSystemTime(date);

const repeatOpts = { pattern: '*/2 * * * * *' };
Expand All @@ -1528,7 +1573,7 @@ describe('repeat', function () {
queueName,
async () => {
const repeatWorker = await worker.repeat;
(<unknown>repeatWorker.addNextRepeatableJob) = async (
(<unknown>repeatWorker.updateRepeatableJob) = async (
...args: [string, unknown, JobsOptions, boolean?]
) => {
// In order to simulate race condition
Expand Down

0 comments on commit 88029bb

Please sign in to comment.