Skip to content

Commit f2ee096

Browse files
author
Matt Willian
committed
feat: improve redis cluster handling
1 parent 215d0a7 commit f2ee096

File tree

1 file changed

+75
-14
lines changed

1 file changed

+75
-14
lines changed

lib/queue-factory.ts

Lines changed: 75 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Redis, Cluster, RedisOptions } from "ioredis";
2+
import { ConnectionOptions as TlsConnectionOptions } from "tls";
23

34
import { QueueType, getQueueType, redisOptsFromUrl } from "./utils";
45
import { Queue } from "bullmq";
@@ -16,6 +17,13 @@ const maxTime = 40000;
1617
// We keep a redis client that we can reuse for all the queues.
1718
let redisClients: Record<string, Redis | Cluster> = {} as any;
1819

20+
type NestedRedisOptions = Partial<RedisOptions> & {
21+
tls?: TlsConnectionOptions;
22+
};
23+
type RedisOptionsWithNested = RedisOptions & {
24+
redisOptions?: NestedRedisOptions;
25+
};
26+
1927
export interface FoundQueue {
2028
prefix: string;
2129
name: string;
@@ -167,21 +175,52 @@ export function getRedisClient(
167175

168176
if (!redisClients[key]) {
169177
if (clusterNodes && clusterNodes.length) {
170-
const { username, password } = redisOptsFromUrl(clusterNodes[0]);
178+
const { username: nodeUsername, password: nodePassword } =
179+
redisOptsFromUrl(clusterNodes[0]);
180+
const {
181+
username: redisOptsUsername,
182+
password: redisOptsPassword,
183+
tls: redisOptsTls,
184+
redisOptions: suppliedRedisOptionsRaw,
185+
...clusterLevelOpts
186+
} = redisOpts as RedisOptionsWithNested;
187+
const suppliedRedisOptions: NestedRedisOptions =
188+
suppliedRedisOptionsRaw ?? {};
189+
const baseRedisOptions: NestedRedisOptions = {
190+
...suppliedRedisOptions,
191+
};
192+
delete baseRedisOptions.username;
193+
delete baseRedisOptions.password;
194+
delete baseRedisOptions.tls;
195+
196+
const finalUsername =
197+
redisOptsUsername ??
198+
suppliedRedisOptions.username ??
199+
nodeUsername;
200+
if (finalUsername !== undefined) {
201+
baseRedisOptions.username = finalUsername;
202+
}
203+
204+
const finalPassword =
205+
redisOptsPassword ??
206+
suppliedRedisOptions.password ??
207+
nodePassword;
208+
if (finalPassword !== undefined) {
209+
baseRedisOptions.password = finalPassword;
210+
}
211+
212+
const mergedTls = mergeTlsConfigs(
213+
getClusterTlsFromEnv(),
214+
suppliedRedisOptions.tls,
215+
redisOptsTls
216+
);
217+
if (mergedTls) {
218+
baseRedisOptions.tls = mergedTls;
219+
}
220+
171221
redisClients[key] = new Redis.Cluster(clusterNodes, {
172-
...redisOpts,
173-
redisOptions: {
174-
username,
175-
password,
176-
tls: process.env.REDIS_CLUSTER_TLS
177-
? {
178-
cert: Buffer.from(
179-
process.env.REDIS_CLUSTER_TLS ?? "",
180-
"base64"
181-
).toString("ascii"),
182-
}
183-
: undefined,
184-
},
222+
...clusterLevelOpts,
223+
redisOptions: baseRedisOptions,
185224
});
186225
} else {
187226
redisClients[key] = new Redis(redisOpts);
@@ -300,3 +339,25 @@ export function createQueue(
300339
);
301340
}
302341
}
342+
343+
function getClusterTlsFromEnv(): TlsConnectionOptions | undefined {
344+
if (!process.env.REDIS_CLUSTER_TLS) {
345+
return undefined;
346+
}
347+
return {
348+
cert: Buffer.from(
349+
process.env.REDIS_CLUSTER_TLS ?? "",
350+
"base64"
351+
).toString("ascii"),
352+
};
353+
}
354+
355+
function mergeTlsConfigs(
356+
...configs: Array<TlsConnectionOptions | undefined>
357+
): TlsConnectionOptions | undefined {
358+
const valid = configs.filter(Boolean) as TlsConnectionOptions[];
359+
if (!valid.length) {
360+
return undefined;
361+
}
362+
return Object.assign({}, ...valid);
363+
}

0 commit comments

Comments
 (0)