Skip to content

Commit

Permalink
feat: add topic exchange content
Browse files Browse the repository at this point in the history
  • Loading branch information
mx52jing committed Aug 16, 2024
1 parent 17a9d46 commit a3468da
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docs/.vitepress/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ export default {
{ text: '工作队列', link: '/knowledge-deposition/RabbitMQ/工作队列'},
{ text: '发布订阅', link: '/knowledge-deposition/RabbitMQ/发布订阅'},
{ text: 'Routing', link: '/knowledge-deposition/RabbitMQ/Routing'},
// { text: 'Topics', link: '/knowledge-deposition/RabbitMQ/Topics'},
{ text: 'Topics', link: '/knowledge-deposition/RabbitMQ/Topics'},
]
},
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ core.safecrlf=true
git add --renormalize .
```

> 这个命令会重新扫描所有的文件,并根据当前的`Git`配置(包括`core.autocrlf``.gitattributes(可选,我没配置这个文件)`)规范化它们的换行符。
> 这个命令会重新扫描所有的文件,并根据当前的`Git`配置(包括`core.autocrlf``.gitattributes(可选,另一种方法,我没配置这个文件)`)规范化它们的换行符。
> 然后我将上面的修改提交后重新`merge`,就是正常的修改的代码了
> 然后我将上面的修改提交后重新`merge`,现在的冲突都是代码层面的冲突,没有格式冲突了,然后就可以正常的修改的代码了
## 参考

- [Git 多平台换行符问题(LF or CRLF)](https://kuanghy.github.io/2017/03/19/git-lf-or-crlf)

- [Windows环境和Mac环境下Git换行结尾冲突](https://blog.asroads.com/post/e037b783.html)
216 changes: 215 additions & 1 deletion docs/knowledge-deposition/RabbitMQ/Topics.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,222 @@ layout: doc
> 要符合上面要求的话,使用`Direct`交换机就实现不不了了,那么就要使用`Topic`交换机来实现
## `Topic`交换机

> `Topic`交换机背后的逻辑类似于`Direct`交换机,通过`routing key`来匹配要将消息发送给哪个队列,匹配时有两种特殊情况:
- `*`:正好能够`匹配一个单词`

- `#`:可以匹配`0个或多个单词`
- `#`:可以匹配`0个或多个单词`

> 如下图:
![rabbitMQ-exchange-topic](https://raw.githubusercontent.com/mx52jing/image-hosting/main/images/RabbitMQ/rabbitMQ-exchange-topic.png)


- 如果此时`routing key``quick.orange.rabbit`,那么此时消息会被传递到`Q1``Q2`两个队列

- 如果此时`routing key``lazy.orange.elephant`,那么此时消息会被传递到`Q1``Q2`两个队列
- 如果此时`routing key``quick.orange.fox`,那么此时消息仅会被传递到`Q1`队列
- 如果此时`routing key``lazy.brown.fox`,那么此时消息会仅会被传递到`Q2`队列
- 如果此时`routing key``lazy.pink.rabbit`,那么此时消息会仅会被传递到`Q2`队列,而且只会`传递一次`,虽然`*.*.rabbit``lazt.#`两个规则都匹配到`Q2`,也只传递到`Q2`一次
- 如果此时`routing key``quick.brown.fox`,因为此时`Q1``Q2`都不匹配,所以此时消息不会被传递到任何一个队列

:::tip
- `Topic`交换机可以表现出和其他交换一样的行为

-`Topic`交换机中,当使用`#`来作为`routing key`,那么它将会接收所有的消息,和`Fanout`交换机行为一致
-`Topic`交换机中,当特殊字符`*``#`未在绑定中使用时,那么它的表现将会和`Direct`交换机一样
:::


## 代码案例

> `receive.go`
```Go
package main

import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"go-rabbitmq/shared"
"os"
)

func startUpAndReceive() {
// create connection
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
shared.FailOnError(err, "create connection error")
defer conn.Close()

// create channel
ch, err := conn.Channel()
shared.FailOnError(err, "create channel error")
defer ch.Close()
exchangeName := "topicLogs"
// declare exchange
err = ch.ExchangeDeclare(
exchangeName,
"topic",
true,
false,
false,
false,
nil,
)
shared.FailOnError(err, "declare exchange error")

// declare queue
queue, err := ch.QueueDeclare(
"",
false,
false,
true,
false,
nil,
)
shared.FailOnError(err, "declare queue error")

if len(os.Args) < 2 {
fmt.Printf("Usage: %s [binding_key]...", os.Args[0])
os.Exit(1)
}
// bind queue and exchange
for _, routingKey := range os.Args[1:] {
fmt.Printf("Binding queue [%s] to exchange [%s] with routing key [%s]\n", queue.Name, "topicLogs", routingKey)
err = ch.QueueBind(
queue.Name,
routingKey,
"topicLogs",
false,
nil,
)
shared.FailOnError(err, "bind queue error")
}

// consume message
messages, err := ch.Consume(
queue.Name,
"",
true,
false,
false,
false,
nil,
)
shared.FailOnError(err, "consume message error")

lockChan := make(chan struct{})
go func() {
for delivery := range messages {
fmt.Printf("Receive message %s\n", delivery.Body)
}
}()
<-lockChan
}

func main() {
startUpAndReceive()
}
```

> `emit.go`
```Go
package main

import (
"context"
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
"go-rabbitmq/shared"
"os"
"time"
)

func startUpAndEmit() {
// create connection
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672")
shared.FailOnError(err, "create connection error")
defer conn.Close()

// create channel
ch, err := conn.Channel()
shared.FailOnError(err, "create channel error")
defer ch.Close()
exchangeName := "topicLogs"
// declare exchange
err = ch.ExchangeDeclare(
exchangeName,
"topic",
true,
false,
false,
false,
nil,
)
shared.FailOnError(err, "declare exchange error")

// publish message
ctx, cancelFunc := context.WithTimeout(context.Background(), 6*time.Second)
defer cancelFunc()
message := shared.BodyFrom(os.Args)

err = ch.PublishWithContext(
ctx,
exchangeName,
shared.GetLogSeverity(os.Args),
false,
false,
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
},
)
shared.FailOnError(err, "publish message error")
fmt.Printf("Successful Send message [%s]\n", message)
}

func main() {
startUpAndEmit()
}
```

> 运行代码
1. 启动第一个`receive.go`,称之为`Q1`

```shell
go run topics/receive.go "*.*.rabbit" "lazy.#"
# Binding queue [amq.gen-FesQOFM2vxs9rSbf8X4bDA] to exchange [topicLogs] with routing key [*.*.rabbit]
# Binding queue [amq.gen-FesQOFM2vxs9rSbf8X4bDA] to exchange [topicLogs] with routing key [lazy.#]
```

2. 启动第二个`receive.go`,称之为`Q2`

```shell
go run topics/receive.go "#"
# Binding queue [amq.gen-A-k6dZNWLLu_ECKzetmjbg] to exchange [topicLogs] with routing key [#]
```
3. 启动第三个`receive.go`,称之为`Q3`

```shell
go run topics/receive.go "aaa"
# Binding queue [amq.gen-qvNPx8QUU102E-RodSHUyA] to exchange [topicLogs] with routing key [aaa]
```
4. 启动`emit.go`,发送消息

```shell
# 消息传递给Q2
go run topics/emit.go "quick.brown.fox" "fox is coming"
# 消息传递给Q1、Q2
go run topics/emit.go "quick.brown.rabbit" "rabbit is coming"
# 消息传递给Q2、Q3
go run topics/emit.go "aaa" "behavior is similar to direct exchange"
# 消息传递给Q1,Q2,并且只传递给Q1一次
go run topics/emit.go "lazy.blue.rabbit" "blue rabbit is coming"
# 消息传递给Q2
go run topics/emit.go "smart.block.sheep" "sheep rabbit is coming"
```

0 comments on commit a3468da

Please sign in to comment.