Skip to content

Commit ce9c42b

Browse files
author
Matt Willian
committed
feat: improve redis cluster handling
1 parent 215d0a7 commit ce9c42b

File tree

4 files changed

+112
-319
lines changed

4 files changed

+112
-319
lines changed

lib/queue-factory.ts

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Redis, Cluster, RedisOptions } from "ioredis";
2+
import { ConnectionOptions as TlsConnectionOptions } from "tls";
23

34
import { QueueType, getQueueType, redisOptsFromUrl } from "./utils";
45
import { Queue } from "bullmq";
@@ -16,6 +17,13 @@ const maxTime = 40000;
1617
// We keep a redis client that we can reuse for all the queues.
1718
let redisClients: Record<string, Redis | Cluster> = {} as any;
1819

20+
type NestedRedisOptions = Partial<RedisOptions> & {
21+
tls?: TlsConnectionOptions;
22+
};
23+
type RedisOptionsWithNested = RedisOptions & {
24+
redisOptions?: NestedRedisOptions;
25+
};
26+
1927
export interface FoundQueue {
2028
prefix: string;
2129
name: string;
@@ -167,21 +175,52 @@ export function getRedisClient(
167175

168176
if (!redisClients[key]) {
169177
if (clusterNodes && clusterNodes.length) {
170-
const { username, password } = redisOptsFromUrl(clusterNodes[0]);
178+
const { username: nodeUsername, password: nodePassword } =
179+
redisOptsFromUrl(clusterNodes[0]);
180+
const {
181+
username: redisOptsUsername,
182+
password: redisOptsPassword,
183+
tls: redisOptsTls,
184+
redisOptions: suppliedRedisOptionsRaw,
185+
...clusterLevelOpts
186+
} = redisOpts as RedisOptionsWithNested;
187+
const suppliedRedisOptions: NestedRedisOptions =
188+
suppliedRedisOptionsRaw ?? {};
189+
const baseRedisOptions: NestedRedisOptions = {
190+
...suppliedRedisOptions,
191+
};
192+
delete baseRedisOptions.username;
193+
delete baseRedisOptions.password;
194+
delete baseRedisOptions.tls;
195+
196+
const finalUsername =
197+
redisOptsUsername ??
198+
suppliedRedisOptions.username ??
199+
nodeUsername;
200+
if (finalUsername !== undefined) {
201+
baseRedisOptions.username = finalUsername;
202+
}
203+
204+
const finalPassword =
205+
redisOptsPassword ??
206+
suppliedRedisOptions.password ??
207+
nodePassword;
208+
if (finalPassword !== undefined) {
209+
baseRedisOptions.password = finalPassword;
210+
}
211+
212+
const mergedTls = mergeTlsConfigs(
213+
getClusterTlsFromEnv(),
214+
suppliedRedisOptions.tls,
215+
redisOptsTls
216+
);
217+
if (mergedTls) {
218+
baseRedisOptions.tls = mergedTls;
219+
}
220+
171221
redisClients[key] = new Redis.Cluster(clusterNodes, {
172-
...redisOpts,
173-
redisOptions: {
174-
username,
175-
password,
176-
tls: process.env.REDIS_CLUSTER_TLS
177-
? {
178-
cert: Buffer.from(
179-
process.env.REDIS_CLUSTER_TLS ?? "",
180-
"base64"
181-
).toString("ascii"),
182-
}
183-
: undefined,
184-
},
222+
...clusterLevelOpts,
223+
redisOptions: baseRedisOptions,
185224
});
186225
} else {
187226
redisClients[key] = new Redis(redisOpts);
@@ -300,3 +339,25 @@ export function createQueue(
300339
);
301340
}
302341
}
342+
343+
function getClusterTlsFromEnv(): TlsConnectionOptions | undefined {
344+
if (!process.env.REDIS_CLUSTER_TLS) {
345+
return undefined;
346+
}
347+
return {
348+
cert: Buffer.from(
349+
process.env.REDIS_CLUSTER_TLS ?? "",
350+
"base64"
351+
).toString("ascii"),
352+
};
353+
}
354+
355+
function mergeTlsConfigs(
356+
...configs: Array<TlsConnectionOptions | undefined>
357+
): TlsConnectionOptions | undefined {
358+
const valid = configs.filter(Boolean) as TlsConnectionOptions[];
359+
if (!valid.length) {
360+
return undefined;
361+
}
362+
return Object.assign({}, ...valid);
363+
}

lib/socket.ts

Lines changed: 29 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { RedisOptions } from "ioredis";
2-
import { pick } from "lodash";
32
import { getCache, updateQueuesCache, queueKey } from "./queues-cache";
43
import { WebSocketClient } from "./ws-autoreconnect";
54
import {
@@ -15,13 +14,8 @@ const { version } = require(`${__dirname}/../package.json`);
1514

1615
const chalk = require("chalk");
1716

18-
export interface Connection {
19-
port?: number;
20-
host?: string;
21-
password?: string;
22-
db?: number;
17+
export interface Connection extends Partial<RedisOptions> {
2318
uri?: string;
24-
tls?: object;
2519
}
2620

2721
export const Socket = (
@@ -139,12 +133,20 @@ export const Socket = (
139133
throw new Error("Unable to update queues");
140134
}
141135
}
142-
const { queue, responders } =
143-
cache[
144-
queueKey({ name: queueName, prefix: queuePrefix || "bull" })
145-
];
136+
const cacheKeyValue = queueKey({
137+
name: queueName,
138+
prefix: queuePrefix || "bull",
139+
});
140+
const cacheEntry = cache[cacheKeyValue];
146141

147-
if (!queue) {
142+
if (!cacheEntry || !cacheEntry.queue) {
143+
console.warn(
144+
`${chalk.yellow("WebSocket:")} ${chalk.red(
145+
"queue cache miss for"
146+
)} ${chalk.green(queueName)} ${chalk.blueBright(
147+
"prefix:"
148+
)} ${chalk.green(queuePrefix || "bull")}`
149+
);
148150
ws.send(
149151
JSON.stringify({
150152
id: msg.id,
@@ -153,6 +155,7 @@ export const Socket = (
153155
startTime
154156
);
155157
} else {
158+
const { queue, responders } = cacheEntry;
156159
switch (res) {
157160
case "queues":
158161
await responders.respondQueueCommand(ws, queue, msg);
@@ -249,31 +252,25 @@ export const Socket = (
249252
}
250253
};
251254

252-
function redisOptsFromConnection(connection: Connection): RedisOptions {
253-
let opts: RedisOptions = {
254-
...pick(connection, [
255-
"host",
256-
"port",
257-
"username",
258-
"password",
259-
"family",
260-
"sentinelPassword",
261-
"db",
262-
"tls",
263-
"sentinels",
264-
"name",
265-
]),
255+
function redisOptsFromConnection(
256+
connection: Connection = {}
257+
): RedisOptions {
258+
let opts: (RedisOptions & { uri?: string }) = {
259+
...connection,
266260
};
267261

268262
if (connection.uri) {
269263
opts = { ...opts, ...redisOptsFromUrl(connection.uri) };
270264
}
265+
delete opts.uri;
271266

272-
opts.retryStrategy = function (times: number) {
273-
times = times % 8;
274-
const delay = Math.round(Math.pow(2, times + 8));
275-
console.log(chalk.yellow("Redis: ") + `Reconnecting in ${delay} ms`);
276-
return delay;
277-
};
267+
if (!opts.retryStrategy) {
268+
opts.retryStrategy = function (times: number) {
269+
times = times % 8;
270+
const delay = Math.round(Math.pow(2, times + 8));
271+
console.log(chalk.yellow("Redis: ") + `Reconnecting in ${delay} ms`);
272+
return delay;
273+
};
274+
}
278275
return opts;
279276
}

0 commit comments

Comments
 (0)