From a3468daa640fd0d1d0a535503f7a650fccb5ffc4 Mon Sep 17 00:00:00 2001 From: mx Date: Fri, 16 Aug 2024 15:33:40 +0800 Subject: [PATCH] feat: add topic exchange content --- docs/.vitepress/config.ts | 2 +- ...23\345\260\276\345\206\262\347\252\201.md" | 10 +- docs/knowledge-deposition/RabbitMQ/Topics.md | 216 +++++++++++++++++- 3 files changed, 224 insertions(+), 4 deletions(-) diff --git a/docs/.vitepress/config.ts b/docs/.vitepress/config.ts index b14a2ada..5d067211 100644 --- a/docs/.vitepress/config.ts +++ b/docs/.vitepress/config.ts @@ -366,7 +366,7 @@ export default { { text: '工作队列', link: '/knowledge-deposition/RabbitMQ/工作队列'}, { text: '发布订阅', link: '/knowledge-deposition/RabbitMQ/发布订阅'}, { text: 'Routing', link: '/knowledge-deposition/RabbitMQ/Routing'}, - // { text: 'Topics', link: '/knowledge-deposition/RabbitMQ/Topics'}, + { text: 'Topics', link: '/knowledge-deposition/RabbitMQ/Topics'}, ] }, ], diff --git "a/docs/hand-notes/Git/Mac\347\216\257\345\242\203\344\270\213\345\222\214Windows\347\216\257\345\242\203\344\270\213Git\346\215\242\350\241\214\347\273\223\345\260\276\345\206\262\347\252\201.md" "b/docs/hand-notes/Git/Mac\347\216\257\345\242\203\344\270\213\345\222\214Windows\347\216\257\345\242\203\344\270\213Git\346\215\242\350\241\214\347\273\223\345\260\276\345\206\262\347\252\201.md" index d54047cd..6137e4ff 100644 --- "a/docs/hand-notes/Git/Mac\347\216\257\345\242\203\344\270\213\345\222\214Windows\347\216\257\345\242\203\344\270\213Git\346\215\242\350\241\214\347\273\223\345\260\276\345\206\262\347\252\201.md" +++ "b/docs/hand-notes/Git/Mac\347\216\257\345\242\203\344\270\213\345\222\214Windows\347\216\257\345\242\203\344\270\213Git\346\215\242\350\241\214\347\273\223\345\260\276\345\206\262\347\252\201.md" @@ -106,6 +106,12 @@ core.safecrlf=true git add --renormalize . ``` -> 这个命令会重新扫描所有的文件,并根据当前的`Git`配置(包括`core.autocrlf`和`.gitattributes(可选,我没配置这个文件)`)规范化它们的换行符。 +> 这个命令会重新扫描所有的文件,并根据当前的`Git`配置(包括`core.autocrlf`和`.gitattributes(可选,另一种方法,我没配置这个文件)`)规范化它们的换行符。 -> 然后我将上面的修改提交后重新`merge`,就是正常的修改的代码了 \ No newline at end of file +> 然后我将上面的修改提交后重新`merge`,现在的冲突都是代码层面的冲突,没有格式冲突了,然后就可以正常的修改的代码了 + +## 参考 + +- [Git 多平台换行符问题(LF or CRLF)](https://kuanghy.github.io/2017/03/19/git-lf-or-crlf) + +- [Windows环境和Mac环境下Git换行结尾冲突](https://blog.asroads.com/post/e037b783.html) \ No newline at end of file diff --git a/docs/knowledge-deposition/RabbitMQ/Topics.md b/docs/knowledge-deposition/RabbitMQ/Topics.md index 9390dd39..7e361241 100644 --- a/docs/knowledge-deposition/RabbitMQ/Topics.md +++ b/docs/knowledge-deposition/RabbitMQ/Topics.md @@ -10,8 +10,222 @@ layout: doc > 要符合上面要求的话,使用`Direct`交换机就实现不不了了,那么就要使用`Topic`交换机来实现 +## `Topic`交换机 + > `Topic`交换机背后的逻辑类似于`Direct`交换机,通过`routing key`来匹配要将消息发送给哪个队列,匹配时有两种特殊情况: - `*`:正好能够`匹配一个单词` -- `#`:可以匹配`0个或多个单词` \ No newline at end of file +- `#`:可以匹配`0个或多个单词` + +> 如下图: + +![rabbitMQ-exchange-topic](https://raw.githubusercontent.com/mx52jing/image-hosting/main/images/RabbitMQ/rabbitMQ-exchange-topic.png) + + +- 如果此时`routing key`是`quick.orange.rabbit`,那么此时消息会被传递到`Q1`、`Q2`两个队列 + +- 如果此时`routing key`是`lazy.orange.elephant`,那么此时消息会被传递到`Q1`、`Q2`两个队列 +- 如果此时`routing key`是`quick.orange.fox`,那么此时消息仅会被传递到`Q1`队列 +- 如果此时`routing key`是`lazy.brown.fox`,那么此时消息会仅会被传递到`Q2`队列 +- 如果此时`routing key`是`lazy.pink.rabbit`,那么此时消息会仅会被传递到`Q2`队列,而且只会`传递一次`,虽然`*.*.rabbit`和`lazt.#`两个规则都匹配到`Q2`,也只传递到`Q2`一次 +- 如果此时`routing key`是`quick.brown.fox`,因为此时`Q1`、`Q2`都不匹配,所以此时消息不会被传递到任何一个队列 + +:::tip +- `Topic`交换机可以表现出和其他交换一样的行为 + +- 在`Topic`交换机中,当使用`#`来作为`routing key`,那么它将会接收所有的消息,和`Fanout`交换机行为一致 +- 在`Topic`交换机中,当特殊字符`*`和`#`未在绑定中使用时,那么它的表现将会和`Direct`交换机一样 +::: + + +## 代码案例 + +> `receive.go` + +```Go +package main + +import ( + "fmt" + amqp "github.com/rabbitmq/amqp091-go" + "go-rabbitmq/shared" + "os" +) + +func startUpAndReceive() { + // create connection + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672") + shared.FailOnError(err, "create connection error") + defer conn.Close() + + // create channel + ch, err := conn.Channel() + shared.FailOnError(err, "create channel error") + defer ch.Close() + exchangeName := "topicLogs" + // declare exchange + err = ch.ExchangeDeclare( + exchangeName, + "topic", + true, + false, + false, + false, + nil, + ) + shared.FailOnError(err, "declare exchange error") + + // declare queue + queue, err := ch.QueueDeclare( + "", + false, + false, + true, + false, + nil, + ) + shared.FailOnError(err, "declare queue error") + + if len(os.Args) < 2 { + fmt.Printf("Usage: %s [binding_key]...", os.Args[0]) + os.Exit(1) + } + // bind queue and exchange + for _, routingKey := range os.Args[1:] { + fmt.Printf("Binding queue [%s] to exchange [%s] with routing key [%s]\n", queue.Name, "topicLogs", routingKey) + err = ch.QueueBind( + queue.Name, + routingKey, + "topicLogs", + false, + nil, + ) + shared.FailOnError(err, "bind queue error") + } + + // consume message + messages, err := ch.Consume( + queue.Name, + "", + true, + false, + false, + false, + nil, + ) + shared.FailOnError(err, "consume message error") + + lockChan := make(chan struct{}) + go func() { + for delivery := range messages { + fmt.Printf("Receive message %s\n", delivery.Body) + } + }() + <-lockChan +} + +func main() { + startUpAndReceive() +} +``` + +> `emit.go` + +```Go +package main + +import ( + "context" + "fmt" + amqp "github.com/rabbitmq/amqp091-go" + "go-rabbitmq/shared" + "os" + "time" +) + +func startUpAndEmit() { + // create connection + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672") + shared.FailOnError(err, "create connection error") + defer conn.Close() + + // create channel + ch, err := conn.Channel() + shared.FailOnError(err, "create channel error") + defer ch.Close() + exchangeName := "topicLogs" + // declare exchange + err = ch.ExchangeDeclare( + exchangeName, + "topic", + true, + false, + false, + false, + nil, + ) + shared.FailOnError(err, "declare exchange error") + + // publish message + ctx, cancelFunc := context.WithTimeout(context.Background(), 6*time.Second) + defer cancelFunc() + message := shared.BodyFrom(os.Args) + + err = ch.PublishWithContext( + ctx, + exchangeName, + shared.GetLogSeverity(os.Args), + false, + false, + amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(message), + }, + ) + shared.FailOnError(err, "publish message error") + fmt.Printf("Successful Send message [%s]\n", message) +} + +func main() { + startUpAndEmit() +} +``` + +> 运行代码 + +1. 启动第一个`receive.go`,称之为`Q1` + +```shell +go run topics/receive.go "*.*.rabbit" "lazy.#" +# Binding queue [amq.gen-FesQOFM2vxs9rSbf8X4bDA] to exchange [topicLogs] with routing key [*.*.rabbit] +# Binding queue [amq.gen-FesQOFM2vxs9rSbf8X4bDA] to exchange [topicLogs] with routing key [lazy.#] +``` + +2. 启动第二个`receive.go`,称之为`Q2` + +```shell +go run topics/receive.go "#" +# Binding queue [amq.gen-A-k6dZNWLLu_ECKzetmjbg] to exchange [topicLogs] with routing key [#] +``` +3. 启动第三个`receive.go`,称之为`Q3` + +```shell +go run topics/receive.go "aaa" +# Binding queue [amq.gen-qvNPx8QUU102E-RodSHUyA] to exchange [topicLogs] with routing key [aaa] +``` +4. 启动`emit.go`,发送消息 + +```shell +# 消息传递给Q2 +go run topics/emit.go "quick.brown.fox" "fox is coming" +# 消息传递给Q1、Q2 +go run topics/emit.go "quick.brown.rabbit" "rabbit is coming" +# 消息传递给Q2、Q3 +go run topics/emit.go "aaa" "behavior is similar to direct exchange" +# 消息传递给Q1,Q2,并且只传递给Q1一次 +go run topics/emit.go "lazy.blue.rabbit" "blue rabbit is coming" +# 消息传递给Q2 +go run topics/emit.go "smart.block.sheep" "sheep rabbit is coming" +``` +