Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding concurrency functionality #562

Draft
wants to merge 5 commits into
base: canary
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,11 @@ app.start();

- The queue is polled continuously for messages using [long polling](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html).
- Throwing an error (or returning a rejected promise) from the handler function will cause the message to be left on the queue. An [SQS redrive policy](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/SQSDeadLetterQueue.html) can be used to move messages that cannot be processed to a dead letter queue.
- By default messages are processed one at a time – a new message won't be received until the first one has been processed. To process messages in parallel, use the `batchSize` option [detailed here](https://bbc.github.io/sqs-consumer/interfaces/ConsumerOptions.html#batchSize).
- It's also important to await any processing that you are doing to ensure that messages are processed one at a time.
- Messages can be processed in two ways:
1. Individual message processing (default): Messages are processed one at a time by default. You can control parallel processing using the `concurrency` option to specify how many messages can be processed simultaneously.
2. Batch processing: Using the `batchSize` option [detailed here](https://bbc.github.io/sqs-consumer/interfaces/ConsumerOptions.html#batchSize) processes messages in batches. When using batch processing, the entire batch is processed together and concurrency controls don't apply.
- Note: When both `batchSize` and `concurrency` are set, `concurrency` will automatically be set to at least match `batchSize` to maintain compatibility.
- It's important to await any processing that you are doing to ensure messages are processed correctly.
- By default, messages that are sent to the `handleMessage` and `handleMessageBatch` functions will be considered as processed if they return without an error.
- To acknowledge individual messages, please return the message that you want to acknowledge if you are using `handleMessage` or the messages for `handleMessageBatch`.
- To note, returning an empty object or an empty array will be considered an acknowledgement of no message(s) and will result in no messages being deleted. If you would like to change this behaviour, please use the `alwaysAcknowledge` option [detailed here](https://bbc.github.io/sqs-consumer/interfaces/ConsumerOptions.html).
Expand Down
57 changes: 44 additions & 13 deletions src/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,14 @@ export class Consumer extends TypedEventEmitter {
private authenticationErrorTimeout: number;
private pollingWaitTimeMs: number;
private pollingCompleteWaitTimeMs: number;
private concurrencyWaitTimeMs: number;
private heartbeatInterval: number;
private isPolling = false;
private stopRequestedAtTimestamp: number;
public abortController: AbortController;
private extendedAWSErrors: boolean;
private concurrency: number;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want a maxConcurrency setting that ensures we're not trying to do silly stuff.

private inFlightMessages = 0;

constructor(options: ConsumerOptions) {
super(options.queueUrl);
Expand All @@ -92,9 +95,11 @@ export class Consumer extends TypedEventEmitter {
options.authenticationErrorTimeout ?? 10000;
this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 0;
this.pollingCompleteWaitTimeMs = options.pollingCompleteWaitTimeMs ?? 0;
this.concurrencyWaitTimeMs = options.concurrencyWaitTimeMs ?? 50;
this.shouldDeleteMessages = options.shouldDeleteMessages ?? true;
this.alwaysAcknowledge = options.alwaysAcknowledge ?? false;
this.extendedAWSErrors = options.extendedAWSErrors ?? false;
this.concurrency = Math.max(options.concurrency ?? 1, this.batchSize);
this.sqs =
options.sqs ||
new SQSClient({
Expand Down Expand Up @@ -328,21 +333,47 @@ export class Consumer extends TypedEventEmitter {
private async handleSqsResponse(
response: ReceiveMessageCommandOutput,
): Promise<void> {
if (hasMessages(response)) {
if (this.handleMessageBatch) {
await this.processMessageBatch(response.Messages);
} else {
await Promise.all(
response.Messages.map((message: Message) =>
this.processMessage(message),
),
);
}

this.emit("response_processed");
} else if (response) {
if (!hasMessages(response)) {
this.emit("empty");
return;
}

const messages = response.Messages;

if (this.handleMessageBatch) {
await this.processMessageBatch(messages);
} else {
let waitingMessages = 0;
await Promise.all(
messages.map(async (message) => {
while (
this.batchSize === 1 &&
this.inFlightMessages >= this.concurrency
) {
if (waitingMessages === 0) {
this.emit("concurrency_limit_reached", {
limit: this.concurrency,
waiting: messages.length - this.inFlightMessages,
});
}
waitingMessages++;
await new Promise((resolve) =>
setTimeout(resolve, this.concurrencyWaitTimeMs),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to think if concurrencyWaitTimeMs, it's probably too late and my brain is mush at this point though, but I think this conflicts with the polling timeout? Need to think on it.

);
if (this.stopped) return;
}
waitingMessages = Math.max(0, waitingMessages - 1);
this.inFlightMessages++;
try {
await this.processMessage(message);
} finally {
this.inFlightMessages--;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if it could poll again to maintain maximum concurrency.

}
}),
);
}

this.emit("response_processed");
}

/**
Expand Down
20 changes: 19 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,17 @@ export interface ConsumerOptions {
* example to add middlewares.
*/
postReceiveMessageCallback?(): Promise<void>;
/**
* The maximum number of messages that can be processed concurrently.
* If not provided, messages will be processed sequentially.
* @defaultvalue `1`
*/
concurrency?: number;
/**
* The duration (in milliseconds) to wait between concurrency checks when the concurrency limit is reached.
* @defaultvalue `50`
*/
concurrencyWaitTimeMs?: number;
/**
* Set this to `true` if you want to receive additional information about the error
* that occurred from AWS, such as the response and metadata.
Expand All @@ -171,7 +182,9 @@ export type UpdatableOptions =
| "visibilityTimeout"
| "batchSize"
| "waitTimeSeconds"
| "pollingWaitTimeMs";
| "pollingWaitTimeMs"
| "concurrency"
| "concurrencyWaitTimeMs";

/**
* The options for the stop method.
Expand Down Expand Up @@ -257,6 +270,11 @@ export interface Events {
* Fired when the Consumer has waited for polling to complete and is stopping due to a timeout.
*/
waiting_for_polling_to_complete_timeout_exceeded: [];
/**
* Fired when concurrency limit is hit and messages are waiting to be processed.
* Includes the current concurrency limit and number of messages waiting.
*/
concurrency_limit_reached: [{ limit: number; waiting: number }];
}

/**
Expand Down
27 changes: 27 additions & 0 deletions src/validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,23 @@ function validateOption(
throw new Error("pollingWaitTimeMs must be greater than 0.");
}
break;
case "concurrency":
if (!Number.isInteger(value) || value < 1) {
throw new Error("concurrency must be a positive integer.");
}
if (allOptions.batchSize && value < allOptions.batchSize) {
throw new Error(
"concurrency must be greater than or equal to batchSize.",
);
}
break;
case "concurrencyWaitTimeMs":
if (!Number.isInteger(value) || value < 0) {
throw new Error(
"concurrencyWaitTimeMs must be a non-negative integer.",
);
}
break;
default:
if (strict) {
throw new Error(`The update ${option} cannot be updated`);
Expand Down Expand Up @@ -78,6 +95,16 @@ function assertOptions(options: ConsumerOptions): void {
if (options.heartbeatInterval) {
validateOption("heartbeatInterval", options.heartbeatInterval, options);
}
if (options.concurrency !== undefined) {
validateOption("concurrency", options.concurrency, options);
}
if (options.concurrencyWaitTimeMs !== undefined) {
validateOption(
"concurrencyWaitTimeMs",
options.concurrencyWaitTimeMs,
options,
);
}
}

/**
Expand Down
Loading