From 5d994f60ea215f9dd7d2c7b61057297ffcddd939 Mon Sep 17 00:00:00 2001 From: mx Date: Fri, 9 Aug 2024 09:14:45 +0800 Subject: [PATCH] feat: add mq publish/subscribe content --- docs/.vitepress/config.ts | 1 + ...21\345\270\203\350\256\242\351\230\205.md" | 475 ++++++++++++++++++ ...11\350\243\205\345\220\257\345\212\250.md" | 27 +- 3 files changed, 500 insertions(+), 3 deletions(-) create mode 100644 "docs/knowledge-deposition/RabbitMQ/\345\217\221\345\270\203\350\256\242\351\230\205.md" diff --git a/docs/.vitepress/config.ts b/docs/.vitepress/config.ts index 28bca478..fb128ccf 100644 --- a/docs/.vitepress/config.ts +++ b/docs/.vitepress/config.ts @@ -356,6 +356,7 @@ export default { items: [ { text: '基本概念和安装启动', link: '/knowledge-deposition/RabbitMQ/基本概念和安装启动'}, { text: '工作队列', link: '/knowledge-deposition/RabbitMQ/工作队列'}, + { text: '发布订阅', link: '/knowledge-deposition/RabbitMQ/发布订阅'}, ] }, ], diff --git "a/docs/knowledge-deposition/RabbitMQ/\345\217\221\345\270\203\350\256\242\351\230\205.md" "b/docs/knowledge-deposition/RabbitMQ/\345\217\221\345\270\203\350\256\242\351\230\205.md" new file mode 100644 index 00000000..be3bb1cb --- /dev/null +++ "b/docs/knowledge-deposition/RabbitMQ/\345\217\221\345\270\203\350\256\242\351\230\205.md" @@ -0,0 +1,475 @@ +--- +layout: doc +--- + +# 发布订阅 + +> 之前的代码实现的都是`一对一`的模式,也就是`一个任务分配给一个消费者去消费`,但是其实也可以实现`一个任务分配给多个消费者` + +> 可以使用`发布/订阅`模式来实现,这种模式下,要使用到`交换机(Exchange)` + +## 交换机 + +> `交换机(Exchange)`的作用从`Producer`处接收`消息(Message)`,然后将消息发送到`队列(Queues)`中 + +> `交换机(Exchange)`必须明确自己如何去处理消息,这个规则取决于`交换机的类型` + +> `交换机(Exchange)`有这几种类型:`Topic`、`Direct`、`Headers`、`Fanout` + +> 之前的代码中都没有用到`交换机(Exchange)`,都是直接声明一个`具名队列` + +> 如下:函数`ch.PublishWithContext`中`exchange name`是空字符串,这时会使用一个默认的交换机 + +```Go +// 3. declare a queue,it will only be created if it doesn't exist already +queue, err := ch.QueueDeclare( + "second_queue", // queue name + true, // durable + false, //delete when unused + false, //exclusive + false, // no-wait + nil, // arguments +) + +// publish message +err = ch.PublishWithContext( + withTimeoutCtx, + "", //exchange name + queue.Name, // routing key + false, // mandatory + false, //immediate + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + Body: []byte(messageBody), + ContentType: "text/plain", + }, +) +``` + +### `Direct`(直连模式) + +> `交换机(Exchange)`将消息发送到某个匹配的`队列中`,匹配规则是:这个`队列的binding key`要和`消息的routing key`一致 + +:::tip +- 队列的`binding key`在调用`ch.QueueBind`方法时指定 + +- 消息的`routing key`在调用`ch.PublishWithContext`方法中指定 +::: + +> 适用于明确指定的路由,例如,处理特定类型的任务,简单示例代码如下: + +> `producer.go` + +```Go +// xxxx 其他代码 +// declare exchange +err = ch.ExchangeDeclare( + "directLogs", // exchange name + "direct", // exchange type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments +) +// xxxx 其他代码 +err = ch.PublishWithContext( + ctx, // context + "directLogs", // exchange name + "directRoutingKey", // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", + Body: []byte(messageBody), + }, +) +``` + +> `consumer.go` + +```Go +// xxxx 其他代码 +// declare exchange +err = ch.ExchangeDeclare( + "directLogs", // exchange name + "direct", // exchange type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments +) +q, err := ch.QueueDeclare( + "directQueue", // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments +) +// 将交换机和队列进行绑定 +err = ch.QueueBind( + q.Name, // queue name + "directRoutingKey", // binding key + "directLogs", // exchange name + false, + nil, +) +// xxxx 其他代码 +``` + +### `Topic`(主题模式) + +> 将接收到的消息放到和交换机指定的`routing key`匹配的`队列`里面 +> +> 额外增加使用`*`(匹配`一个单词`)或使用`#`(匹配`多个单词`) + +> 比起`Direct`模式,在验证`routing key`的时候,多了匹配规则 + +> `producer.go` + +```Go +// xxxx 其他代码 +// declare exchange +err = ch.ExchangeDeclare( + "topicTask", // exchange name + "topic", // exchange type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments +) +// xxxx 其他代码 +err = ch.PublishWithContext( + ctx, // context + "topicTask", // exchange name + "topicRoutingKey.abc", // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", + Body: []byte(messageBody), + }, +) +``` + +> `consumer.go` + +```Go +// xxxx 其他代码 +// declare exchange +err = ch.ExchangeDeclare( + "topicTask", // exchange name + "topic", // exchange type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments +) +q, err := ch.QueueDeclare( + "topicQueue", // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments +) +// 将交换机和队列进行绑定 +err = ch.QueueBind( + q.Name, // queue name + "topicRoutingKey.*", // routing key 多了一个匹配规则 + "topicTask", // exchange name + false, + nil, +) +// xxxx 其他代码 +``` + +### `Headers`(头部模式) + +> 使用`消息头属性(headers)`来路由消息,而`不是路由键`,可以匹配多个头 +> +> 使用`Headers`模式,不用指定`routing key` +> +> `amqp.Table`的数据类型是`Map` + +```Go +type Table map[string]interface{} +``` + +> `producer.go` + +```Go +// xxxx 其他代码 +// declare exchange +err = ch.ExchangeDeclare( + "headersTask", // exchange name + "headers", // exchange type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments +) +// xxxx 其他代码 +err = ch.PublishWithContext( + ctx, // context + "headersTask", // exchange name + "", // routing key Headers模式下不用指定routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", + Body: []byte(messageBody), + Headers: amqp.Table{ // Headers模式下,会检查该字段,要传该字段 + "format": "pdf", + "type": "report", + }, + }, +) +``` + +> `consumer.go` + +```Go +// xxxx 其他代码 +// declare exchange +err = ch.ExchangeDeclare( + "headersTask", // exchange name + "headers", // exchange type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments +) +q, err := ch.QueueDeclare( + "headersQueue", // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments +) +// 将交换机和队列进行绑定 +err = ch.QueueBind( + q.Name, // queue name + "", // routing key + "headersTask", // exchange name + false, + amqp.Table{ // 此处要和上面发送时保持一致 + "format": "pdf", + "type": "report", + }, +) +// xxxx 其他代码 +``` + +### `Fanout`(广播模式) + +> 把消息放入`交换机所有的队列`,实现广播功能 +> +> `Fanout`模式会忽略`routing key` + +> `producer.go` + +```Go +// xxxx 其他代码 +// declare exchange +err = ch.ExchangeDeclare( + "fanoutTask", // exchange name + "fanout", // exchange type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments +) +// xxxx 其他代码 +err = ch.PublishWithContext( + ctx, // context + "fanoutTask", // exchange name + "", // routing key Fanout 模式下不用指定routing key,会忽略该字段 + false, // mandatory + false, // immediate + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", + Body: []byte(messageBody), + }, +) +``` + +> `consumer.go` + +```Go +// xxxx 其他代码 +// declare exchange +err = ch.ExchangeDeclare( + "fanoutTask", // exchange name + "fanout", // exchange type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments +) +q, err := ch.QueueDeclare( + "fanoutQueue", // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments +) +// 将交换机和队列进行绑定 +err = ch.QueueBind( + q.Name, // queue name + "", // routing key + "fanoutTask", // exchange name + false, + nil +) +// xxxx 其他代码 +``` + +## 一个简单案例 + +> 生产者生产出一些日志消息,所有的消费者接收消息并打印出来,这种场景可以使用`Fanout(广播模式)` + +> 完整代码如下: + +> 生产方`emit.go` + +```Go +package main + +import ( + "context" + "fmt" + amqp "github.com/rabbitmq/amqp091-go" + "go-rabbitmq/shared" + "strings" + "time" +) + +func startUpAndSend() { + // create connection + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672") + shared.FailOnError(err, "create connection error") + defer conn.Close() + + // create channel + ch, err := conn.Channel() + shared.FailOnError(err, "create channel error") + defer ch.Close() + + // declare exchange + err = ch.ExchangeDeclare( + "logsExchange", + "fanout", // exchange type => headers, topic, direct, fanout + false, // durable + false, // auto-delete + false, // internal + false, // no-wait + nil, // args + ) + shared.FailOnError(err, "declare exchange error") + + // publish message + ctx, cancelFunc := context.WithTimeout(context.Background(), 6*time.Second) + defer cancelFunc() + for i := 0; i < 6; i++ { + go func(i int) { + messageBody := fmt.Sprintf("我是第[%d]条消息%s", i+1, strings.Repeat(".", i+1)) + err = ch.PublishWithContext( + ctx, + "logsExchange", + "", + false, + false, + amqp.Publishing{ + DeliveryMode: amqp.Persistent, + ContentType: "text/plain", + Body: []byte(messageBody), + }, + ) + shared.FailOnError(err, "publish message error") + fmt.Printf(" [x] Sent %s\n", messageBody) + }(i) + } + time.Sleep(6 * time.Second) +} + +func main() { + startUpAndSend() +} +``` + +> 消费方`receive.go` + +```Go +package main + +import ( + "fmt" + amqp "github.com/rabbitmq/amqp091-go" + "go-rabbitmq/shared" +) + +func startUpAndReceive() { + // create connection + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672") + shared.FailOnError(err, "create connection error") + defer conn.Close() + + // create channel + ch, err := conn.Channel() + shared.FailOnError(err, "create channel error") + defer ch.Close() + + // declare exchange + err = ch.ExchangeDeclare( + "logsExchange", + "fanout", // exchange type => headers, topic, direct, fanout + false, // durable + false, // auto-delete + false, // internal + false, // no-wait + nil, // args + ) + shared.FailOnError(err, "declare exchange error") + + // declare unnamed queue + queue, err := ch.QueueDeclare("", false, false, true, false, nil) + shared.FailOnError(err, "declare queue error") + + // bind exchange and queue + err = ch.QueueBind(queue.Name, "", "logsExchange", false, nil) + shared.FailOnError(err, "bind exchange and queue error") + + // consume message + lockChan := make(chan struct{}) + messages, err := ch.Consume(queue.Name, "", true, false, false, false, nil) + shared.FailOnError(err, "consume message error") + go func() { + for msg := range messages { + fmt.Printf("Received a message: %s\n", msg.Body) + } + }() + fmt.Println("[*] Waiting for messages. To exit press CTRL+C") + <-lockChan +} + +func main() { + startUpAndReceive() +} +``` + +> 上面代码,在多个窗口分别运行`receive.go`中,然后运行`emit.go`,每个窗口都会接收到发送的数据 \ No newline at end of file diff --git "a/docs/knowledge-deposition/RabbitMQ/\345\237\272\346\234\254\346\246\202\345\277\265\345\222\214\345\256\211\350\243\205\345\220\257\345\212\250.md" "b/docs/knowledge-deposition/RabbitMQ/\345\237\272\346\234\254\346\246\202\345\277\265\345\222\214\345\256\211\350\243\205\345\220\257\345\212\250.md" index 552d029a..c5b2f0e3 100644 --- "a/docs/knowledge-deposition/RabbitMQ/\345\237\272\346\234\254\346\246\202\345\277\265\345\222\214\345\256\211\350\243\205\345\220\257\345\212\250.md" +++ "b/docs/knowledge-deposition/RabbitMQ/\345\237\272\346\234\254\346\246\202\345\277\265\345\222\214\345\256\211\350\243\205\345\220\257\345\212\250.md" @@ -8,6 +8,8 @@ layout: doc - [官方网站](https://www.rabbitmq.com/) +- [RabbitMQ版本号信息](https://www.rabbitmq.com/release-information) + > `RabbitMQ`是一款可靠且成熟的消息传递和流媒体代理,可以轻松部署在云环境、本地环境以及本地计算机上 @@ -42,7 +44,9 @@ layout: doc > 直接使用`docker run`来启动一个`image` :::tip -- `3.13-management`版本自带了`web管理界面` +- `3.13-management`版本自带了`web管理界面`,web界面的端口号默认是`15672` + +- `MQ`服务默认端口号是`5672` ::: ```shell @@ -63,9 +67,26 @@ services: - "15672:15672" ``` -## 使用`homebrew`安装启动教程 +### 使用`homebrew`安装启动教程 + +:::tip +- `RabbitMQ`是用`Erlang`编写的,因此需要先安装`Erlang` +::: + +- 使用`Homebrew`安装`Erlang` + +```shell +brew install erlang +``` + +- 安装`RabbitMQ` + +```shell +brew install rabbitmq +``` + +- 具体教程点击[Mac安装教程](https://www.rabbitmq.com/docs/install-homebrew) -- Mac[安装教程](https://www.rabbitmq.com/docs/install-homebrew) ## 简单示例