-
Notifications
You must be signed in to change notification settings - Fork 3
/
pubsub.go
143 lines (121 loc) · 3.24 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package magina
import (
"fmt"
"strings"
"github.com/streadway/amqp"
)
// PubSubExchanger is the exchange in RabbitMQ for publish & subscribe.
// Its methods publish to and consume from the topic exchange named by
// defaultPubsubExchange, translating topics between MQTT and AMQP syntax.
type PubSubExchanger struct {
// TopicQueue maps a subscribed topic to the name of the queue bound for it.
TopicQueue map[string]string
// TopicChan maps a subscribed topic to the channel its messages are forwarded on.
TopicChan map[string]chan ExchangeMessage
// NOTE(review): MessageIds is not used by the methods shown alongside this
// type; presumably it tracks message identifiers — confirm at its declaration.
MessageIds MessageIds
// Channel is the AMQP channel used for every exchange operation. Publish,
// Subscribe and Unsubscribe report an error while it is nil.
Channel *amqp.Channel
}
// NewPubSubExchanger returns a PubSubExchanger that operates on channel.
// Only the Channel field is set; the topic maps stay nil until Init
// allocates them.
func NewPubSubExchanger(channel *amqp.Channel) *PubSubExchanger {
	exchanger := new(PubSubExchanger)
	exchanger.Channel = channel
	return exchanger
}
// Init prepares the exchanger for use. It allocates the topic bookkeeping
// maps if they are still nil and then declares the durable topic exchange
// named by defaultPubsubExchange on the AMQP channel.
//
// It returns an error if the exchanger has no channel, or whatever error the
// exchange declaration reports.
func (pubsub *PubSubExchanger) Init() error {
	if pubsub.TopicQueue == nil {
		pubsub.TopicQueue = make(map[string]string)
	}
	if pubsub.TopicChan == nil {
		pubsub.TopicChan = make(map[string]chan ExchangeMessage)
	}
	// Same guard as Publish, Subscribe and Unsubscribe: calling a method on a
	// nil *amqp.Channel would panic instead of reporting an error.
	if pubsub.Channel == nil {
		return fmt.Errorf("client channel not ready")
	}
	return pubsub.Channel.ExchangeDeclare(
		defaultPubsubExchange, // name
		"topic",               // type
		true,                  // durable
		false,                 // auto-deleted
		false,                 // internal
		false,                 // no-wait
		nil,                   // arguments
	)
}
// convMQTTTopic2AMQP rewrites an MQTT topic into AMQP routing-key syntax:
// every "/" level separator becomes "." and every "+" wildcard becomes "*".
// All other characters are left as they are.
func (pubsub *PubSubExchanger) convMQTTTopic2AMQP(topic string) string {
	dotted := strings.ReplaceAll(topic, "/", ".")
	routingKey := strings.ReplaceAll(dotted, "+", "*")
	return routingKey
}
// convAMQPopic2MQTT rewrites an AMQP routing key into MQTT topic syntax:
// every "." becomes "/" and every "*" becomes "+". All other characters are
// left as they are.
func (pubsub *PubSubExchanger) convAMQPopic2MQTT(topic string) string {
	slashed := strings.ReplaceAll(topic, ".", "/")
	mqttTopic := strings.ReplaceAll(slashed, "*", "+")
	return mqttTopic
}
// Publish sends msg to the exchange named by defaultPubsubExchange. The
// routing key is msg.Topic translated to AMQP syntax and the body is
// msg.Payload, sent with content type "text/plain".
//
// It returns an error if the exchanger has no channel, or whatever error the
// AMQP publish call reports.
func (pubsub *PubSubExchanger) Publish(msg ExchangeMessage) error {
	if pubsub.Channel == nil {
		return fmt.Errorf("client channel not ready")
	}

	routingKey := pubsub.convMQTTTopic2AMQP(msg.Topic)
	publishing := amqp.Publishing{
		ContentType: "text/plain",
		Body:        msg.Payload,
	}
	return pubsub.Channel.Publish(
		defaultPubsubExchange, // exchange
		routingKey,            // routing key
		false,                 // mandatory
		false,                 // immediate
		publishing,
	)
}
// Subscribe starts delivering messages published on topic.
//
// It declares a server-named, exclusive, auto-delete queue, binds it to
// defaultPubsubExchange with topic translated to AMQP routing-key syntax, and
// starts a consumer on that queue. Each delivery is forwarded on the returned
// unbuffered channel with its routing key translated back to MQTT syntax, so
// the caller has to keep receiving from it. The returned channel is closed by
// the forwarding goroutine, and only by it, once the delivery stream ends:
// after Unsubscribe cancels the consumer or when the AMQP channel shuts down.
//
// Subscribing again to a topic that is already tracked replaces the tracked
// queue and channel; the earlier subscription is not torn down.
//
// It returns an error if the exchanger has no channel or if any AMQP call
// fails. Nothing is recorded for topic in that case.
func (pubsub *PubSubExchanger) Subscribe(topic string) (chan ExchangeMessage, error) {
	if pubsub.Channel == nil {
		return nil, fmt.Errorf("client channel not ready")
	}
	// Allocate the bookkeeping maps on demand so that subscribing before Init
	// does not panic on a nil-map write.
	if pubsub.TopicQueue == nil {
		pubsub.TopicQueue = make(map[string]string)
	}
	if pubsub.TopicChan == nil {
		pubsub.TopicChan = make(map[string]chan ExchangeMessage)
	}

	q, err := pubsub.Channel.QueueDeclare(
		"",    // name
		true,  // durable
		true,  // delete when unused
		true,  // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		return nil, err
	}
	err = pubsub.Channel.QueueBind(
		q.Name,                           // queue name
		pubsub.convMQTTTopic2AMQP(topic), // routing key
		defaultPubsubExchange,            // exchange
		false,
		nil,
	)
	if err != nil {
		return nil, err
	}
	// The queue name doubles as the consumer tag so that Unsubscribe can
	// cancel exactly this consumer without storing any extra state.
	msgs, err := pubsub.Channel.Consume(
		q.Name, // queue
		q.Name, // consumer
		true,   // auto ack
		false,  // exclusive
		false,  // no local
		false,  // no wait
		nil,    // args
	)
	if err != nil {
		return nil, err
	}

	// Record the subscription only once it is fully set up, so Unsubscribe
	// never sees a queue entry without a matching consumer and channel.
	msgChan := make(chan ExchangeMessage)
	pubsub.TopicQueue[topic] = q.Name
	pubsub.TopicChan[topic] = msgChan
	go func() {
		// This goroutine is the only sender, so it is the only closer.
		defer close(msgChan)
		for d := range msgs {
			msgChan <- ExchangeMessage{pubsub.convAMQPopic2MQTT(d.RoutingKey), d.Body}
		}
	}()
	return msgChan, nil
}

// Unsubscribe stops the subscription recorded for topic. It forgets the
// topic, unbinds its queue from defaultPubsubExchange and cancels the
// consumer started by Subscribe. Cancelling ends the delivery stream, which
// lets the forwarding goroutine finish and close the channel Subscribe
// returned; this method never closes that channel itself, because doing so
// could race with a send or lead to a second close.
//
// Unsubscribing a topic that is not tracked is a no-op. It returns an error
// if the exchanger has no channel; otherwise it returns the unbind error if
// there is one, else the cancel error.
func (pubsub *PubSubExchanger) Unsubscribe(topic string) error {
	if pubsub.Channel == nil {
		return fmt.Errorf("client channel not ready")
	}
	queueName, exist := pubsub.TopicQueue[topic]
	if !exist {
		return nil
	}
	delete(pubsub.TopicQueue, topic)
	delete(pubsub.TopicChan, topic)

	// Unbind before cancelling: the queue is auto-delete, so it may be gone
	// as soon as its only consumer is cancelled.
	unbindErr := pubsub.Channel.QueueUnbind(queueName, pubsub.convMQTTTopic2AMQP(topic), defaultPubsubExchange, nil)
	// Cancel even when the unbind failed so the consumer is not left running.
	cancelErr := pubsub.Channel.Cancel(queueName, false)
	if unbindErr != nil {
		return unbindErr
	}
	return cancelErr
}