Skip to content

Commit a469d84

Browse files
committed
Refactor RabbitMQConsumer
1 parent 679a4a7 commit a469d84

File tree

75 files changed

+1122
-627
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

75 files changed

+1122
-627
lines changed

10-rabbitmq-message-broker/4-consume-domain-events/src/Contexts/Shared/infrastructure/EventBus/RabbitMQ/RabbitMQConsumer.ts

+6-5
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,21 @@ import { ConsumeMessage } from 'amqplib';
22
import { DomainEvent } from '../../../domain/DomainEvent';
33
import { DomainEventSubscriber } from '../../../domain/DomainEventSubscriber';
44
import { DomainEventDeserializer } from '../DomainEventDeserializer';
5+
import { RabbitMqConnection } from './RabbitMqConnection';
56

67

78
export class RabbitMQConsumer {
8-
constructor(private subscriber: DomainEventSubscriber<DomainEvent>, private deserializer: DomainEventDeserializer) { }
9+
constructor(private subscriber: DomainEventSubscriber<DomainEvent>, private deserializer: DomainEventDeserializer, private connection: RabbitMqConnection) { }
910

10-
async onMessage(params: { message: ConsumeMessage; ack: Function; noAck: Function; }) {
11-
const content = params.message.content.toString();
11+
async onMessage(message: ConsumeMessage) {
12+
const content = message.content.toString();
1213
const domainEvent = this.deserializer.deserialize(content);
1314

1415
try {
1516
await this.subscriber.on(domainEvent);
16-
params.ack();
17+
this.connection.ack(message);
1718
} catch (error) {
18-
params.noAck();
19+
this.connection.noAck(message);
1920
}
2021
}
2122
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { DomainEvent } from '../../../domain/DomainEvent';
2+
import { DomainEventSubscriber } from '../../../domain/DomainEventSubscriber';
3+
import { DomainEventDeserializer } from '../DomainEventDeserializer';
4+
import { RabbitMqConnection } from './RabbitMqConnection';
5+
import { RabbitMQConsumer } from './RabbitMQConsumer';
6+
7+
export class RabbitMQConsumerFactory {
8+
constructor(private deserializer: DomainEventDeserializer, private connection: RabbitMqConnection) { }
9+
10+
build(subscriber: DomainEventSubscriber<DomainEvent>) {
11+
return new RabbitMQConsumer(subscriber, this.deserializer, this.connection);
12+
}
13+
}

10-rabbitmq-message-broker/4-consume-domain-events/src/Contexts/Shared/infrastructure/EventBus/RabbitMQ/RabbitMQEventBus.ts

+9-6
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { DomainEvent } from '../../../domain/DomainEvent';
22
import { EventBus } from '../../../domain/EventBus';
3-
import { DomainEventSubscribers } from '../DomainEventSubscribers';
3+
import { DomainEventDeserializer } from '../DomainEventDeserializer';
4+
import { DomainEventFailoverPublisher } from '../DomainEventFailoverPublisher/DomainEventFailoverPublisher';
45
import { DomainEventJsonSerializer } from '../DomainEventJsonSerializer';
6+
import { DomainEventSubscribers } from '../DomainEventSubscribers';
57
import { RabbitMqConnection } from './RabbitMqConnection';
6-
import { DomainEventFailoverPublisher } from '../DomainEventFailoverPublisher/DomainEventFailoverPublisher';
8+
import { RabbitMQConsumerFactory } from './RabbitMQConsumerFactory';
79
import { RabbitMQqueueFormatter } from './RabbitMQqueueFormatter';
8-
import { RabbitMQConsumer } from './RabbitMQConsumer';
9-
import { DomainEventDeserializer } from '../DomainEventDeserializer';
1010

1111
export class RabbitMQEventBus implements EventBus {
1212
private failoverPublisher: DomainEventFailoverPublisher;
@@ -29,12 +29,13 @@ export class RabbitMQEventBus implements EventBus {
2929

3030
async addSubscribers(subscribers: DomainEventSubscribers): Promise<void> {
3131
const deserializer = DomainEventDeserializer.configure(subscribers);
32+
const consumerFactory = new RabbitMQConsumerFactory(deserializer, this.connection);
3233

3334
for (const subscriber of subscribers.items) {
3435
const queueName = this.queueNameFormatter.format(subscriber);
35-
const rabbitMQConsumer = new RabbitMQConsumer(subscriber, deserializer);
36+
const rabbitMQConsumer = consumerFactory.build(subscriber);
3637

37-
await this.connection.consume(queueName, rabbitMQConsumer);
38+
await this.connection.consume(queueName, rabbitMQConsumer.onMessage.bind(rabbitMQConsumer));
3839
}
3940
}
4041

@@ -66,3 +67,5 @@ export class RabbitMQEventBus implements EventBus {
6667
return Buffer.from(eventPrimitives);
6768
}
6869
}
70+
71+

10-rabbitmq-message-broker/4-consume-domain-events/src/Contexts/Shared/infrastructure/EventBus/RabbitMQ/RabbitMqConnection.ts

+14-21
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import amqplib, { ConsumeMessage } from 'amqplib';
22
import { ConnectionSettings } from './ConnectionSettings';
33
import { ExchangeSetting } from './ExchangeSetting';
4-
import { RabbitMQConsumer } from './RabbitMQConsumer';
4+
55
export class RabbitMqConnection {
66
protected connectionSettings: ConnectionSettings;
77

@@ -36,18 +36,6 @@ export class RabbitMqConnection {
3636
}
3737
}
3838

39-
async consume(queue: string, consumer: RabbitMQConsumer) {
40-
await this.channel!.consume(queue, (message: ConsumeMessage | null) => {
41-
if (!message) {
42-
return;
43-
}
44-
45-
const ack = this.getAck(message);
46-
const noAck = this.getNoAck(message);
47-
consumer.onMessage({ message, ack, noAck });
48-
});
49-
}
50-
5139
async deleteQueue(queue: string) {
5240
return await this.channel!.deleteQueue(queue);
5341
}
@@ -99,15 +87,20 @@ export class RabbitMqConnection {
9987
return await this.connection?.close();
10088
}
10189

102-
private getAck(message: ConsumeMessage) {
103-
return () => {
104-
this.channel!.ack(message);
105-
};
90+
async consume(queue: string, onMessage: (message: ConsumeMessage) => {}) {
91+
await this.channel!.consume(queue, (message: ConsumeMessage | null) => {
92+
if (!message) {
93+
return;
94+
}
95+
onMessage(message);
96+
});
97+
}
98+
99+
ack(message: ConsumeMessage) {
100+
this.channel!.ack(message);
106101
}
107102

108-
private getNoAck(message: ConsumeMessage) {
109-
return () => {
110-
this.channel!.nack(message);
111-
};
103+
noAck(message: ConsumeMessage) {
104+
this.channel!.nack(message);
112105
}
113106
}

10-rabbitmq-message-broker/5-handle-consume-errors/src/Contexts/Shared/infrastructure/EventBus/RabbitMQ/RabbitMQConsumer.ts

+30-5
Original file line numberDiff line numberDiff line change
@@ -2,41 +2,66 @@ import { ConsumeMessage } from 'amqplib';
22
import { DomainEvent } from '../../../domain/DomainEvent';
33
import { DomainEventSubscriber } from '../../../domain/DomainEventSubscriber';
44
import { DomainEventDeserializer } from '../DomainEventDeserializer';
5+
import { RabbitMqConnection } from './RabbitMqConnection';
6+
57
export class RabbitMQConsumer {
68
private subscriber: DomainEventSubscriber<DomainEvent>;
79
private deserializer: DomainEventDeserializer;
10+
private connection: RabbitMqConnection;
811
private maxRetries: Number;
12+
private queueName: string;
13+
private exchange: string;
914

1015
constructor(params: {
1116
subscriber: DomainEventSubscriber<DomainEvent>;
1217
deserializer: DomainEventDeserializer;
18+
connection: RabbitMqConnection
19+
queueName: string;
20+
exchange: string;
1321
maxRetries: Number;
1422
}) {
1523
this.subscriber = params.subscriber;
1624
this.deserializer = params.deserializer;
25+
this.connection = params.connection;
1726
this.maxRetries = params.maxRetries;
27+
this.queueName = params.queueName;
28+
this.exchange = params.exchange;
1829
}
1930

20-
async onMessage(params: { message: ConsumeMessage; ack: Function; retry: Function; deadLetter: Function }) {
21-
const { message, ack, retry, deadLetter } = params;
31+
async onMessage(message: ConsumeMessage) {
2232
const content = message.content.toString();
2333
const domainEvent = this.deserializer.deserialize(content);
2434

2535
try {
2636
await this.subscriber.on(domainEvent);
2737
} catch (error) {
28-
this.hasBeenRedeliveredTooMuch(message) ? deadLetter() : retry();
38+
await this.handleError(message);
2939
} finally {
30-
ack();
40+
this.connection.ack(message);
3141
}
3242
}
3343

44+
private async handleError(message: ConsumeMessage) {
45+
if (this.hasBeenRedeliveredTooMuch(message)) {
46+
await this.deadLetter(message)
47+
} else {
48+
await this.retry(message);
49+
}
50+
}
51+
52+
private async retry(message: ConsumeMessage) {
53+
await this.connection.retry(message, this.queueName, this.exchange)
54+
}
55+
56+
private async deadLetter(message: ConsumeMessage) {
57+
await this.connection.deadLetter(message, this.queueName, this.exchange)
58+
}
59+
3460
private hasBeenRedeliveredTooMuch(message: ConsumeMessage) {
3561
if (this.hasBeenRedelivered(message)) {
3662
const count = parseInt(message.properties.headers['redelivery_count']);
3763
return count >= this.maxRetries;
3864
}
39-
4065
return false;
4166
}
4267

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import { DomainEvent } from '../../../domain/DomainEvent';
2+
import { DomainEventSubscriber } from '../../../domain/DomainEventSubscriber';
3+
import { DomainEventDeserializer } from '../DomainEventDeserializer';
4+
import { RabbitMqConnection } from './RabbitMqConnection';
5+
import { RabbitMQConsumer } from './RabbitMQConsumer';
6+
7+
export class RabbitMQConsumerFactory {
8+
constructor(private deserializer: DomainEventDeserializer, private connection: RabbitMqConnection, private maxRetries: Number) { }
9+
10+
build(subscriber: DomainEventSubscriber<DomainEvent>, exchange: string, queueName: string,) {
11+
12+
return new RabbitMQConsumer({
13+
subscriber, deserializer: this.deserializer, connection: this.connection,
14+
queueName,
15+
exchange,
16+
maxRetries: this.maxRetries
17+
18+
});
19+
}
20+
}

10-rabbitmq-message-broker/5-handle-consume-errors/src/Contexts/Shared/infrastructure/EventBus/RabbitMQ/RabbitMQEventBus.ts

+7-8
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import { DomainEvent } from '../../../domain/DomainEvent';
22
import { EventBus } from '../../../domain/EventBus';
3-
import { DomainEventSubscribers } from '../DomainEventSubscribers';
3+
import { DomainEventDeserializer } from '../DomainEventDeserializer';
4+
import { DomainEventFailoverPublisher } from '../DomainEventFailoverPublisher/DomainEventFailoverPublisher';
45
import { DomainEventJsonSerializer } from '../DomainEventJsonSerializer';
6+
import { DomainEventSubscribers } from '../DomainEventSubscribers';
57
import { RabbitMqConnection } from './RabbitMqConnection';
6-
import { DomainEventFailoverPublisher } from '../DomainEventFailoverPublisher/DomainEventFailoverPublisher';
8+
import { RabbitMQConsumerFactory } from './RabbitMQConsumerFactory';
79
import { RabbitMQqueueFormatter } from './RabbitMQqueueFormatter';
8-
import { RabbitMQConsumer } from './RabbitMQConsumer';
9-
import { DomainEventDeserializer } from '../DomainEventDeserializer';
1010

1111
export class RabbitMQEventBus implements EventBus {
1212
private failoverPublisher: DomainEventFailoverPublisher;
@@ -32,14 +32,13 @@ export class RabbitMQEventBus implements EventBus {
3232

3333
async addSubscribers(subscribers: DomainEventSubscribers): Promise<void> {
3434
const deserializer = DomainEventDeserializer.configure(subscribers);
35-
this.failoverPublisher.setDeserializer(deserializer);
36-
const maxRetries = this.maxRetries;
35+
const consumerFactory = new RabbitMQConsumerFactory(deserializer, this.connection, this.maxRetries);
3736

3837
for (const subscriber of subscribers.items) {
3938
const queueName = this.queueNameFormatter.format(subscriber);
40-
const rabbitMQConsumer = new RabbitMQConsumer({ subscriber, deserializer, maxRetries });
39+
const rabbitMQConsumer = consumerFactory.build(subscriber, this.exchange, queueName);
4140

42-
await this.connection.consume(this.exchange, queueName, rabbitMQConsumer);
41+
await this.connection.consume(queueName, rabbitMQConsumer.onMessage.bind(rabbitMQConsumer));
4342
}
4443
}
4544

10-rabbitmq-message-broker/5-handle-consume-errors/src/Contexts/Shared/infrastructure/EventBus/RabbitMQ/RabbitMqConnection.ts

+20-28
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import amqplib, { ConsumeMessage } from 'amqplib';
22
import { ConnectionSettings } from './ConnectionSettings';
3-
import { RabbitMQConsumer } from './RabbitMQConsumer';
43
import { RabbitMQExchangeNameFormatter } from './RabbitMQExchangeNameFormatter';
4+
55
export class RabbitMqConnection {
66
private connectionSettings: ConnectionSettings;
77
private channel?: amqplib.ConfirmChannel;
@@ -66,17 +66,6 @@ export class RabbitMqConnection {
6666
return args;
6767
}
6868

69-
async consume(exchange: string, queue: string, consumer: RabbitMQConsumer) {
70-
await this.channel!.consume(queue, (message: ConsumeMessage | null) => {
71-
if (message) {
72-
const ack = this.getAck(message);
73-
const retry = this.getRetry(message, queue, exchange);
74-
const deadLetter = this.getDeadLetter(message, queue, exchange);
75-
consumer.onMessage({ message, ack, retry, deadLetter });
76-
}
77-
});
78-
}
79-
8069
async deleteQueue(queue: string) {
8170
return await this.channel!.deleteQueue(queue);
8271
}
@@ -129,28 +118,31 @@ export class RabbitMqConnection {
129118
return await this.connection?.close();
130119
}
131120

132-
getAck(message: ConsumeMessage) {
133-
return () => {
134-
this.channel!.ack(message);
135-
};
121+
async consume(queue: string, onMessage: (message: ConsumeMessage) => {}) {
122+
await this.channel!.consume(queue, (message: ConsumeMessage | null) => {
123+
if (!message) {
124+
return;
125+
}
126+
onMessage(message);
127+
});
136128
}
137129

138-
getRetry(message: ConsumeMessage, queue: string, exchange: string) {
139-
return async () => {
140-
const retryExchange = RabbitMQExchangeNameFormatter.retry(exchange);
141-
const options = this.getMessageOptions(message);
130+
ack(message: ConsumeMessage) {
131+
this.channel!.ack(message);
132+
}
142133

143-
return await this.publish({ exchange: retryExchange, routingKey: queue, content: message.content, options });
144-
};
134+
async retry(message: ConsumeMessage, queue: string, exchange: string) {
135+
const retryExchange = RabbitMQExchangeNameFormatter.retry(exchange);
136+
const options = this.getMessageOptions(message);
137+
138+
return await this.publish({ exchange: retryExchange, routingKey: queue, content: message.content, options });
145139
}
146140

147-
getDeadLetter(message: ConsumeMessage, queue: string, exchange: string) {
148-
return async () => {
149-
const deadLetterExchange = RabbitMQExchangeNameFormatter.deadLetter(exchange);
150-
const options = this.getMessageOptions(message);
141+
async deadLetter(message: ConsumeMessage, queue: string, exchange: string) {
142+
const deadLetterExchange = RabbitMQExchangeNameFormatter.deadLetter(exchange);
143+
const options = this.getMessageOptions(message);
151144

152-
return await this.publish({ exchange: deadLetterExchange, routingKey: queue, content: message.content, options });
153-
};
145+
return await this.publish({ exchange: deadLetterExchange, routingKey: queue, content: message.content, options });
154146
}
155147

156148
private getMessageOptions(message: ConsumeMessage) {

10-rabbitmq-message-broker/5-handle-consume-errors/tests/Contexts/Shared/infrastructure/EventBus/RabbitMQEventBus.test.ts

+3-2
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,11 @@ describe('RabbitMQEventBus test', () => {
139139
const deadLetterSubscriber = new DomainEventSubscriberDummy();
140140
const deadLetterSubscribers = new DomainEventSubscribers([dummySubscriber]);
141141
const deserializer = DomainEventDeserializer.configure(deadLetterSubscribers);
142-
const consumer = new RabbitMQConsumer({ subscriber: deadLetterSubscriber, deserializer, maxRetries: 3 });
143-
await connection.consume(exchange, deadLetterQueue, consumer);
142+
const consumer = new RabbitMQConsumer({ subscriber: deadLetterSubscriber, deserializer, connection, maxRetries: 3, queueName: deadLetterQueue, exchange });
143+
await connection.consume(deadLetterQueue, consumer.onMessage.bind(consumer));
144144

145145
await deadLetterSubscriber.assertConsumedEvents(events);
146146
}
147+
147148
});
148149
});

0 commit comments

Comments
 (0)