Skip to content

Commit

Permalink
perf(worker): promote delayed jobs while queue is rate limited (#2697)…
Browse files Browse the repository at this point in the history
… ref #2582
  • Loading branch information
roggervalf authored Aug 9, 2024
1 parent 5344692 commit f3290ac
Showing 1 changed file with 15 additions and 2 deletions.
17 changes: 15 additions & 2 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ import {
// 10 seconds is the maximum time a BRPOPLPUSH can block.
const maximumBlockTimeout = 10;

// 30 seconds is the maximum limit until.
const maximumLimitUntil = 30000;

// note: sandboxed processors would also like to define concurrency per process
// for better resource utilization.

Expand Down Expand Up @@ -563,10 +566,14 @@ export class Worker<
this.waiting = null;
}
} else {
if (this.limitUntil) {
const limitUntil = this.limitUntil;
if (limitUntil) {
this.abortDelayController?.abort();
this.abortDelayController = new AbortController();
await this.delay(this.limitUntil, this.abortDelayController);
await this.delay(
this.getLimitUntil(limitUntil),
this.abortDelayController,
);
}
return this.moveToActive(client, token, this.opts.name);
}
Expand Down Expand Up @@ -686,6 +693,12 @@ will never work with more accuracy than 1ms. */
}
}

protected getLimitUntil(limitUntil: number): number {
// We restrict the maximum limit until to 30 second to
// be able to promote delayed jobs while queue is rate limited
return Math.min(limitUntil, maximumLimitUntil);
}

/**
*
* This function is exposed only for testing purposes.
Expand Down

0 comments on commit f3290ac

Please sign in to comment.