Skip to content

Commit f028968

Browse files
intproBoris Dorofeev
and
Boris Dorofeev
authored
feat: add jobs memory usage avg field (#17)
Co-authored-by: Boris Dorofeev <[email protected]>
1 parent c7a760d commit f028968

File tree

5 files changed

+106
-0
lines changed

5 files changed

+106
-0
lines changed

src/helpers/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ export * from './wrapQueueArgs';
77
export * from './composeFC';
88
export * from './deleteKeys';
99
export * from './fixDelayStream';
10+
export * from './memoryUsage';

src/helpers/memoryUsage.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import { getBullConnection } from './getBullConnection';
2+
import { Options } from '../definitions';
3+
import Redis from 'ioredis';
4+
import path from 'path';
5+
import util from 'util';
6+
import fs from 'fs';
7+
8+
let commandDefined = false;
9+
10+
async function defineCommand(redis: Redis.Redis) {
11+
const readFile = util.promisify(fs.readFile);
12+
const lua = await readFile(path.join(__dirname, 'zsetKeysMemoryUsage.lua'));
13+
14+
redis.defineCommand('zsetKeysMemoryUsage', { numberOfKeys: 0, lua: lua.toString() });
15+
16+
commandDefined = true;
17+
}
18+
19+
export async function getZsetKeysMemoryUsageAvg(
20+
prefix: string,
21+
queueName: string,
22+
keySetName: string,
23+
limit: number,
24+
opts: Options
25+
): Promise<number> {
26+
const redis = getBullConnection(opts);
27+
28+
if (!commandDefined) {
29+
await defineCommand(redis);
30+
}
31+
32+
const fullPrefix = [prefix, queueName].join(':');
33+
34+
const [amountBytes, jobsCount] = await (redis as any).zsetKeysMemoryUsage(
35+
fullPrefix + ':',
36+
keySetName,
37+
limit
38+
);
39+
40+
return amountBytes / (jobsCount || 1);
41+
}

src/helpers/zsetKeysMemoryUsage.lua

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
--[[
2+
Input:
3+
ARGV[1] keys prefix
4+
ARGV[2] set name
5+
ARGV[3] limit
6+
7+
Output:
8+
(integer) bytesAmount
9+
]]
10+
11+
local jobs = redis.call("ZREVRANGE", ARGV[1]..ARGV[2], 0, ARGV[3])
12+
13+
local bytesAmount = 0
14+
15+
if (#jobs > 0) then
16+
for _, jobId in ipairs(jobs) do
17+
bytesAmount = bytesAmount + redis.call('memory', 'usage', ARGV[1]..jobId)
18+
end
19+
end
20+
21+
return {bytesAmount, #jobs}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import { SchemaComposer, ObjectTypeComposerFieldConfigDefinition } from 'graphql-compose';
2+
import { Queue } from 'bullmq';
3+
import { Options } from '../../definitions';
4+
import { getZsetKeysMemoryUsageAvg } from '../../helpers';
5+
6+
export function createJobsMemoryUsageAvgFC(
7+
sc: SchemaComposer<any>,
8+
opts: Options
9+
): ObjectTypeComposerFieldConfigDefinition<any, any> {
10+
return {
11+
type: 'Int!',
12+
args: {
13+
keySetName: {
14+
type: sc.createEnumTC({
15+
name: 'keySetNamesEnum',
16+
values: {
17+
COMPLETED: { value: 'completed' },
18+
FAILED: { value: 'failed' },
19+
},
20+
}),
21+
defaultValue: 'completed',
22+
},
23+
limit: {
24+
type: 'Int',
25+
defaultValue: 100,
26+
},
27+
},
28+
resolve: async (queue: Queue, { limit, keySetName }) => {
29+
const avgBytes =
30+
(await getZsetKeysMemoryUsageAvg(
31+
queue.opts?.prefix || 'bull',
32+
queue.name,
33+
keySetName,
34+
limit,
35+
opts
36+
)) || 0;
37+
38+
return avgBytes.toFixed(0);
39+
},
40+
};
41+
}

src/types/queue/Queue.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { createDelayedJobsFC } from './Queue.delayedJobs';
99
import { createFailedJobsFC } from './Queue.failedJobs';
1010
import { createWorkersTC } from './Queue.workers';
1111
import { createDurationAvgFC } from './Queue.durationAvg';
12+
import { createJobsMemoryUsageAvgFC } from './Queue.jobsMemoryUsageAvg';
1213
import { SchemaComposer } from 'graphql-compose';
1314
import { Options } from '../../definitions';
1415

@@ -29,6 +30,7 @@ export function getQueueTC(sc: SchemaComposer<any>, opts: Options) {
2930
jobsFailed: createFailedJobsFC(sc, opts),
3031
activeWorkers: createWorkersTC(sc, opts),
3132
durationAvg: createDurationAvgFC(sc, opts),
33+
jobsMemoryUsageAvg: createJobsMemoryUsageAvgFC(sc, opts),
3234
});
3335
});
3436
}

0 commit comments

Comments
 (0)