Skip to content

Commit 1013f4f

Browse files
authored
feat: pep up queue, helps to alive broken cron tasks via ...:delay steam
* fix: job name not necessarily * feat: add pepUp queue mutation
1 parent 30a90f4 commit 1013f4f

File tree

5 files changed

+100
-0
lines changed

5 files changed

+100
-0
lines changed

src/composeBull.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import {
2121
createJobUpdateFC,
2222
createJobLogAddFC,
2323
createJobMoveToDelayedFC,
24+
createQueuePepUpFC,
2425
} from './mutation';
2526
import { wrapMutationFC, wrapQueueArgs, composeFC } from './helpers';
2627

@@ -58,6 +59,7 @@ export function composeBull(opts: Options & { schemaComposer?: SchemaComposer<an
5859
jobUpdate: wrapMutation(createJobUpdateFC),
5960
jobLogAdd: wrapMutation(createJobLogAddFC),
6061
jobMoveToDelayed: wrapMutation(createJobMoveToDelayedFC),
62+
queuePepUp: wrapMutation(createQueuePepUpFC),
6163
},
6264
};
6365
}

src/helpers/fixDelayStream.ts

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import { MutationError, ErrorCodeEnum } from './MutationError';
2+
import { getBullConnection } from './getBullConnection';
3+
import { Options } from '../definitions';
4+
5+
type Record = {
6+
id: string;
7+
nextTimestamp: string;
8+
};
9+
10+
type Result = {
11+
first: Record;
12+
last: Record;
13+
fixLast: Record;
14+
};
15+
16+
export async function addFixRecordToDelayStream(
17+
prefix: string,
18+
queueName: string,
19+
opts: Options,
20+
checkExistence: boolean = true
21+
): Promise<Result> {
22+
const redis = getBullConnection(opts);
23+
24+
const fullQueueName = [prefix, queueName].join(':');
25+
26+
if (checkExistence) {
27+
const queueExists = await redis.exists([fullQueueName, 'meta'].join(':'));
28+
29+
if (!queueExists) {
30+
throw new MutationError('Queue not found!', ErrorCodeEnum.QUEUE_NOT_FOUND);
31+
}
32+
}
33+
34+
const streamName = fullQueueName + ':delay';
35+
36+
const first = await redis.xrange(streamName, '-', '+', 'count', 1);
37+
const last = await redis.xrevrange(streamName, '+', '-', 'count', 1);
38+
const defaultRec = {
39+
id: '0-0',
40+
nextTimestamp: '',
41+
};
42+
const fixLastNextTimestamp = `${Date.now()}`;
43+
const fixLastId = await redis.xadd(streamName, '*', 'nextTimestamp', fixLastNextTimestamp);
44+
45+
function readRecords(records: [string, string[]][]): Record {
46+
if (records.length > 0) {
47+
return {
48+
id: records[0][0],
49+
nextTimestamp: records[0][1][1],
50+
};
51+
} else {
52+
return defaultRec;
53+
}
54+
}
55+
56+
return {
57+
first: readRecords(first),
58+
last: readRecords(last),
59+
fixLast: {
60+
id: fixLastId,
61+
nextTimestamp: fixLastNextTimestamp,
62+
},
63+
};
64+
}

src/helpers/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ export * from './wrapMutationFC';
66
export * from './wrapQueueArgs';
77
export * from './composeFC';
88
export * from './deleteKeys';
9+
export * from './fixDelayStream';

src/mutation/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ export { createJobRetryFC } from './jobRetry';
1616
export { createJobUpdateFC } from './jobUpdate';
1717
export { createJobLogAddFC } from './jobLogAdd';
1818
export { createJobMoveToDelayedFC } from './jobMoveToDelayed';
19+
export { createQueuePepUpFC } from './queuePepUp';

src/mutation/queuePepUp.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
2+
import { addFixRecordToDelayStream } from '../helpers';
3+
import { Options } from '../definitions';
4+
5+
export function createQueuePepUpFC(
6+
sc: SchemaComposer<any>,
7+
opts: Options
8+
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
9+
const { typePrefix } = opts;
10+
11+
return {
12+
type: sc.createObjectTC({
13+
name: `${typePrefix}QueuePepUpPayload`,
14+
fields: { records: 'JSON' },
15+
}),
16+
args: {
17+
prefix: {
18+
type: 'String!',
19+
defaultValue: 'bull',
20+
},
21+
queueName: 'String!',
22+
checkExistence: {
23+
type: 'Boolean',
24+
defaultValue: true,
25+
},
26+
},
27+
resolve: async (_, { prefix, queueName, checkExistence }) => {
28+
const records = await addFixRecordToDelayStream(prefix, queueName, opts, checkExistence);
29+
return { records };
30+
},
31+
};
32+
}

0 commit comments

Comments
 (0)