Skip to content

Commit 7bc55fc

Browse files
feat: append more context to errors from the consumer (#559)
1 parent 047e426 commit 7bc55fc

File tree

3 files changed

+208
-2
lines changed

3 files changed

+208
-2
lines changed

src/consumer.ts

+12
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ export class Consumer extends TypedEventEmitter {
315315
err,
316316
`SQS receive message failed: ${err.message}`,
317317
this.extendedAWSErrors,
318+
this.queueUrl,
318319
);
319320
}
320321
}
@@ -479,6 +480,8 @@ export class Consumer extends TypedEventEmitter {
479480
err,
480481
`Error changing visibility timeout: ${err.message}`,
481482
this.extendedAWSErrors,
483+
this.queueUrl,
484+
message,
482485
),
483486
message,
484487
);
@@ -514,6 +517,8 @@ export class Consumer extends TypedEventEmitter {
514517
err,
515518
`Error changing visibility timeout: ${err.message}`,
516519
this.extendedAWSErrors,
520+
this.queueUrl,
521+
messages,
517522
),
518523
messages,
519524
);
@@ -549,12 +554,14 @@ export class Consumer extends TypedEventEmitter {
549554
throw toTimeoutError(
550555
err,
551556
`Message handler timed out after ${this.handleMessageTimeout}ms: Operation timed out.`,
557+
message,
552558
);
553559
}
554560
if (err instanceof Error) {
555561
throw toStandardError(
556562
err,
557563
`Unexpected message handler failure: ${err.message}`,
564+
message,
558565
);
559566
}
560567
throw err;
@@ -581,6 +588,7 @@ export class Consumer extends TypedEventEmitter {
581588
throw toStandardError(
582589
err,
583590
`Unexpected message handler failure: ${err.message}`,
591+
messages,
584592
);
585593
}
586594
throw err;
@@ -616,6 +624,8 @@ export class Consumer extends TypedEventEmitter {
616624
err,
617625
`SQS delete message failed: ${err.message}`,
618626
this.extendedAWSErrors,
627+
this.queueUrl,
628+
message,
619629
);
620630
}
621631
}
@@ -654,6 +664,8 @@ export class Consumer extends TypedEventEmitter {
654664
err,
655665
`SQS delete message failed: ${err.message}`,
656666
this.extendedAWSErrors,
667+
this.queueUrl,
668+
messages,
657669
);
658670
}
659671
}

src/errors.ts

+43-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { Message } from "@aws-sdk/client-sqs";
2+
13
import { AWSError } from "./types.js";
24

35
class SQSError extends Error {
@@ -9,6 +11,8 @@ class SQSError extends Error {
911
fault: AWSError["$fault"];
1012
response?: AWSError["$response"];
1113
metadata?: AWSError["$metadata"];
14+
queueUrl?: string;
15+
messageIds?: string[];
1216

1317
constructor(message: string) {
1418
super(message);
@@ -17,24 +21,28 @@ class SQSError extends Error {
1721
}
1822

1923
class TimeoutError extends Error {
24+
messageIds: string[];
2025
cause: Error;
2126
time: Date;
2227

2328
constructor(message = "Operation timed out.") {
2429
super(message);
2530
this.message = message;
2631
this.name = "TimeoutError";
32+
this.messageIds = [];
2733
}
2834
}
2935

3036
class StandardError extends Error {
37+
messageIds: string[];
3138
cause: Error;
3239
time: Date;
3340

3441
constructor(message = "An unexpected error occurred:") {
3542
super(message);
3643
this.message = message;
3744
this.name = "StandardError";
45+
this.messageIds = [];
3846
}
3947
}
4048

@@ -64,6 +72,17 @@ function isConnectionError(err: Error): boolean {
6472
return false;
6573
}
6674

75+
/**
76+
* Gets the message IDs from the message.
77+
* @param message The message that was received from SQS.
78+
*/
79+
function getMessageIds(message: Message | Message[]): string[] {
80+
if (Array.isArray(message)) {
81+
return message.map((m) => m.MessageId);
82+
}
83+
return [message.MessageId];
84+
}
85+
6786
/**
6887
* Formats an AWSError the the SQSError type.
6988
* @param err The error object that was received.
@@ -73,6 +92,8 @@ function toSQSError(
7392
err: AWSError,
7493
message: string,
7594
extendedAWSErrors: boolean,
95+
queueUrl?: string,
96+
sqsMessage?: Message | Message[],
7697
): SQSError {
7798
const sqsError = new SQSError(message);
7899
sqsError.code = err.name;
@@ -87,18 +108,32 @@ function toSQSError(
87108
sqsError.metadata = err.$metadata;
88109
}
89110

111+
if (queueUrl) {
112+
sqsError.queueUrl = queueUrl;
113+
}
114+
115+
if (sqsMessage) {
116+
sqsError.messageIds = getMessageIds(sqsMessage);
117+
}
118+
90119
return sqsError;
91120
}
92121

93122
/**
94123
* Formats an Error to the StandardError type.
95124
* @param err The error object that was received.
96125
* @param message The message to send with the error.
126+
* @param sqsMessage The message that was received from SQS.
97127
*/
98-
function toStandardError(err: Error, message: string): StandardError {
128+
function toStandardError(
129+
err: Error,
130+
message: string,
131+
sqsMessage: Message | Message[],
132+
): StandardError {
99133
const error = new StandardError(message);
100134
error.cause = err;
101135
error.time = new Date();
136+
error.messageIds = getMessageIds(sqsMessage);
102137

103138
return error;
104139
}
@@ -107,11 +142,17 @@ function toStandardError(err: Error, message: string): StandardError {
107142
* Formats an Error to the TimeoutError type.
108143
* @param err The error object that was received.
109144
* @param message The message to send with the error.
145+
* @param sqsMessage The message that was received from SQS.
110146
*/
111-
function toTimeoutError(err: TimeoutError, message: string): TimeoutError {
147+
function toTimeoutError(
148+
err: TimeoutError,
149+
message: string,
150+
sqsMessage: Message | Message[],
151+
): TimeoutError {
112152
const error = new TimeoutError(message);
113153
error.cause = err;
114154
error.time = new Date();
155+
error.messageIds = getMessageIds(sqsMessage);
115156

116157
return error;
117158
}

test/tests/consumer.test.ts

+153
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,7 @@ describe("Consumer", () => {
466466
"Unexpected message handler failure: Processing error",
467467
);
468468
assert.equal(message.MessageId, "123");
469+
assert.deepEqual((err as any).messageIds, ["123"]);
469470
});
470471

471472
it("fires an `error` event when an `SQSError` occurs processing a message", async () => {
@@ -1674,6 +1675,8 @@ describe("Consumer", () => {
16741675

16751676
assert.ok(err);
16761677
assert.equal(err.message, "Error changing visibility timeout: failed");
1678+
assert.equal(err.queueUrl, QUEUE_URL);
1679+
assert.deepEqual(err.messageIds, ["1"]);
16771680
});
16781681

16791682
it("emit error when changing visibility timeout fails for batch handler functions", async () => {
@@ -1706,6 +1709,156 @@ describe("Consumer", () => {
17061709

17071710
assert.ok(err);
17081711
assert.equal(err.message, "Error changing visibility timeout: failed");
1712+
assert.equal(err.queueUrl, QUEUE_URL);
1713+
assert.deepEqual(err.messageIds, ["1", "2"]);
1714+
});
1715+
1716+
it("includes messageIds in timeout errors", async () => {
1717+
const handleMessageTimeout = 500;
1718+
consumer = new Consumer({
1719+
queueUrl: QUEUE_URL,
1720+
region: REGION,
1721+
handleMessage: () =>
1722+
new Promise((resolve) => setTimeout(resolve, 1000)),
1723+
handleMessageTimeout,
1724+
sqs,
1725+
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
1726+
});
1727+
1728+
consumer.start();
1729+
const [err]: any = await Promise.all([
1730+
pEvent(consumer, "timeout_error"),
1731+
clock.tickAsync(handleMessageTimeout),
1732+
]);
1733+
consumer.stop();
1734+
1735+
assert.ok(err);
1736+
assert.equal(
1737+
err.message,
1738+
`Message handler timed out after ${handleMessageTimeout}ms: Operation timed out.`,
1739+
);
1740+
assert.deepEqual(err.messageIds, ["123"]);
1741+
});
1742+
1743+
it("includes messageIds in batch processing errors", async () => {
1744+
sqs.send.withArgs(mockReceiveMessage).resolves({
1745+
Messages: [
1746+
{ MessageId: "1", ReceiptHandle: "receipt-handle-1", Body: "body-1" },
1747+
{ MessageId: "2", ReceiptHandle: "receipt-handle-2", Body: "body-2" },
1748+
],
1749+
});
1750+
1751+
consumer = new Consumer({
1752+
queueUrl: QUEUE_URL,
1753+
region: REGION,
1754+
handleMessageBatch: () => {
1755+
throw new Error("Batch processing error");
1756+
},
1757+
batchSize: 2,
1758+
sqs,
1759+
authenticationErrorTimeout: AUTHENTICATION_ERROR_TIMEOUT,
1760+
});
1761+
1762+
consumer.start();
1763+
const [err]: any = await Promise.all([
1764+
pEvent(consumer, "error"),
1765+
clock.tickAsync(100),
1766+
]);
1767+
consumer.stop();
1768+
1769+
assert.ok(err);
1770+
assert.equal(
1771+
err.message,
1772+
"Unexpected message handler failure: Batch processing error",
1773+
);
1774+
assert.deepEqual(err.messageIds, ["1", "2"]);
1775+
});
1776+
1777+
it("includes queueUrl and messageIds in SQS errors when deleting message", async () => {
1778+
const deleteErr = new Error("Delete error");
1779+
deleteErr.name = "SQSError";
1780+
1781+
handleMessage.resolves(null);
1782+
sqs.send.withArgs(mockDeleteMessage).rejects(deleteErr);
1783+
1784+
consumer.start();
1785+
const [err]: any = await Promise.all([
1786+
pEvent(consumer, "error"),
1787+
clock.tickAsync(100),
1788+
]);
1789+
consumer.stop();
1790+
1791+
assert.ok(err);
1792+
assert.equal(err.message, "SQS delete message failed: Delete error");
1793+
assert.equal(err.queueUrl, QUEUE_URL);
1794+
assert.deepEqual(err.messageIds, ["123"]);
1795+
});
1796+
1797+
it("includes queueUrl and messageIds in SQS errors when changing visibility timeout", async () => {
1798+
sqs.send.withArgs(mockReceiveMessage).resolves({
1799+
Messages: [
1800+
{ MessageId: "1", ReceiptHandle: "receipt-handle-1", Body: "body-1" },
1801+
],
1802+
});
1803+
consumer = new Consumer({
1804+
queueUrl: QUEUE_URL,
1805+
region: REGION,
1806+
handleMessage: () =>
1807+
new Promise((resolve) => setTimeout(resolve, 75000)),
1808+
sqs,
1809+
visibilityTimeout: 40,
1810+
heartbeatInterval: 30,
1811+
});
1812+
1813+
const receiveErr = new MockSQSError("failed");
1814+
sqs.send.withArgs(mockChangeMessageVisibility).rejects(receiveErr);
1815+
1816+
consumer.start();
1817+
const [err]: any = await Promise.all([
1818+
pEvent(consumer, "error"),
1819+
clock.tickAsync(75000),
1820+
]);
1821+
consumer.stop();
1822+
1823+
assert.ok(err);
1824+
assert.equal(err.message, "Error changing visibility timeout: failed");
1825+
assert.equal(err.queueUrl, QUEUE_URL);
1826+
assert.deepEqual(err.messageIds, ["1"]);
1827+
});
1828+
1829+
it("includes queueUrl and messageIds in batch SQS errors", async () => {
1830+
sqs.send.withArgs(mockReceiveMessage).resolves({
1831+
Messages: [
1832+
{ MessageId: "1", ReceiptHandle: "receipt-handle-1", Body: "body-1" },
1833+
{ MessageId: "2", ReceiptHandle: "receipt-handle-2", Body: "body-2" },
1834+
],
1835+
});
1836+
1837+
consumer = new Consumer({
1838+
queueUrl: QUEUE_URL,
1839+
region: REGION,
1840+
handleMessageBatch: () =>
1841+
new Promise((resolve) => setTimeout(resolve, 75000)),
1842+
sqs,
1843+
batchSize: 2,
1844+
visibilityTimeout: 40,
1845+
heartbeatInterval: 30,
1846+
});
1847+
1848+
const receiveErr = new MockSQSError("failed");
1849+
sqs.send.withArgs(mockChangeMessageVisibilityBatch).rejects(receiveErr);
1850+
1851+
consumer.start();
1852+
const [err]: any = await Promise.all([
1853+
pEvent(consumer, "error"),
1854+
clock.tickAsync(75000),
1855+
]);
1856+
consumer.stop();
1857+
1858+
assert.ok(err);
1859+
assert.equal(err.message, "Error changing visibility timeout: failed");
1860+
assert.equal(err.queueUrl, QUEUE_URL);
1861+
assert.deepEqual(err.messageIds, ["1", "2"]);
17091862
});
17101863
});
17111864

0 commit comments

Comments
 (0)