Skip to content

Commit

Permalink
fix: address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dylandepass committed Feb 3, 2025
1 parent e4ad90d commit dfa036c
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 26 deletions.
3 changes: 3 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions packages/helix-shared-process-queue/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
"publishConfig": {
"access": "public"
},
"dependencies": {
"@adobe/helix-shared-async": "2.0.2"
},
"devDependencies": {
"sinon": "19.0.2"
}
Expand Down
9 changes: 5 additions & 4 deletions packages/helix-shared-process-queue/src/process-queue.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ export declare type ProcessQueueHandler<
*
* @param queue A list of entries to be processed
* @param fn A handler function
* @param {number} [maxConcurrent=8] Concurrency level
* @param {RateLimitOptions} [rateLimitOptions] Optional rate limit options for throttling processing.
* @param {RateLimitOptions|number} [rateLimitOptions=null] Optional rate limit options for throttling processing.
* @returns the results
*/
export default function processQueue<
Expand All @@ -67,17 +66,19 @@ export default function processQueue<
>(
queue: TQueue,
fn: THandler,
maxConcurrent?: number,
rateLimitOptions?: RateLimitOptions | null
rateLimitOptions?: RateLimitOptions | number | null
): Promise<TReturn[]>;

/**
* Rate limiting options for processQueue
*
* @property {number} maxConcurrent Maximum number of items processed concurrently
* @property {number} limit Maximum number of items processed within the interval
* @property {number} interval Time window in milliseconds
*/
export declare type RateLimitOptions = {
maxConcurrent: number;
limit: number;
interval: number;
abortSignal?: AbortSignal;
};
53 changes: 33 additions & 20 deletions packages/helix-shared-process-queue/src/process-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
* governing permissions and limitations under the License.
*/

import { sleep } from '@adobe/helix-shared-async';

/**
* Simple dequeing iterator.
* @param queue
Expand Down Expand Up @@ -42,7 +44,7 @@ function* dequeue(queue) {
* @returns {Function} An async function that waits until a token is available
*/
function createRateLimiter(limit, interval) {
let tokens = limit;
let numTokens = limit;
let lastRefill = Date.now();

return async function waitForToken() {
Expand All @@ -52,19 +54,19 @@ function createRateLimiter(limit, interval) {

// Refill tokens if the interval has passed
if (now - lastRefill >= interval) {
tokens = limit;
numTokens = limit;
lastRefill = now;
}

// If a token is available, consume one and exit
if (tokens > 0) {
tokens -= 1;
if (numTokens > 0) {
numTokens -= 1;
return;
}

// Else, wait before checking again
// eslint-disable-next-line no-await-in-loop, no-promise-executor-return
await new Promise((resolve) => setTimeout(resolve, 50));
// eslint-disable-next-line no-await-in-loop
await sleep(interval - (now - lastRefill));
}
};
}
Expand All @@ -75,28 +77,35 @@ function createRateLimiter(limit, interval) {
*
* @param {Iterable|Array} queue A list of tasks
* @param {ProcessQueueHandler} fn A handler function `fn(task:any, queue:array, results:array)`
* @param {number} [maxConcurrent = 8] Concurrency level
* @param {RateLimitOptions} [rateLimitOptions=null] Optional rate limiting options
* @param {RateLimitOptions|number} [rateLimitOptions=null] Optional rate limiting options
* @returns {Promise<Array>} the results
*/
export default async function processQueue(queue, fn, maxConcurrent = 8, rateLimitOptions = null) {
export default async function processQueue(
queue,
fn,
rateLimitOptions,
) {
if (typeof queue !== 'object') {
throw Error('invalid queue argument: iterable expected');
}

// noop by default
let waitForToken = async () => {};

// If rate limiting options are provided, define a token bucket limiter.
if (
rateLimitOptions
&& rateLimitOptions.limit != null
&& rateLimitOptions.interval != null
) {
const { limit, interval } = rateLimitOptions;
waitForToken = createRateLimiter(limit, interval);
if (rateLimitOptions !== undefined && typeof rateLimitOptions !== 'object' && typeof rateLimitOptions !== 'number') {
throw Error('invalid rate limit options argument: object or number expected');
}

const {
limit,
interval,
maxConcurrent = 8,
abortSignal,
} = typeof rateLimitOptions === 'object'
? rateLimitOptions
: { maxConcurrent: rateLimitOptions || 8 };

const waitForToken = (limit && interval != null)
? createRateLimiter(limit, interval, abortSignal)
: async () => {};

const running = [];
const results = [];

Expand Down Expand Up @@ -127,6 +136,10 @@ export default async function processQueue(queue, fn, maxConcurrent = 8, rateLim
}

for await (const value of iter) {
if (abortSignal?.aborted) {
return results;
}

await waitForToken();

while (running.length >= maxConcurrent) {
Expand Down
67 changes: 65 additions & 2 deletions packages/helix-shared-process-queue/test/process-queue.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ describe('Process Queue', () => {
}

// Concurrency of 2, 20 operations per 30000ms (30 seconds)
const processPromise = processQueue(tasks, recordTestFunction, 2, {
const processPromise = processQueue(tasks, recordTestFunction, {
maxConcurrent: 2,
limit: 20,
interval: 30000,
});
Expand All @@ -222,6 +223,57 @@ describe('Process Queue', () => {
assert(timestamps[80] >= 120000);
});

it('rate limited queue can be aborted', async () => {
const abortController = new AbortController();
// Use fake timers to simulate time passing
const clock = sinon.useFakeTimers();
const timestamps = [];

async function recordTestFunction(task) {
if (task.number === 50) {
abortController.abort();
return;
}

timestamps.push(clock.now);
// eslint-disable-next-line no-promise-executor-return
await new Promise((resolve) => setTimeout(resolve, task.time));

// eslint-disable-next-line consistent-return
return task.number * task.number;
}

// Create 100 tasks with a duration of 100ms each
const tasks = [];
for (let i = 0; i < 100; i += 1) {
tasks.push({ time: 100, number: i });
}

// Concurrency of 2, 20 operations per 30000ms (30 seconds)
const processPromise = processQueue(tasks, recordTestFunction, {
maxConcurrent: 2,
limit: 20,
interval: 30000,
abortSignal: abortController.signal,
});

// Increase time enough so that all tasks can complete
await clock.tickAsync(160000);

const result = await processPromise;
clock.restore();

assert.strictEqual(result.length, 50);

// With the rate limit, tasks 0-19 should start near time 0
// Tasks 20-39 should not start until after 30000ms
// Tasks 40-49 after 60000ms
// Tasks 50-99 should never start
assert(timestamps[20] >= 30000);
assert(timestamps[49] >= 60000);
assert(timestamps[50] === undefined);
});

it('falls back to no rate limit if partial options', async () => {
const timestamps = [];
const tasks = [
Expand All @@ -237,7 +289,7 @@ describe('Process Queue', () => {
assert(delay < 100, `Expected delay < 100ms, got ${delay}ms`);
});

it('falls back to no rate limit if invalid options are provided', async () => {
it('falls back to no rate limit if empty rate limit options are provided', async () => {
const timestamps = [];
const tasks = [
{ id: 1, timestamps },
Expand All @@ -251,5 +303,16 @@ describe('Process Queue', () => {
const delay = timestamps[2] - timestamps[0];
assert(delay < 100, `Expected delay < 100ms, got ${delay}ms`);
});

it('throws error if invalid rate limit options are provided', async () => {
const timestamps = [];
const tasks = [
{ id: 1, timestamps },
{ id: 2, timestamps },
{ id: 3, timestamps },
];

await assert.rejects(processQueue(tasks, recordTask, 'invalid'));
});
});
});

0 comments on commit dfa036c

Please sign in to comment.