Skip to content

Commit

Permalink
fix: prefix redis cache connection cache with checksum
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Aug 30, 2024
1 parent 59419f2 commit 72e3deb
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 23 deletions.
58 changes: 36 additions & 22 deletions lib/queue-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const maxCount = 150000;
const maxTime = 40000;

// We keep a redis client that we can reuse for all the queues.
let redisClients: Record<"bull" | "bullmq", Redis | Cluster> = {} as any;
let redisClients: Record<string, Redis | Cluster> = {} as any;

export interface FoundQueue {
prefix: string;
Expand All @@ -41,20 +41,18 @@ const scanForQueues = async (node: Redis | Cluster, startTime: number) => {
} while (Date.now() - startTime < maxTime && cursor !== "0");

return keys;
}
};

const getQueueKeys = async (client: Redis | Cluster, queueNames?: string[]) => {
let nodes = "nodes" in client ? client.nodes('master') : [client]
let nodes = "nodes" in client ? client.nodes("master") : [client];
let keys = [];
const startTime = Date.now();
const foundQueues = new Set<string>();

for await (const node of nodes) {

// If we have proposed queue names, lets check if they exist (including prefix)
// Basically checking if there is a id key for the queue (prefix:name:id)
if (queueNames) {

const queueKeys = queueNames.map((queueName) => {
// Separate queue name from prefix
let [prefix, name] = queueName.split(":");
Expand Down Expand Up @@ -82,13 +80,14 @@ const getQueueKeys = async (client: Redis | Cluster, queueNames?: string[]) => {
const match = queueNameRegExp.exec(key);
console.log(
chalk.yellow("Redis:") +
chalk.red(` Queue "${match[1]}:${match[2]}" not found in Redis. Skipping...`)
chalk.red(
` Queue "${match[1]}:${match[2]}" not found in Redis. Skipping...`
)
);
}
}

} else {
keys.push(...await scanForQueues(node, startTime));
keys.push(...(await scanForQueues(node, startTime)));
}
}
return keys;
Expand Down Expand Up @@ -152,37 +151,52 @@ export function getRedisClient(
type: "bull" | "bullmq",
clusterNodes?: string[]
) {
if (!redisClients[type]) {
// Compute checksum for redisOpts
const checksumJson = JSON.stringify(redisOpts);
const checksum = require("crypto")
.createHash("md5")
.update(checksumJson)
.digest("hex");

const key = `${type}-${checksum}`;

if (!redisClients[key]) {
if (clusterNodes && clusterNodes.length) {
const { username, password } = redisOptsFromUrl(clusterNodes[0])
redisClients[type] = new Redis.Cluster(clusterNodes, {
const { username, password } = redisOptsFromUrl(clusterNodes[0]);
redisClients[key] = new Redis.Cluster(clusterNodes, {
...redisOpts,
redisOptions: {
username,
password,
tls: process.env.REDIS_CLUSTER_TLS ? {
cert: Buffer.from(process.env.REDIS_CLUSTER_TLS ?? '', 'base64').toString('ascii')
} : undefined,
}
tls: process.env.REDIS_CLUSTER_TLS
? {
cert: Buffer.from(
process.env.REDIS_CLUSTER_TLS ?? "",
"base64"
).toString("ascii"),
}
: undefined,
},
});
} else {
redisClients[type] = new Redis(redisOpts);
redisClients[key] = new Redis(redisOpts);
}

redisClients[type].on("error", (err: Error) => {
redisClients[key].on("error", (err: Error) => {
console.log(
`${chalk.yellow("Redis:")} ${chalk.red("redis connection error")} ${err.message
`${chalk.yellow("Redis:")} ${chalk.red("redis connection error")} ${
err.message
}`
);
});

redisClients[type].on("connect", () => {
redisClients[key].on("connect", () => {
console.log(
`${chalk.yellow("Redis:")} ${chalk.green("connected to redis server")}`
);
});

redisClients[type].on("end", () => {
redisClients[key].on("end", () => {
console.log(
`${chalk.yellow("Redis:")} ${chalk.blueBright(
"disconnected from redis server"
Expand All @@ -191,7 +205,7 @@ export function getRedisClient(
});
}

return redisClients[type];
return redisClients[key];
}

export async function execRedisCommand(
Expand Down Expand Up @@ -255,7 +269,7 @@ export function createQueue(
default:
console.error(
chalk.red(`ERROR:`) +
`Unexpected queue type: ${foundQueue.type} for queue ${foundQueue.name}`
`Unexpected queue type: ${foundQueue.type} for queue ${foundQueue.name}`
);
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
"build": "tsc",
"test": "jest",
"start": "node app.js",
"prepare": "yarn build"
"prepare": "npm run build"
},
"repository": {
"type": "git",
Expand Down

0 comments on commit 72e3deb

Please sign in to comment.