Skip to content

Commit b554da6

Browse files
feat!: removing handler_processing debugger in favour of status (#470)
1 parent 9f07383 commit b554da6

File tree

3 files changed

+53
-48
lines changed

3 files changed

+53
-48
lines changed

src/consumer.ts

+2-10
Original file line numberDiff line numberDiff line change
@@ -164,12 +164,12 @@ export class Consumer extends TypedEventEmitter {
164164
Date.now() - this.stopRequestedAtTimestamp >
165165
this.pollingCompleteWaitTimeMs;
166166
if (exceededTimeout) {
167-
logger.debug('waiting_for_polling_to_complete_timeout_exceeded');
167+
this.emit('waiting_for_polling_to_complete_timeout_exceeded');
168168
this.emit('stopped');
169169
return;
170170
}
171171

172-
logger.debug('waiting_for_polling_to_complete');
172+
this.emit('waiting_for_polling_to_complete');
173173
setTimeout(this.waitForPollingToComplete, 1000);
174174
}
175175

@@ -304,20 +304,12 @@ export class Consumer extends TypedEventEmitter {
304304
response: ReceiveMessageCommandOutput
305305
): Promise<void> {
306306
if (hasMessages(response)) {
307-
const handlerProcessingDebugger = setInterval(() => {
308-
logger.debug('handler_processing', {
309-
detail: 'The handler is still processing the message(s)...'
310-
});
311-
}, 1000);
312-
313307
if (this.handleMessageBatch) {
314308
await this.processMessageBatch(response.Messages);
315309
} else {
316310
await Promise.all(response.Messages.map(this.processMessage));
317311
}
318312

319-
clearInterval(handlerProcessingDebugger);
320-
321313
this.emit('response_processed');
322314
} else if (response) {
323315
this.emit('empty');

src/types.ts

+8
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,14 @@ export interface Events {
197197
* Fired when an option is updated
198198
*/
199199
option_updated: [UpdatableOptions, ConsumerOptions[UpdatableOptions]];
200+
/**
201+
* Fired when the Consumer is waiting for polling to complete before stopping.
202+
*/
203+
waiting_for_polling_to_complete: [];
204+
/**
205+
* Fired when the Consumer has waited for polling to complete and is stopping due to a timeout.
206+
*/
207+
waiting_for_polling_to_complete_timeout_exceeded: [];
200208
}
201209

202210
export type AWSError = {

test/tests/consumer.test.ts

+43-38
Original file line numberDiff line numberDiff line change
@@ -1221,7 +1221,7 @@ describe('Consumer', () => {
12211221
VisibilityTimeout: 40
12221222
})
12231223
);
1224-
sandbox.assert.calledTwice(clearIntervalSpy);
1224+
sandbox.assert.calledOnce(clearIntervalSpy);
12251225
});
12261226

12271227
it('passes in the correct visibility timeout for long running batch handler functions', async () => {
@@ -1305,7 +1305,7 @@ describe('Consumer', () => {
13051305
]
13061306
})
13071307
);
1308-
sandbox.assert.calledTwice(clearIntervalSpy);
1308+
sandbox.assert.calledOnce(clearIntervalSpy);
13091309
});
13101310

13111311
it('emit error when changing visibility timeout fails', async () => {
@@ -1483,13 +1483,22 @@ describe('Consumer', () => {
14831483
});
14841484

14851485
it('waits for in-flight messages before emitting stopped (within timeout)', async () => {
1486+
sqs.send.withArgs(mockReceiveMessage).resolves({
1487+
Messages: [
1488+
{ MessageId: '1', ReceiptHandle: 'receipt-handle-1', Body: 'body-1' }
1489+
]
1490+
});
14861491
const handleStop = sandbox.stub().returns(null);
14871492
const handleResponseProcessed = sandbox.stub().returns(null);
1493+
const waitingForPollingComplete = sandbox.stub().returns(null);
1494+
const waitingForPollingCompleteTimeoutExceeded = sandbox
1495+
.stub()
1496+
.returns(null);
14881497

14891498
// A slow message handler
1490-
handleMessage = sandbox.stub().callsFake(async () => {
1491-
await new Promise((resolve) => setTimeout(resolve, 5000));
1492-
});
1499+
handleMessage = sandbox
1500+
.stub()
1501+
.resolves(new Promise((resolve) => setTimeout(resolve, 5000)));
14931502

14941503
consumer = new Consumer({
14951504
queueUrl: QUEUE_URL,
@@ -1502,16 +1511,24 @@ describe('Consumer', () => {
15021511

15031512
consumer.on('stopped', handleStop);
15041513
consumer.on('response_processed', handleResponseProcessed);
1514+
consumer.on('waiting_for_polling_to_complete', waitingForPollingComplete);
1515+
consumer.on(
1516+
'waiting_for_polling_to_complete_timeout_exceeded',
1517+
waitingForPollingCompleteTimeoutExceeded
1518+
);
15051519

15061520
consumer.start();
1507-
await clock.nextAsync();
1521+
await Promise.all([clock.tickAsync(1)]);
15081522
consumer.stop();
15091523

15101524
await clock.runAllAsync();
15111525

15121526
sandbox.assert.calledOnce(handleStop);
15131527
sandbox.assert.calledOnce(handleResponseProcessed);
15141528
sandbox.assert.calledOnce(handleMessage);
1529+
assert(waitingForPollingComplete.callCount === 5);
1530+
assert(waitingForPollingCompleteTimeoutExceeded.callCount === 0);
1531+
15151532
assert.ok(handleMessage.calledBefore(handleStop));
15161533

15171534
// handleResponseProcessed is called after handleMessage, indicating
@@ -1520,13 +1537,22 @@ describe('Consumer', () => {
15201537
});
15211538

15221539
it('waits for in-flight messages before emitting stopped (timeout reached)', async () => {
1540+
sqs.send.withArgs(mockReceiveMessage).resolves({
1541+
Messages: [
1542+
{ MessageId: '1', ReceiptHandle: 'receipt-handle-1', Body: 'body-1' }
1543+
]
1544+
});
15231545
const handleStop = sandbox.stub().returns(null);
15241546
const handleResponseProcessed = sandbox.stub().returns(null);
1547+
const waitingForPollingComplete = sandbox.stub().returns(null);
1548+
const waitingForPollingCompleteTimeoutExceeded = sandbox
1549+
.stub()
1550+
.returns(null);
15251551

15261552
// A slow message handler
1527-
handleMessage = sandbox.stub().callsFake(async () => {
1528-
await new Promise((resolve) => setTimeout(resolve, 5000));
1529-
});
1553+
handleMessage = sandbox
1554+
.stub()
1555+
.resolves(new Promise((resolve) => setTimeout(resolve, 5000)));
15301556

15311557
consumer = new Consumer({
15321558
queueUrl: QUEUE_URL,
@@ -1539,16 +1565,23 @@ describe('Consumer', () => {
15391565

15401566
consumer.on('stopped', handleStop);
15411567
consumer.on('response_processed', handleResponseProcessed);
1568+
consumer.on('waiting_for_polling_to_complete', waitingForPollingComplete);
1569+
consumer.on(
1570+
'waiting_for_polling_to_complete_timeout_exceeded',
1571+
waitingForPollingCompleteTimeoutExceeded
1572+
);
15421573

15431574
consumer.start();
1544-
await clock.nextAsync();
1575+
await Promise.all([clock.tickAsync(1)]);
15451576
consumer.stop();
15461577

15471578
await clock.runAllAsync();
15481579

15491580
sandbox.assert.calledOnce(handleStop);
15501581
sandbox.assert.calledOnce(handleResponseProcessed);
15511582
sandbox.assert.calledOnce(handleMessage);
1583+
sandbox.assert.calledOnce(waitingForPollingComplete);
1584+
sandbox.assert.calledOnce(waitingForPollingCompleteTimeoutExceeded);
15521585
assert(handleMessage.calledBefore(handleStop));
15531586

15541587
// Stop was called before the message could be processed, because we reached timeout.
@@ -1768,33 +1801,5 @@ describe('Consumer', () => {
17681801
sandbox.assert.calledWithMatch(loggerDebug, 'stopping');
17691802
sandbox.assert.calledWithMatch(loggerDebug, 'stopped');
17701803
});
1771-
1772-
it('logs a debug event while the handler is processing, for every second', async () => {
1773-
const loggerDebug = sandbox.stub(logger, 'debug');
1774-
const clearIntervalSpy = sinon.spy(global, 'clearInterval');
1775-
1776-
sqs.send.withArgs(mockReceiveMessage).resolves({
1777-
Messages: [
1778-
{ MessageId: '1', ReceiptHandle: 'receipt-handle-1', Body: 'body-1' }
1779-
]
1780-
});
1781-
consumer = new Consumer({
1782-
queueUrl: QUEUE_URL,
1783-
region: REGION,
1784-
handleMessage: () =>
1785-
new Promise((resolve) => setTimeout(resolve, 4000)),
1786-
sqs
1787-
});
1788-
1789-
consumer.start();
1790-
await Promise.all([clock.tickAsync(5000)]);
1791-
sandbox.assert.calledOnce(clearIntervalSpy);
1792-
consumer.stop();
1793-
1794-
sandbox.assert.callCount(loggerDebug, 15);
1795-
sandbox.assert.calledWith(loggerDebug, 'handler_processing', {
1796-
detail: 'The handler is still processing the message(s)...'
1797-
});
1798-
});
17991804
});
18001805
});

0 commit comments

Comments
 (0)