-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbroadcast.go
133 lines (120 loc) · 3.19 KB
/
broadcast.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import "time"
// BroadcastMsg is WHAT is broadcast
type BroadcastMsg struct {
CreateTime time.Time
Msg string
}
// BroadcastListener is used by the Broadcaster to track listeners
type BroadcastListener struct {
lastMsg int
Delivery chan BroadcastMsg
response chan bool
}
// Broadcaster is a concurrent-safe broadcast q
type Broadcaster struct {
incoming chan BroadcastMsg
newListener chan *BroadcastListener
quit chan int
shutdown chan int
destinations []*BroadcastListener // Everyone who needs a message
msgs []BroadcastMsg // All messages that need to be sent
}
// NewBroadcaster returns a Broacaster ready to go (when someone calls Start)
func NewBroadcaster() *Broadcaster {
return &Broadcaster{
incoming: make(chan BroadcastMsg),
newListener: make(chan *BroadcastListener),
quit: make(chan int),
shutdown: make(chan int),
destinations: make([]*BroadcastListener, 0, 8),
msgs: make([]BroadcastMsg, 0, 32),
}
}
// Start the broadcast process
func (b *Broadcaster) Start() error {
// Our actual logic to broadcast all messages to all listeners
handleMsgs := func() {
for _, listener := range b.destinations {
if listener.Delivery == nil {
continue // listener is dead
}
if len(b.msgs) < 1 || listener.lastMsg >= len(b.msgs)-1 {
continue // Nothing to deliver for listener
}
startIdx := listener.lastMsg + 1
for i, msg := range b.msgs[startIdx:] {
listener.Delivery <- msg
listener.lastMsg = startIdx + i
if !<-listener.response {
// Listener wants to quit
close(listener.Delivery)
listener.Delivery = nil
break
}
}
}
}
// The loop for broadcasting
go func() {
// Clean up all channels
defer func() {
for _, listener := range b.destinations {
if listener.Delivery != nil {
close(listener.Delivery)
listener.Delivery = nil
}
}
}()
defer close(b.incoming)
defer close(b.newListener)
defer close(b.shutdown) // This is how they known we're done
// Quit, receive a new listener, or receive a new msg
for {
select {
case msg := <-b.incoming:
b.msgs = append(b.msgs, msg)
handleMsgs()
case listener := <-b.newListener:
b.destinations = append(b.destinations, listener)
handleMsgs()
case <-b.quit:
return
}
}
}()
return nil
}
// Kill the broadcast process. Once this function returns, the broadcaster is
// stopped and can NOT be reused
func (b *Broadcaster) Kill() error {
close(b.quit)
<-b.shutdown
return nil
}
// GetListener returns a listener channel that receives BroadcastMsg objects
func (b *Broadcaster) GetListener() *BroadcastListener {
list := &BroadcastListener{
lastMsg: -1,
Delivery: make(chan BroadcastMsg),
response: make(chan bool),
}
b.newListener <- list
return list
}
// Send a BroadcastMsg to all listeners
func (b *Broadcaster) Send(msg string) error {
newMsg := BroadcastMsg{
CreateTime: time.Now().Local(),
Msg: msg,
}
b.incoming <- newMsg
return nil
}
// Respond is called after Listen to continue receiving (or not)
func (li *BroadcastListener) Respond(continueRecv bool) {
li.response <- continueRecv
if !continueRecv {
close(li.response)
}
}