-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathtopic.go
93 lines (77 loc) · 2.05 KB
/
topic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package client
import "context"
// Topic is an accessor for a single topic on the server.
type Topic struct {
// ID identifies the topic; the methods on Topic pass it to the service on
// every call.
ID string
// s is the service that the methods on Topic delegate to.
s service
}
// newTopic builds a Topic handle for the given id whose operations are
// carried out through s.
func newTopic(id string, s service) *Topic {
	topic := new(Topic)
	topic.ID = id
	topic.s = s
	return topic
}
// ByTopicID implements sort.Interface, ordering topics by ascending Topic.ID.
type ByTopicID []*Topic

// Len reports how many topics the slice holds.
func (ts ByTopicID) Len() int {
	return len(ts)
}

// Swap exchanges the topics at positions i and j.
func (ts ByTopicID) Swap(i, j int) {
	ts[j], ts[i] = ts[i], ts[j]
}

// Less reports whether the topic at i has a smaller ID than the topic at j.
func (ts ByTopicID) Less(i, j int) bool {
	return ts[j].ID > ts[i].ID
}
// PublishResult holds the eventual outcome of publishing a message.
//
// msgID and err are only meaningful once done has been closed: the code that
// fills them in must do so before closing the channel, and readers must not
// look at them until they have observed the close.
type PublishResult struct {
	// done is closed once msgID and err have been set.
	done chan struct{}
	// msgID is the message ID reported by the publish call.
	msgID string
	// err is the error reported by the publish call, if any.
	err error
}

// Get waits until the publish has finished or ctx is done, whichever comes
// first.
//
// If the publish has finished, Get returns the recorded message ID and error.
// A result that is already complete when Get is called is returned even if
// ctx is done as well. If ctx is done before the publish finishes, Get
// returns an empty message ID together with ctx.Err(), so the caller can tell
// a cancelled or timed-out wait apart from a successful publish.
func (p *PublishResult) Get(ctx context.Context) (string, error) {
	// Fast path: prefer an already finished result over a done context.
	select {
	case <-p.done:
		return p.msgID, p.err
	default:
	}

	// Otherwise block until either the result arrives or the caller gives up.
	select {
	case <-p.done:
		return p.msgID, p.err
	case <-ctx.Done():
		// done is still open here, so p.err has not been published yet:
		// reading it would race with the writer and would normally yield
		// nil, hiding the cancellation. Report the context error instead.
		return "", ctx.Err()
	}
}
// Exists reports whether the topic exists on the server, as answered by the
// underlying service for this topic's ID.
func (t *Topic) Exists(ctx context.Context) (bool, error) {
	found, err := t.s.topicExists(ctx, t.ID)
	return found, err
}
// Delete asks the underlying service to delete the topic and returns the
// service's error, if any.
func (t *Topic) Delete(ctx context.Context) error {
	err := t.s.deleteTopic(ctx, t.ID)
	return err
}
// Subscriptions returns a Subscription handle for every subscription ID the
// service lists for this topic.
//
// If listing the IDs fails, the service's error is returned unchanged along
// with a nil slice. On success the returned slice is never nil, even when the
// topic has no subscriptions.
func (t *Topic) Subscriptions(ctx context.Context) ([]*Subscription, error) {
	subIDs, err := t.s.listTopicSubscriptions(ctx, t.ID)
	if err != nil {
		return nil, err
	}

	// The final length is known, so allocate once instead of regrowing on
	// append. make keeps the result non-nil for an empty list, as before.
	subs := make([]*Subscription, 0, len(subIDs))
	for _, id := range subIDs {
		subs = append(subs, newSubscription(id, t.s))
	}
	return subs, nil
}
// Publish sends msg to the topic in a background goroutine and returns
// immediately. The returned PublishResult is completed once the service's
// publish call has returned.
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
	result := new(PublishResult)
	result.done = make(chan struct{})

	go func() {
		// Store the outcome before closing done, so anyone who observes
		// the close also sees the message ID and error.
		result.msgID, result.err = t.s.publishMessages(ctx, t.ID, msg)
		close(result.done)
	}()

	return result
}
// StatsDetail fetches the detailed stats for the topic from the underlying
// service and returns them as raw bytes.
func (t *Topic) StatsDetail(ctx context.Context) ([]byte, error) {
	detail, err := t.s.statsTopicDetail(ctx, t.ID)
	return detail, err
}