Skip to content

Commit de3224b

Browse files
authored
feat: add jobsRetry mutation, where id argument is array (#75)
1 parent 3ca1567 commit de3224b

File tree

6 files changed

+80
-5
lines changed

6 files changed

+80
-5
lines changed

example/src/demo_queues/fetchMetrics.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,22 @@ export const metricsQueue = new Queue(queueSettings.name, {
1919
});
2020

2121
metricsQueue.add(
22-
'fetch_metrics_every_5m',
23-
{ field1: 'asdasdadas' },
24-
{ repeat: { cron: '*/1 * * * *' } }
22+
'fetch_metrics_every_2m',
23+
{ field1: 'some data' },
24+
{ repeat: { cron: '*/2 * * * *' } }
2525
);
2626

27-
metricsQueue.add('fetch_metrics_every_5000', { field1: 'asdasdadas' }, { repeat: { every: 5000 } });
27+
metricsQueue.add(
28+
'fetch_metrics_every_30000',
29+
{ field1: 'some data' },
30+
{ repeat: { every: 30000 } }
31+
);
2832

2933
const metricsWorker = new Worker(
3034
queueSettings.name,
3135
async (job) => {
36+
if (Math.random() > 0.8) throw new Error('Bull worker random error');
37+
3238
for (let i = 0; i < 5; i++) {
3339
job.updateProgress(i * 20);
3440
await new Promise((resolve) => setTimeout(resolve, 1000));

src/composeBull.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import {
2929
createJobRremoveFC,
3030
createJobRremoveBulkFC,
3131
createJobRetryFC,
32+
createJobsRetryFC,
3233
createJobUpdateFC,
3334
createJobLogAddFC,
3435
createJobMoveToDelayedFC,
@@ -91,6 +92,7 @@ export function composeBull(
9192
jobRemove: wrapMutation(createJobRremoveFC),
9293
jobRemoveBulk: wrapMutation(createJobRremoveBulkFC),
9394
jobRetry: wrapMutation(createJobRetryFC),
95+
jobsRetry: wrapMutation(createJobsRetryFC),
9496
jobUpdate: wrapMutation(createJobUpdateFC),
9597
jobLogAdd: wrapMutation(createJobLogAddFC),
9698
jobMoveToDelayed: wrapMutation(createJobMoveToDelayedFC),

src/helpers/MutationError.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
export class MutationError extends Error {
2-
constructor(message: string, public code: ErrorCodeEnum) {
2+
constructor(message: string, public code: ErrorCodeEnum, public id?: string) {
33
super(message);
44
Object.setPrototypeOf(this, MutationError.prototype);
55
}

src/helpers/getAsArray.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export function getAsArray<T>(value: T | T[]): T[] {
2+
return Array.isArray(value) ? value : [value];
3+
}

src/mutation/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export { createjobPromoteFC } from './jobPromote';
1313
export { createJobRremoveFC } from './jobRemove';
1414
export { createJobRremoveBulkFC } from './jobRemoveBulk';
1515
export { createJobRetryFC } from './jobRetry';
16+
export { createJobsRetryFC } from './jobsRetry';
1617
export { createJobUpdateFC } from './jobUpdate';
1718
export { createJobLogAddFC } from './jobLogAdd';
1819
export { createJobMoveToDelayedFC } from './jobMoveToDelayed';

src/mutation/jobsRetry.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { SchemaComposer, ObjectTypeComposerFieldConfigAsObjectDefinition } from 'graphql-compose';
2+
import { MutationError, ErrorCodeEnum } from '../helpers';
3+
import { JobStatusEnum, getJobStatusEnumTC } from '../types';
4+
import { findQueue } from '../helpers';
5+
import { Options } from '../definitions';
6+
import { getAsArray } from '../helpers/getAsArray';
7+
8+
export function createJobsRetryFC(
9+
sc: SchemaComposer<any>,
10+
opts: Options
11+
): ObjectTypeComposerFieldConfigAsObjectDefinition<any, any> {
12+
const { typePrefix } = opts;
13+
14+
return {
15+
type: sc.createObjectTC({
16+
name: `${typePrefix}JobsRetryPayload`,
17+
fields: {
18+
ids: '[String]',
19+
state: getJobStatusEnumTC(sc, opts),
20+
},
21+
}),
22+
args: {
23+
prefix: {
24+
type: 'String!',
25+
defaultValue: 'bull',
26+
},
27+
queueName: 'String!',
28+
ids: '[String!]!',
29+
},
30+
resolve: async (_, { prefix, queueName, ids }) => {
31+
const queue = await findQueue(prefix, queueName, opts);
32+
const _ids = getAsArray(ids);
33+
34+
if (_ids.length > 100) {
35+
throw new MutationError(
36+
'Arg. <id> constraint: send less than 100 IDs.',
37+
ErrorCodeEnum.OTHER_ERROR
38+
);
39+
}
40+
41+
const promises: Promise<void>[] = [];
42+
43+
for (const _id of _ids) {
44+
promises.push(
45+
queue.getJob(_id).then((job) => {
46+
if (!job)
47+
throw new MutationError(`Job ${_id} not found!`, ErrorCodeEnum.JOB_NOT_FOUND, _id);
48+
return job.retry();
49+
})
50+
);
51+
}
52+
53+
// Let there be a delay (await),
54+
// this will make the execution more obvious to client.
55+
await Promise.all(promises);
56+
57+
return {
58+
ids,
59+
state: JobStatusEnum.WAITING,
60+
};
61+
},
62+
};
63+
}

0 commit comments

Comments
 (0)