Skip to content

Commit a7894ce

Browse files
committed
Bump version to v1.2.0: Adding a new method waitForAvailability
1 parent 15ce3fc commit a7894ce

10 files changed

+177
-8
lines changed

README.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,8 +145,8 @@ async function handleDataAggregation(sensorUID): Promise<void> {
145145
}
146146
```
147147

148-
Please note that in a real-world scenario, sensor UIDs are more likely to be consumed from a message queue (e.g., RabbitMQ, Kafka, AWS SNS) rather than from an in-memory array. This setup **highlights the benefits** of avoiding backpressure:
149-
We should avoid consuming a message if we cannot start processing it immediately. Working with message queues typically involves acknowledgements, which have timeout mechanisms. Therefore, immediate processing is crucial to ensure efficient and reliable handling of messages.
148+
Please note that in a real-world scenario, sensor UIDs may be consumed from a message queue (e.g., RabbitMQ, Kafka, AWS SNS) rather than from an in-memory array. This setup **highlights the benefits** of avoiding backpressure:
149+
We should avoid consuming a message if we cannot start processing it immediately. Working with message queues typically involves acknowledgements, which have **timeout** mechanisms. Therefore, immediate processing is crucial to ensure efficient and reliable handling of messages. The `waitForAvailability` method addresses this need by checking availability as a preliminary action before consuming a message.
150150
Refer to the following adaptation of the previous example, where sensor UIDs are consumed from a message queue. This example overlooks error handling and message validation, for simplicity.
151151

152152
```ts
@@ -165,6 +165,7 @@ async function processConsumedMessages(): Promise<void> {
165165
let numberOfProcessedMessages = 0;
166166

167167
do {
168+
await sensorAggregationSemaphore.waitForAvailability();
168169
const message = await mqClient.receiveOneMessage();
169170
if (!message) {
170171
// Consider the queue as empty.
@@ -173,6 +174,9 @@ async function processConsumedMessages(): Promise<void> {
173174

174175
++numberOfProcessedMessages;
175176
const { uid } = message.data;
177+
178+
// At this point, `startExecution` will begin immediately, due to the
179+
// preliminary `waitForAvailability` action.
176180
await sensorAggregationSemaphore.startExecution(
177181
(): Promise<void> => handleDataAggregation(uid);
178182
);
@@ -200,6 +204,9 @@ async function processConsumedMessages(): Promise<void> {
200204
}
201205
```
202206

207+
In reference to the above example, please note that `waitForAvailability` may be considered overkill or redundant if the job's duration is significantly shorter than the message timeout.
208+
For example, if the message queue's timeout for acknowledging a message is 1 minute and a typical job duration is 1 second, the 59 second gap provides a substantial safety margin. In such cases, the preliminary `waitForAvailability` action can be omitted.
209+
203210
## 2nd use-case: Single Job Execution
204211

205212
The `waitForCompletion` method is useful for executing a sub-procedure, for which the caller must wait before proceeding with its work.

dist/zero-backpressure-semaphore.d.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,26 @@ export declare class ZeroBackpressureSemaphore<T, UncaughtErrorType = Error> {
126126
* @returns A promise that resolves when all currently executing jobs are settled.
127127
*/
128128
waitTillAllExecutingJobsAreSettled(): Promise<void>;
129+
/**
130+
* waitForAvailability
131+
*
132+
* This method resolves once at least one room (slot) is available for job execution.
133+
* In other words, it resolves when the semaphore is available to trigger a new job immediately.
134+
*
135+
* ### Example Use Case
136+
* Consider a scenario where we read messages from a message queue (e.g., RabbitMQ, Kafka).
137+
* Each message contains job-specific metadata, meaning for each message, we want to create a
138+
* corresponding semaphore job. We aim to start processing a message immediately once it is
139+
* consumed, as message queues typically involve *acknowledgements*, which have *timeout*
140+
* mechanisms. Therefore, immediate processing is crucial to ensure efficient and reliable
141+
* handling of messages. Backpressure on the semaphore may cause messages to wait too long
142+
* before their corresponding job starts, increasing the chances of their timeout being exceeded.
143+
* To prevent such potential backpressure, users can utilize the `waitForAvailability` method
144+
* before consuming the next message.
145+
*
146+
* @returns A promise that resolves once at least one room is available.
147+
*/
148+
waitForAvailability(): Promise<void>;
129149
/**
130150
* extractUncaughtErrors
131151
*

dist/zero-backpressure-semaphore.js

Lines changed: 26 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/zero-backpressure-semaphore.js.map

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

dist/zero-backpressure-semaphore.test.js

Lines changed: 40 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)