From 31fc0f80159530dc869a5638deb205663ebe0f9e Mon Sep 17 00:00:00 2001 From: mx Date: Thu, 8 Aug 2024 10:40:15 +0800 Subject: [PATCH] feat: add mq documentation --- docs/.vitepress/config.ts | 1 + ...11\350\243\205\345\220\257\345\212\250.md" | 14 +- ...45\344\275\234\351\230\237\345\210\227.md" | 264 ++++++++++++++++++ 3 files changed, 274 insertions(+), 5 deletions(-) create mode 100644 "docs/knowledge-deposition/RabbitMQ/\345\267\245\344\275\234\351\230\237\345\210\227.md" diff --git a/docs/.vitepress/config.ts b/docs/.vitepress/config.ts index 2d18391b..28bca478 100644 --- a/docs/.vitepress/config.ts +++ b/docs/.vitepress/config.ts @@ -355,6 +355,7 @@ export default { collapsed: false, items: [ { text: '基本概念和安装启动', link: '/knowledge-deposition/RabbitMQ/基本概念和安装启动'}, + { text: '工作队列', link: '/knowledge-deposition/RabbitMQ/工作队列'}, ] }, ], diff --git "a/docs/knowledge-deposition/RabbitMQ/\345\237\272\346\234\254\346\246\202\345\277\265\345\222\214\345\256\211\350\243\205\345\220\257\345\212\250.md" "b/docs/knowledge-deposition/RabbitMQ/\345\237\272\346\234\254\346\246\202\345\277\265\345\222\214\345\256\211\350\243\205\345\220\257\345\212\250.md" index e935a7ee..552d029a 100644 --- "a/docs/knowledge-deposition/RabbitMQ/\345\237\272\346\234\254\346\246\202\345\277\265\345\222\214\345\256\211\350\243\205\345\220\257\345\212\250.md" +++ "b/docs/knowledge-deposition/RabbitMQ/\345\237\272\346\234\254\346\246\202\345\277\265\345\222\214\345\256\211\350\243\205\345\220\257\345\212\250.md" @@ -20,11 +20,15 @@ layout: doc > 它有以下几个重要的概念: -1. `message`:RabbitMQ`接收`、`存储`和`转发`=>`二进制数据块`—-也就是`message` -2. `Producer`:生产者是消息的发送方,它将消息发送到`RabbitMQ`的交换器(`Exchange`)中 -3. `Exchange`: -4. `Queue`: -5. `Consumer`: +1. `Message`:RabbitMQ`接收`、`存储`和`转发`=>`二进制数据块`—-也就是`message` +2. `Producer`:生产者是一个发送消息的用户应用程序 +3. `Exchange`:交换机一边`接收来自生产者的消息`,另一边将它们`推送到队列`中。 交换机必须确切地知道如何处理接收到的消息,包括如下几条: + - `消息是否应该添加到到特定队列` + + - `消息是否应该应添加到多个队列` + - `消息是否应该被丢弃` +4. `Queue`:队列是一个存储消息的缓冲区 +5. `Consumer`:消费者是接收消息的用户应用程序 ## 安装和启动 diff --git "a/docs/knowledge-deposition/RabbitMQ/\345\267\245\344\275\234\351\230\237\345\210\227.md" "b/docs/knowledge-deposition/RabbitMQ/\345\267\245\344\275\234\351\230\237\345\210\227.md" new file mode 100644 index 00000000..bd87eeaf --- /dev/null +++ "b/docs/knowledge-deposition/RabbitMQ/\345\267\245\344\275\234\351\230\237\345\210\227.md" @@ -0,0 +1,264 @@ +--- +layout: doc +--- + +# 工作队列 + +## 循环分发消息 + +> 默认情况下,`RabbitMQ`将按顺序将每条消息发送给下一个消费者,每个`Consumer`将收到相同数量的消息,这种分发消息的方式称为`循环轮询` +> +> 我们可以同时开启多个协程去消费消息来测试这一特性,消费消息的我们称之为`worker` + +> 写一个通用的接收消息的通用函数`consumeMessage`(worker.go),然后开启`3`个协程去消费消息 +> +> 可以看到,每个message被消费后,会等待一段时间,才能去消费下一个消息(消息文案中包含几个`.`,就等待`10-.的个数`秒) + +```Go +for i := 1; i <= 3; i++ { + go consumeMessage(ch, queue.Name, i) +} +func consumeMessage(ch *amqp.Channel, queueName string, serialNumber int) { + // 4、consume message + message, err := ch.Consume( + queueName, + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + shared.FailOnError(err, "consume queue message error") + for msg := range message { + fmt.Printf("【NO %d】 Received a message: %s", serialNumber, msg.Body) + dotCount := bytes.Count(msg.Body, []byte(".")) + duration := time.Duration(10 - dotCount) + time.Sleep(duration * time.Second) + fmt.Printf("Done \n") + } +} +``` + +> 而在生产消息时(task.go),一次性发送多条 + +```Go +for i := 0; i < 10; i++ { + messageBody := fmt.Sprintf("我是第[%d]条消息%s \n", i+1, strings.Repeat(".", i+1)) + err = ch.PublishWithContext( + withTimeoutCtx, + "", //exchange name + queue.Name, // routing key + false, // mandatory + false, //immediate + amqp.Publishing{ + Body: []byte(messageBody), + ContentType: "text/plain", + }, + ) + shared.FailOnError(err, "Publish message error") + fmt.Printf(" [x] Sent %s\n", messageBody) +} +``` + +> 先运行消费者代码`worker.go`,然后执行生产者代码`task.go`,打印如下: + +```shell +# workQueues/task.go 打印 + [x] Sent 我是第[1]条消息. + [x] Sent 我是第[2]条消息.. + [x] Sent 我是第[3]条消息... + [x] Sent 我是第[4]条消息.... + [x] Sent 我是第[5]条消息..... + [x] Sent 我是第[6]条消息...... + [x] Sent 我是第[7]条消息....... + [x] Sent 我是第[8]条消息........ + [x] Sent 我是第[9]条消息......... + [x] Sent 我是第[10]条消息.......... + +# go run workQueues/worker.go 打印 +【NO 1】 Received a message: 我是第[3]条消息..., Will wait [7s] +【NO 3】 Received a message: 我是第[1]条消息., Will wait [9s] +【NO 2】 Received a message: 我是第[2]条消息.., Will wait [8s] +【NO 1】Done +【NO 1】 Received a message: 我是第[6]条消息......, Will wait [4s] +【NO 2】Done +【NO 2】 Received a message: 我是第[5]条消息....., Will wait [5s] +【NO 3】Done +【NO 3】 Received a message: 我是第[4]条消息...., Will wait [6s] +【NO 1】Done +【NO 1】 Received a message: 我是第[9]条消息........., Will wait [1s] +【NO 1】Done +【NO 2】Done +【NO 2】 Received a message: 我是第[8]条消息........, Will wait [2s] +【NO 2】Done +【NO 3】Done +【NO 3】 Received a message: 我是第[7]条消息......., Will wait [3s] +【NO 3】Done +【NO 3】 Received a message: 我是第[10]条消息.........., Will wait [0s] +【NO 3】Done +``` + +> 可以看到上面的打印,消息是轮训发放的,就算要等很久才能消费,也会按轮训的顺序分配 +> +> 序号【`NO1`】消费第`3、6、9`条消息 +> +> 序号【`NO2`】消费第`2、5、8`条消息 +> +> 序号【`NO3`】消费第`1、4、7、10`条消息 + +> 但是其实`第4条`消息`【NO1】`来消费是比较合适的,因为它等待的时间最短,可以更早地去处理下一条消息 +> +> +> 要想达到这样的效果,可以通过`ch.Qos`方法,将`prefetch`设置为`1`,它表示在消费者确认消息之前,`RabbitMQ`可以`发送给消费者的最大消息数`,也就是告诉`RabbutMQ`一次不要给一个工作者多于`1条`消息,这通常用于确保消费者逐条处理消息,避免消息堆积。 + +```Go +err = ch.Qos( + 1, // prefetch count + 0, // prefetch size + false, // global +) +``` + +- `prefetchCount(int)`: + +> 解释: 预取计数(Prefetch Count)定义了在消费者确认消息之前,RabbitMQ 可以发送给消费者的最大消息数。简单来说,就是`消费者在确认之前最多可以收到多少条消息`。 + +> 示例中的值: 1 表示消费者在确认一条消息之前最多只能收到一条消息。这通常用于确保消费者逐条处理消息,避免消息堆积。 + +- `prefetchSize(int)`: + +> 解释: 预取大小(Prefetch Size)定义了在消费者确认消息之前,RabbitMQ可以发送给消费者的最大字节数。通常情况下,这个值设置为`0`,表示`不限制大小`。 + +> 示例中的值: 0 表示没有预取大小的限制。 + +- `global(bool)`: + +> 解释: 全局标志(Global Flag)决定了`QoS`设置是应用于整个通道还是仅应用于当前消费者。 + +> 示例中的值: `false`表示`QoS`设置仅应用于`当前消费者` +> + +> 如果设置为`true`,则QoS设置应用于`整个channel上所有的消费者` + +> 调整后输出如下: +```shell +【NO 2】 Received a message: 我是第[3]条消息..., Will wait [7s] +【NO 3】 Received a message: 我是第[1]条消息., Will wait [9s] +【NO 1】 Received a message: 我是第[2]条消息.., Will wait [8s] +【NO 2】Done +【NO 2】 Received a message: 我是第[4]条消息...., Will wait [6s] +【NO 1】Done +【NO 1】 Received a message: 我是第[5]条消息....., Will wait [5s] +【NO 3】Done +【NO 3】 Received a message: 我是第[6]条消息......, Will wait [4s] +【NO 3】Done +【NO 2】Done +【NO 1】Done +【NO 2】 Received a message: 我是第[8]条消息........, Will wait [2s] +【NO 3】 Received a message: 我是第[7]条消息......., Will wait [3s] +【NO 1】 Received a message: 我是第[9]条消息........., Will wait [1s] +【NO 1】Done +【NO 1】 Received a message: 我是第[10]条消息.........., Will wait [0s] +【NO 1】Done +【NO 2】Done +【NO 3】Done +``` + +> 如上:【NO2】先消费第`3`条消息,如果没有上面的设置那么它完成后会先去消费第`6`条消息,但是通过`Qos`方法设置后,先消费第`4`条消息 + +## 消息确认 + +> 现在这个场景下,如果我们启动`worker.go`后,然后启动`task.go`,此时消息如果还没有接收完,就断开`worker.go`,那么此时刚才未处理的消息就会消失,再次 +> 执行`worker.go`也不会接收到任何消息了 + +:::tip +- 上面的现象是因为在调用`ch.Consume`函数时,将`第3个参数auto-ack`设置为了`true`,这表明只要`Producer`成功发送消息后,就将该消息标记为`已确认`,默认该消息已经完成处理,不会管`Consumer`是否收到 +::: + +> 要处理上面的问题,需要以下步骤 +> +> 1、将`auto-ack`设置为`false` +> +> 2、消息处理完成后手动调用`Ack`函数确认已收到消息 + +> `worker.go`修改如下: + +```Go +func consumeMessage(ch *amqp.Channel, queueName string, serialNumber int) { + // 4、consume message + message, err := ch.Consume( + queueName, + "", // consumer + false, // auto-ack 不自动确认消息 + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + shared.FailOnError(err, "consume queue message error") + go func() { + for msg := range message { + dotCount := bytes.Count(msg.Body, []byte(".")) + duration := time.Duration(10 - dotCount) + fmt.Printf("【NO %d】 Received a message: %s, Will wait [%s]\n", serialNumber, msg.Body, duration*time.Second) + time.Sleep(duration * time.Second) + fmt.Printf("Done \n") + // Manually confirm that the message has been delivered + msg.Ack(false) + } + }() +} +``` + +## 消息持久化 + +> 现在的代码中,如果在`发送/接收`消息的过程中,`RabbitMQ`服务挂了,或者`出错退出了`,那么消息就会丢失 +> +> 为了解决这个问题,可以设置`消息持久化`,对于`Producer`和`Consumer`都要设置 + +> 1、发送方 +> - 调用`ch.PublishWithContext`函数时,设置`消息body(amqp.Publishing)`时,将模式`DeliveryMode`改为持久化 +> - 声明队列调用`ch.QueueDeclare`函数时,将第二个参数`durable`设置为`true` + +> `task.go`修改如下: + +```Go +// 3. declare a queue,it will only be created if it doesn't exist already +queue, err := ch.QueueDeclare( + "second_queue", // queue name + true, // durable + false, //delete when unused + false, //exclusive + false, // no-wait + nil, // arguments +) +err = ch.PublishWithContext( + withTimeoutCtx, + "", //exchange name + queue.Name, // routing key + false, // mandatory + false, //immediate + amqp.Publishing{ + DeliveryMode: amqp.Persistent, // DeliveryMode uint8 // Transient (0 or 1) or Persistent (2) + Body: []byte(messageBody), + ContentType: "text/plain", + }, +) +``` + + +> 2、接收方 +> - 声明队列调用`ch.QueueDeclare`函数时,将第二个参数`durable`设置为`true` + +```Go +// 3. declare a queue,it will only be created if it doesn't exist already +queue, err := ch.QueueDeclare( + "second_queue", // queue name + true, // durable + false, //delete when unused + false, //exclusive + false, // no-wait + nil, // arguments +) +``` \ No newline at end of file