Skip to content

Commit

Permalink
perf(redis-connection): check redis version greater or equal than v6 …
Browse files Browse the repository at this point in the history
…only once (#2252)
  • Loading branch information
roggervalf committed Oct 29, 2023
1 parent 14176c1 commit a09b15a
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 8 deletions.
14 changes: 13 additions & 1 deletion src/classes/redis-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ const deprecationMessage = [
'On the next versions having this settings will throw an exception',
].join(' ');

interface RedisCapabilities {
canDoubleTimeout: boolean;
}

export interface RawCommand {
content: string;
name: string;
Expand All @@ -33,11 +37,14 @@ export class RedisConnection extends EventEmitter {
static recommendedMinimumVersion = '6.2.0';

closing: boolean;
capabilities: RedisCapabilities = {
canDoubleTimeout: false,
};

protected _client: RedisClient;

private readonly opts: RedisOptions;
private initializing: Promise<RedisClient>;
private readonly initializing: Promise<RedisClient>;

private version: string;
private skipVersionCheck: boolean;
Expand Down Expand Up @@ -207,6 +214,11 @@ export class RedisConnection extends EventEmitter {
);
}
}

this.capabilities = {
canDoubleTimeout: !isRedisVersionLowerThan(this.version, '6.0.0'),
};

return this._client;
}

Expand Down
11 changes: 4 additions & 7 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ export class Worker<
private stalledCheckTimer: NodeJS.Timeout;
private waiting: Promise<string> | null = null;
private _repeat: Repeat;

protected paused: Promise<void>;
protected processFn: Processor<DataType, ResultType, NameType>;
protected running = false;
Expand Down Expand Up @@ -540,12 +540,9 @@ export class Worker<
);

// Only Redis v6.0.0 and above supports doubles as block time
blockTimeout = isRedisVersionLowerThan(
this.blockingConnection.redisVersion,
'6.0.0',
)
? Math.ceil(blockTimeout)
: blockTimeout;
blockTimeout = this.blockingConnection.capabilities.canDoubleTimeout
? blockTimeout
: Math.ceil(blockTimeout);

// We restrict the maximum block timeout to 10 second to avoid
// blocking the connection for too long in the case of reconnections
Expand Down
38 changes: 38 additions & 0 deletions tests/test_repeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,44 @@ describe('repeat', function () {
});
});

describe('when repeatable job is promoted', function () {
it('keeps one repeatable and one delayed after being processed', async function () {
const options = {
repeat: {
pattern: '0 * 1 * *',
},
};

const worker = new Worker(queueName, async () => {}, { connection });

const completing = new Promise<void>(resolve => {
worker.on('completed', () => {
resolve();
});
});

const repeatableJob = await queue.add('test', { foo: 'bar' }, options);
const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);

await repeatableJob.promote();
await completing;

const delayedCount2 = await queue.getDelayedCount();
expect(delayedCount2).to.be.equal(1);

const configs = await repeat.getRepeatableJobs(0, -1, true);

expect(delayedCount).to.be.equal(1);

const count = await queue.count();

expect(count).to.be.equal(1);
expect(configs).to.have.length(1);
await worker.close();
});
});

it('should allow removing a named repeatable job', async function () {
const numJobs = 3;
const date = new Date('2017-02-07 9:24:00');
Expand Down

0 comments on commit a09b15a

Please sign in to comment.