Skip to content

Commit 4bd71e7

Browse files
committed
Rewrite fan-in/out messaging patterns
1 parent 8b0943a commit 4bd71e7

File tree

4 files changed

+54
-40
lines changed

4 files changed

+54
-40
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ __Messaging Patterns__:
7777

7878
| Pattern | Description |
7979
|:-------:| ----------- |
80-
| [Fan-In](fan/fan_in.go) | Funnels tasks to a work sink (e.g. server) |
81-
| [Fan-Out](fan/fan_out.go) | Distributes tasks amongs workers |
80+
| [Fan-In](messaging/fan_in.md) | Funnels tasks to a work sink (e.g. server) |
81+
| [Fan-Out](messaging/fan_out.md) | Distributes tasks amongs workers (e.g. producer) |
8282
| [Futures & Promises](futures_promises.go) | Acts as a place-holder of a result that is initally unknown for synchronization purposes |
8383
| [Publish/Subscribe](messaging/publish_subscribe.md) | Passes information to a collection of recipients who subscribed to a topic |
8484
| [Push & Pull](push_pull.go) | Distributes messages to multiple workers, arranged in a pipeline |

fan/fan_in.go

-33
This file was deleted.

messaging/fan_in.md

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
Fan-In Messaging Patterns
2+
===================================
3+
Fan-In is a messaging pattern used to create a funnel for work amongst workers (clients: source, server: destination).
4+
5+
We can model fan-in using the Go channels.
6+
7+
```go
8+
// Merge different channels in one channel
9+
func Merge(cs ...<-chan int) <-chan int {
10+
var wg sync.WaitGroup
11+
12+
out := make(chan int)
13+
14+
// Start an send goroutine for each input channel in cs. send
15+
// copies values from c to out until c is closed, then calls wg.Done.
16+
send := func(c <-chan int) {
17+
for n := range c {
18+
out <- n
19+
}
20+
wg.Done()
21+
}
22+
23+
wg.Add(len(cs))
24+
for _, c := range cs {
25+
go send(c)
26+
}
27+
28+
// Start a goroutine to close out once all the send goroutines are
29+
// done. This must start after the wg.Add call.
30+
go func() {
31+
wg.Wait()
32+
close(out)
33+
}()
34+
return out
35+
}
36+
```
37+
38+
The `Merge` function converts a list of channels to a single channel by starting a goroutine for each inbound channel that copies the values to the sole outbound channel.
39+
40+
Once all the output goroutines have been started, `Merge` a goroutine is started to close the main channel.

fan/fan_out.go renamed to messaging/fan_out.md

+12-5
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1-
package fan
1+
Fan-Out Messaging Pattern
2+
=========================
3+
Fan-Out is a messaging pattern used for distributing work amongst workers (producer: source, consumers: destination).
24

3-
// Out implements fan.Out messaging pattern
4-
// Split a channel into n channels that receive messages
5-
// in a round-robin fashion.
6-
func Out(ch <-chan int, n int) []<-chan int {
5+
We can model fan-out using the Go channels.
6+
7+
```go
8+
// Split a channel into n channels that receive messages in a round-robin fashion.
9+
func Split(ch <-chan int, n int) []<-chan int {
710
cs := make([]chan int)
811
for i := 0; i < n; i++ {
912
cs = append(cs, make(chan int))
@@ -38,3 +41,7 @@ func Out(ch <-chan int, n int) []<-chan int {
3841

3942
return cs
4043
}
44+
```
45+
46+
The `Split` function converts a single channel into a list of channels by using
47+
a goroutine to copy received values to channels in the list in a round-robin fashion.

0 commit comments

Comments
 (0)