Skip to content

Commit 0fa5305

Browse files
authored
feat: Wait for msgs to be processed before emitting stopped (#454)
* feat: Wait for msgs to be processed before emitting stopped * chore: Update README with waitForInFlightMessages info * fix: Await final poll for graceful shutdown * refactor: Rename waitForInFlightMessagesMs to pollingCompleteWaitTimeMs and move to constructor * chore: Rename waitForInFlightMessagesToComplete to waitForPollingToComplete * refactor: Use setTimeout instead of Promise/loop in waitForPollingToComplete * chore: Remove outdated info from README * chore: Rename beganWaitingForStop to stopRequestedAtTimestamp
1 parent 6b95edb commit 0fa5305

File tree

6 files changed

+192
-1
lines changed

6 files changed

+192
-1
lines changed

src/consumer.ts

+34-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,10 @@ export class Consumer extends TypedEventEmitter {
5454
private waitTimeSeconds: number;
5555
private authenticationErrorTimeout: number;
5656
private pollingWaitTimeMs: number;
57+
private pollingCompleteWaitTimeMs: number;
5758
private heartbeatInterval: number;
59+
private isPolling: boolean;
60+
private stopRequestedAtTimestamp: number;
5861
public abortController: AbortController;
5962

6063
constructor(options: ConsumerOptions) {
@@ -77,6 +80,7 @@ export class Consumer extends TypedEventEmitter {
7780
this.authenticationErrorTimeout =
7881
options.authenticationErrorTimeout ?? 10000;
7982
this.pollingWaitTimeMs = options.pollingWaitTimeMs ?? 0;
83+
this.pollingCompleteWaitTimeMs = options.pollingCompleteWaitTimeMs ?? 0;
8084
this.shouldDeleteMessages = options.shouldDeleteMessages ?? true;
8185
this.alwaysAcknowledge = options.alwaysAcknowledge ?? false;
8286
this.sqs =
@@ -142,7 +146,31 @@ export class Consumer extends TypedEventEmitter {
142146
this.emit('aborted');
143147
}
144148

145-
this.emit('stopped');
149+
this.stopRequestedAtTimestamp = Date.now();
150+
this.waitForPollingToComplete();
151+
}
152+
153+
/**
154+
* Wait for final poll and in flight messages to complete.
155+
* @private
156+
*/
157+
private waitForPollingToComplete(): void {
158+
if (!this.isPolling || !(this.pollingCompleteWaitTimeMs > 0)) {
159+
this.emit('stopped');
160+
return;
161+
}
162+
163+
const exceededTimeout =
164+
Date.now() - this.stopRequestedAtTimestamp >
165+
this.pollingCompleteWaitTimeMs;
166+
if (exceededTimeout) {
167+
logger.debug('waiting_for_polling_to_complete_timeout_exceeded');
168+
this.emit('stopped');
169+
return;
170+
}
171+
172+
logger.debug('waiting_for_polling_to_complete');
173+
setTimeout(this.waitForPollingToComplete, 1000);
146174
}
147175

148176
/**
@@ -198,6 +226,8 @@ export class Consumer extends TypedEventEmitter {
198226

199227
logger.debug('polling');
200228

229+
this.isPolling = true;
230+
201231
let currentPollingTimeout = this.pollingWaitTimeMs;
202232
this.receiveMessage({
203233
QueueUrl: this.queueUrl,
@@ -227,6 +257,9 @@ export class Consumer extends TypedEventEmitter {
227257
})
228258
.catch((err) => {
229259
this.emitError(err);
260+
})
261+
.finally(() => {
262+
this.isPolling = false;
230263
});
231264
}
232265

src/types.ts

+6
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ export interface ConsumerOptions {
4646
* @defaultvalue `0`
4747
*/
4848
pollingWaitTimeMs?: number;
49+
/**
50+
* If you want the stop action to wait for the final poll to complete and in-flight messages
51+
* to be processed before emitting 'stopped' set this to the max amount of time to wait.
52+
* @defaultvalue `0`
53+
*/
54+
pollingCompleteWaitTimeMs?: number;
4955
/**
5056
* If true, sets the message visibility timeout to 0 after a `processing_error`.
5157
* @defaultvalue `false`
+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
Feature: Graceful shutdown
2+
3+
Scenario: Several messages in flight
4+
Given Several messages are sent to the SQS queue
5+
Then the application is stopped while messages are in flight
6+
Then the in-flight messages should be processed before stopped is emitted
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
const { Given, Then, After } = require('@cucumber/cucumber');
2+
const assert = require('assert');
3+
const { PurgeQueueCommand } = require('@aws-sdk/client-sqs');
4+
const pEvent = require('p-event');
5+
6+
const { consumer } = require('../utils/consumer/gracefulShutdown');
7+
const { producer } = require('../utils/producer');
8+
const { sqs, QUEUE_URL } = require('../utils/sqs');
9+
10+
Given('Several messages are sent to the SQS queue', async () => {
11+
const params = {
12+
QueueUrl: QUEUE_URL
13+
};
14+
const command = new PurgeQueueCommand(params);
15+
const response = await sqs.send(command);
16+
17+
assert.strictEqual(response['$metadata'].httpStatusCode, 200);
18+
19+
const size = await producer.queueSize();
20+
assert.strictEqual(size, 0);
21+
22+
await producer.send(['msg1', 'msg2', 'msg3']);
23+
24+
const size2 = await producer.queueSize();
25+
26+
assert.strictEqual(size2, 3);
27+
});
28+
29+
Then('the application is stopped while messages are in flight', async () => {
30+
consumer.start();
31+
32+
consumer.stop();
33+
34+
assert.strictEqual(consumer.isRunning, false);
35+
});
36+
37+
Then(
38+
'the in-flight messages should be processed before stopped is emitted',
39+
async () => {
40+
let numProcessed = 0;
41+
consumer.on('message_processed', () => {
42+
numProcessed++;
43+
});
44+
45+
await pEvent(consumer, 'stopped');
46+
47+
assert.strictEqual(numProcessed, 3);
48+
49+
const size = await producer.queueSize();
50+
assert.strictEqual(size, 0);
51+
}
52+
);
53+
54+
After(() => {
55+
return consumer.stop();
56+
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
const { Consumer } = require('../../../../dist/consumer');
2+
3+
const { QUEUE_URL, sqs } = require('../sqs');
4+
5+
const consumer = Consumer.create({
6+
queueUrl: QUEUE_URL,
7+
sqs,
8+
pollingWaitTimeMs: 1000,
9+
pollingCompleteWaitTimeMs: 5000,
10+
batchSize: 10,
11+
handleMessage: async (message) => {
12+
await new Promise((resolve) => setTimeout(resolve, 1500));
13+
return message;
14+
}
15+
});
16+
17+
exports.consumer = consumer;

test/tests/consumer.test.ts

+73
Original file line numberDiff line numberDiff line change
@@ -1481,6 +1481,79 @@ describe('Consumer', () => {
14811481
sandbox.assert.calledOnce(handleAbort);
14821482
sandbox.assert.calledOnce(handleStop);
14831483
});
1484+
1485+
it('waits for in-flight messages before emitting stopped (within timeout)', async () => {
1486+
const handleStop = sandbox.stub().returns(null);
1487+
const handleResponseProcessed = sandbox.stub().returns(null);
1488+
1489+
// A slow message handler
1490+
handleMessage = sandbox.stub().callsFake(async () => {
1491+
await new Promise((resolve) => setTimeout(resolve, 5000));
1492+
});
1493+
1494+
consumer = new Consumer({
1495+
queueUrl: QUEUE_URL,
1496+
region: REGION,
1497+
handleMessage,
1498+
sqs,
1499+
pollingCompleteWaitTimeMs: 5000,
1500+
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT
1501+
});
1502+
1503+
consumer.on('stopped', handleStop);
1504+
consumer.on('response_processed', handleResponseProcessed);
1505+
1506+
consumer.start();
1507+
await clock.nextAsync();
1508+
consumer.stop();
1509+
1510+
await clock.runAllAsync();
1511+
1512+
sandbox.assert.calledOnce(handleStop);
1513+
sandbox.assert.calledOnce(handleResponseProcessed);
1514+
sandbox.assert.calledOnce(handleMessage);
1515+
assert.ok(handleMessage.calledBefore(handleStop));
1516+
1517+
// handleResponseProcessed is called after handleMessage, indicating
1518+
// messages were allowed to complete before 'stopped' was emitted
1519+
assert.ok(handleResponseProcessed.calledBefore(handleStop));
1520+
});
1521+
1522+
it('waits for in-flight messages before emitting stopped (timeout reached)', async () => {
1523+
const handleStop = sandbox.stub().returns(null);
1524+
const handleResponseProcessed = sandbox.stub().returns(null);
1525+
1526+
// A slow message handler
1527+
handleMessage = sandbox.stub().callsFake(async () => {
1528+
await new Promise((resolve) => setTimeout(resolve, 5000));
1529+
});
1530+
1531+
consumer = new Consumer({
1532+
queueUrl: QUEUE_URL,
1533+
region: REGION,
1534+
handleMessage,
1535+
sqs,
1536+
pollingCompleteWaitTimeMs: 500,
1537+
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT
1538+
});
1539+
1540+
consumer.on('stopped', handleStop);
1541+
consumer.on('response_processed', handleResponseProcessed);
1542+
1543+
consumer.start();
1544+
await clock.nextAsync();
1545+
consumer.stop();
1546+
1547+
await clock.runAllAsync();
1548+
1549+
sandbox.assert.calledOnce(handleStop);
1550+
sandbox.assert.calledOnce(handleResponseProcessed);
1551+
sandbox.assert.calledOnce(handleMessage);
1552+
assert(handleMessage.calledBefore(handleStop));
1553+
1554+
// Stop was called before the message could be processed, because we reached timeout.
1555+
assert(handleStop.calledBefore(handleResponseProcessed));
1556+
});
14841557
});
14851558

14861559
describe('isRunning', async () => {

0 commit comments

Comments
 (0)