diff --git a/docs/.vitepress/config.ts b/docs/.vitepress/config.ts index 59afe5ad..2d18391b 100644 --- a/docs/.vitepress/config.ts +++ b/docs/.vitepress/config.ts @@ -349,6 +349,14 @@ export default { { text: '操作动态配置文件', link: '/knowledge-deposition/Nacos/操作动态配置文件'}, ] }, + { + text: 'RabbitMQ', + collapsible: false, + collapsed: false, + items: [ + { text: '基本概念和安装启动', link: '/knowledge-deposition/RabbitMQ/基本概念和安装启动'}, + ] + }, ], '/algorithm/': [ { diff --git "a/docs/knowledge-deposition/RabbitMQ/\345\237\272\346\234\254\346\246\202\345\277\265\345\222\214\345\256\211\350\243\205\345\220\257\345\212\250.md" "b/docs/knowledge-deposition/RabbitMQ/\345\237\272\346\234\254\346\246\202\345\277\265\345\222\214\345\256\211\350\243\205\345\220\257\345\212\250.md" new file mode 100644 index 00000000..e935a7ee --- /dev/null +++ "b/docs/knowledge-deposition/RabbitMQ/\345\237\272\346\234\254\346\246\202\345\277\265\345\222\214\345\256\211\350\243\205\345\220\257\345\212\250.md" @@ -0,0 +1,343 @@ +--- +layout: doc +--- + +# 安装启动和基本使用 + +## 简介 + +- [官方网站](https://www.rabbitmq.com/) + +> `RabbitMQ`是一款可靠且成熟的消息传递和流媒体代理,可以轻松部署在云环境、本地环境以及本地计算机上 + + +## 核心概念 + +> `RabbitMQ`是一个`消息代理`(broker):它接受并转发消息 + +> 可以把它想象成是一个`邮局`:我们把要寄出的信件放入邮箱时,可以确信邮递员最终会将信件送到收件人手中 +> 在这个类比中,`RabbitMQ`就像`一个邮箱`、`一个邮局`和`一名邮递员` + +> 它有以下几个重要的概念: + +1. `message`:RabbitMQ`接收`、`存储`和`转发`=>`二进制数据块`—-也就是`message` +2. `Producer`:生产者是消息的发送方,它将消息发送到`RabbitMQ`的交换器(`Exchange`)中 +3. `Exchange`: +4. `Queue`: +5. `Consumer`: + + +## 安装和启动 + +- [RabbitMQ各个版本的时间排布](https://www.rabbitmq.com/release-information) + +- [Docker社区镜像](https://hub.docker.com/_/rabbitmq/) + +### Docker启动 + +> 直接使用`docker run`来启动一个`image` + +:::tip +- `3.13-management`版本自带了`web管理界面` +::: + +```shell +docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management +``` + +> 或者使用`docker-compose` + +```shell +version: "2.8" +services: + RabbitMQ: + image: rabbitmq:3.13-management + container_name: RabbitMQ + restart: always + ports: + - "5672:5672" + - "15672:15672" +``` + +## 使用`homebrew`安装启动教程 + +- Mac[安装教程](https://www.rabbitmq.com/docs/install-homebrew) + +## 简单示例 + +> 原理如图所示: + +![](https://raw.githubusercontent.com/mx52jing/image-hosting/main/images/RabbitMQ/rabbitMQ-single-single.png) + +- 这里使用[Go语言](https://www.rabbitmq.com/tutorials/tutorial-one-go)演示 + + +:::tip +- RabbitMQ支持多种协议,我们使用`AMQP 0-9-1`,这是一种用于消息传递的开放通用协议 + +- 有许多不同语言的RabbitMQ客户端,我们使用`Go amqp`客户端 +::: + +> 初始化项目并下载依赖 + +```shell +go mod init +go get github.com/rabbitmq/amqp091-go +``` + +写一个错误处理的通用函数 + +```Go +// FailOnError a helper function to check the return value for each amqp call +func FailOnError(err error, msg string) { + if err != nil { + log.Panicf("【%s】: %s", msg, err) + } +} +``` + +### 生产者(Producer) + +1. 连接RabbitMQ服务 + +```Go +// 1. connect to the RabbitMQ +conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") +shared.FailOnError(err, "Failed to connect to RabbitMQ") +defer conn.Close() +``` + +2. 开启一个`channel` + +```Go +// 2. open a channel +ch, err := conn.Channel() +shared.FailOnError(err, "create channel error") +defer ch.Close() +``` + +:::warning +- 这里的`ch`不是go原生的channel,而是一个`AMQP channel` + +```Go +func (c *Connection) Channel() (*Channel, error) +``` +::: + +3. 声明一个队列,并发送消息 + +```Go +// 3. declare a queue,it will only be created if it doesn't exist already +queue, err := ch.QueueDeclare( + "first_queue", // queue name + false, // durable + false, //delete when unused + false, //exclusive + false, // no-wait + nil, // arguments +) +shared.FailOnError(err, "declare queue error") + +withTimeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 6*time.Second) +defer cancelFunc() +// declare a message to be sent +messageBody := "hello, I am the first message, you have received me" +err = ch.PublishWithContext( + withTimeoutCtx, + "", //exchange name + queue.Name, // routing key + false, // mandatory + false, //immediate + amqp091.Publishing{ + Body: []byte(messageBody), + ContentType: "text/plain", + }, +) +shared.FailOnError(err, "Publish error") +log.Printf(" [x] Sent %s\n", messageBody) +``` + +:::warning +- 这里的`queue`只有不存在时才会被创建,但是`Producer`和`Consumer`的代码中都要声明,因为不知道`Producer`和`Consumer`哪个会先启动 +::: + +### 消费者/接收者(Consumer) + +> 我们启动一个Consumer,然后让其一直监听是否有可接收的匹配消息 + +1. 和`Producer`一样,都需要`创建链接`、`channel`、`声明队列` + +```Go +// 1、create connection +conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") +shared.FailOnError(err, "create connection error") +defer conn.Close() + +// open a channel +ch, err := conn.Channel() +shared.FailOnError(err, "open a channel error") +defer ch.Close() + +// 3、declare queue +queue, err := ch.QueueDeclare( + "first_queue", + false, + false, + false, + false, + nil, +) +shared.FailOnError(err, "declare queue error") +``` + +2. 消费消息 + +```Go +// 4、consume message +message, err := ch.Consume( + queue.Name, + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args +) +shared.FailOnError(err, "consume queue message error") +waitCh := make(chan struct{}) +go func() { + for msg := range message { + log.Printf("Received a message: %s \n", msg.Body) + } +}() +log.Printf(" [*] Waiting for messages. To exit press CTRL+C") +<-waitCh +``` + +:::tip +- 为了能持续接受到消息,这里使用`channel`做锁 +::: + + +## 完整代码 + +:::tip +- 这个例子中`Producer`和`Consumer`通过`命名队列`来确定发送和接受的消息 +::: + +> `send.go` + +```Go +package main + +import ( + "context" + "github.com/rabbitmq/amqp091-go" + "go-rabbitmq/shared" + "log" + "time" +) + +func connectAndSendMessage() { + // 1. connect to the RabbitMQ + conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") + shared.FailOnError(err, "Failed to connect to RabbitMQ") + defer conn.Close() + + // 2. open a channel + ch, err := conn.Channel() + shared.FailOnError(err, "create channel error") + defer ch.Close() + + // 3. declare a queue,it will only be created if it doesn't exist already + queue, err := ch.QueueDeclare( + "first_queue", // queue name + false, // durable + false, //delete when unused + false, //exclusive + false, // no-wait + nil, // arguments + ) + shared.FailOnError(err, "declare queue error") + + withTimeoutCtx, cancelFunc := context.WithTimeout(context.Background(), 6*time.Second) + defer cancelFunc() + // declare a message to be sent + messageBody := "hello, I am the first message, you have received me" + err = ch.PublishWithContext( + withTimeoutCtx, + "", //exchange name + queue.Name, // routing key + false, // mandatory + false, //immediate + amqp091.Publishing{ + Body: []byte(messageBody), + ContentType: "text/plain", + }, + ) + shared.FailOnError(err, "Publish error") + log.Printf(" [x] Sent %s\n", messageBody) +} + +func main() { + connectAndSendMessage() +} +``` + +> `receive.go` + +```Go +package main + +import ( + "github.com/rabbitmq/amqp091-go" + "go-rabbitmq/shared" + "log" +) + +func createReceiverAndReceiveMessage() { + // 1、create connection + conn, err := amqp091.Dial("amqp://guest:guest@localhost:5672/") + shared.FailOnError(err, "create connection error") + defer conn.Close() + + // open a channel + ch, err := conn.Channel() + shared.FailOnError(err, "open a channel error") + defer ch.Close() + + // 3、declare queue + queue, err := ch.QueueDeclare( + "first_queue", + false, + false, + false, + false, + nil, + ) + shared.FailOnError(err, "declare queue error") + // 4、consume message + message, err := ch.Consume( + queue.Name, + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + shared.FailOnError(err, "consume queue message error") + waitCh := make(chan struct{}) + go func() { + for msg := range message { + log.Printf("Received a message: %s \n", msg.Body) + } + }() + log.Printf("[*] Waiting for messages. To exit press CTRL+C") + <-waitCh +} + +func main() { + createReceiverAndReceiveMessage() +} +```