Skip to content

Commit

Permalink
feat(update): minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
mgcrea committed Jun 28, 2024
1 parent 6d8aa92 commit 635a50a
Showing 1 changed file with 41 additions and 4 deletions.
45 changes: 41 additions & 4 deletions src/PrismaQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ export class PrismaQueue<
private concurrency = 0;
private stopped = true;

/**
* Constructs a PrismaQueue object with specified options and a worker function.
* @param options - Configuration options for the queue.
* @param worker - The worker function that processes jobs.
*/
public constructor(
private options: PrismaQueueOptions = {},
public worker: JobWorker<T, U>,
Expand Down Expand Up @@ -118,11 +123,17 @@ export class PrismaQueue<
);
}

/**
* Gets the Prisma delegate associated with the queue job model.
*/
private get model(): Prisma.QueueJobDelegate {
const queueJobKey = uncapitalize(this.config.modelName) as "queueJob";
return this.#prisma[queueJobKey];
}

/**
* Starts the job processing in the queue.
*/
public async start(): Promise<void> {
debug(`starting queue named="${this.name}"...`);
if (!this.stopped) {
Expand All @@ -133,6 +144,9 @@ export class PrismaQueue<
return this.poll();
}

/**
* Stops the job processing in the queue.
*/
public async stop(): Promise<void> {
const { pollInterval } = this.config;
debug(`stopping queue named="${this.name}"...`);
Expand All @@ -141,9 +155,18 @@ export class PrismaQueue<
await waitFor(pollInterval);
}

// Alias for enqueue
/**
* Adds a job to the queue.
* @param payloadOrFunction - The job payload or a function that returns a job payload.
* @param options - Options for the job, such as scheduling and attempts.
*/
public add = this.enqueue;

/**
* Adds a job to the queue.
* @param payloadOrFunction - The job payload or a function that returns a job payload.
* @param options - Options for the job, such as scheduling and attempts.
*/
public async enqueue(
payloadOrFunction: T | JobCreator<T>,
options: EnqueueOptions = {},
Expand Down Expand Up @@ -183,6 +206,11 @@ export class PrismaQueue<
return job;
}

/**
* Schedules a job according to the cron expression or a specific run time.
* @param options - Scheduling options including cron, key, and run time.
* @param payloadOrFunction - The job payload or a function that returns a job payload.
*/
public async schedule(
options: ScheduleOptions,
payloadOrFunction: T | JobCreator<T>,
Expand All @@ -194,6 +222,9 @@ export class PrismaQueue<
return this.enqueue(payloadOrFunction, { key, cron, runAt, ...otherOptions });
}

/**
* Polls the queue and processes jobs according to the configured intervals and concurrency settings.
*/
private async poll(): Promise<void> {
const { maxConcurrency, pollInterval, jobInterval } = this.config;
debug(`polling queue named="${this.name}" with maxConcurrency=${maxConcurrency}...`);
Expand Down Expand Up @@ -243,9 +274,10 @@ export class PrismaQueue<
}
}

// https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/
// @TODO https://docs.quirrel.dev/api/queue
// https://github.com/quirrel-dev/quirrel/blob/main/package.json
/**
* Dequeues and processes the next job in the queue. Handles locking and error management internally.
* @returns {Promise<PrismaJob<T, U> | null>} The job that was processed or null if no job was available.
*/
private async dequeue(): Promise<PrismaJob<T, U> | null> {
if (this.stopped) {
return null;
Expand Down Expand Up @@ -342,6 +374,11 @@ export class PrismaQueue<
return job;
}

/**
* Counts the number of jobs in the queue, optionally only those available for processing.
* @param {boolean} onlyAvailable - If true, counts only jobs that are ready to be processed.
* @returns {Promise<number>} The number of jobs.
*/
public async size(onlyAvailable?: boolean): Promise<number> {
const { name: queueName } = this;
const date = new Date();
Expand Down

0 comments on commit 635a50a

Please sign in to comment.