Skip to content

Commit ece30b4

Browse files
committed
First step towards handler modularization. Extracted CLI and Websocket handlers.
1 parent 815eaf4 commit ece30b4

File tree

6 files changed

+240
-108
lines changed

6 files changed

+240
-108
lines changed

client.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func getHostName() string {
4242
return hostname
4343
}
4444

45-
// Init initializes the client to start sending Chunks at the given address
45+
// Listen initializes the client to start sending Chunks at the given address
4646
func Init(opts ConsumerOptions, stream io.Reader, conn io.Writer) error {
4747
if opts.Id == "" {
4848
opts.Id, _ = shortid.Generate()

cmd/siph/main.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"os"
99

1010
"github.com/sergi/siphon"
11+
"github.com/sergi/siphon/handlers/cli"
12+
"github.com/sergi/siphon/handlers/websocket"
1113
"gopkg.in/alecthomas/kingpin.v2"
1214
)
1315

@@ -32,7 +34,7 @@ var (
3234
app = kingpin.New("siph", "A real-time stream utility.")
3335

3436
server = app.Command("server", "Run in Server mode")
35-
serverUDPPort = server.Flag("udp-port", "UDP port to run in Server mode").Short('u').Int()
37+
serverUDPPort = server.Flag("udp-port", "UDP port to run in Server mode").Short('u').Default("1200").Int()
3638
serverWsPort = server.Flag("ws-port", "WebSockets port to run in Server mode").Short('w').Int()
3739

3840
client = app.Command("client", "Stream output to a server")
@@ -70,9 +72,8 @@ func main() {
7072
switch kingpin.MustParse(app.Parse(os.Args[1:])) {
7173

7274
case server.FullCommand():
73-
server := siphon.NewServer(*serverUDPPort, *serverWsPort)
7475
fmt.Print(siphonTemplate)
75-
err := server.Init()
76+
_, err := siphon.NewServer(siphon.Port(*serverUDPPort), cli.New(), websocket.New())
7677
if err != nil {
7778
fmt.Println(err)
7879
os.Exit(ExitCodeError)

handlers/cli/cli.go

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package cli
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"os"
7+
"sync"
8+
"time"
9+
10+
"github.com/sergi/siphon"
11+
)
12+
13+
const pingPeriod = 5 * time.Minute
14+
15+
type Handler struct {
16+
mu sync.Mutex
17+
Writer io.Writer
18+
}
19+
20+
func New(options ...func(*Handler)) func(*siphon.Server) {
21+
return func(server *siphon.Server) {
22+
h := &Handler{
23+
Writer: os.Stdout,
24+
}
25+
26+
for _, option := range options {
27+
option(h)
28+
}
29+
30+
go h.handleConnection(server)
31+
}
32+
}
33+
34+
func (h *Handler) handleConnection(s *siphon.Server) {
35+
sub := s.Subscribe(siphon.DefaultChannel)
36+
t := time.NewTicker(pingPeriod)
37+
38+
var message []byte
39+
40+
for {
41+
select {
42+
case <-t.C:
43+
message = nil
44+
case message = <-sub:
45+
}
46+
47+
_, err := h.Writer.Write(message)
48+
if err != nil {
49+
fmt.Print(err)
50+
break
51+
}
52+
}
53+
54+
t.Stop()
55+
s.Unsubscribe(siphon.DefaultChannel, sub)
56+
}

handlers/cli/cli_test.go

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package cli
2+
3+
import (
4+
"fmt"
5+
"path/filepath"
6+
"reflect"
7+
"runtime"
8+
"testing"
9+
)
10+
11+
// assert fails the test if the condition is false.
12+
func assert(tb testing.TB, condition bool, msg string, v ...interface{}) {
13+
if !condition {
14+
_, file, line, _ := runtime.Caller(1)
15+
fmt.Printf("\033[31m%s:%d: "+msg+"\033[39m\n\n", append([]interface{}{filepath.Base(file), line}, v...)...)
16+
tb.FailNow()
17+
}
18+
}
19+
20+
// ok fails the test if an err is not nil.
21+
func ok(tb testing.TB, err error) {
22+
if err != nil {
23+
_, file, line, _ := runtime.Caller(1)
24+
fmt.Printf("\033[31m%s:%d: unexpected error: %s\033[39m\n\n", filepath.Base(file), line, err.Error())
25+
tb.FailNow()
26+
}
27+
}
28+
29+
// equals fails the test if exp is not equal to act.
30+
func equals(tb testing.TB, exp, act interface{}) {
31+
if !reflect.DeepEqual(exp, act) {
32+
_, file, line, _ := runtime.Caller(1)
33+
fmt.Printf("\033[31m%s:%d:\n\n\texp: %#v\n\n\tgot: %#v\033[39m\n\n", filepath.Base(file), line, exp, act)
34+
tb.FailNow()
35+
}
36+
}
37+
38+
func TestNewCli(t *testing.T) {
39+
40+
}

handlers/websocket/websocket.go

+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package websocket
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"net/http"
7+
"strconv"
8+
"sync/atomic"
9+
"time"
10+
11+
"github.com/gorilla/websocket"
12+
"github.com/sergi/siphon"
13+
)
14+
15+
const defaultWsPort = 3000
16+
17+
const (
18+
maxMessageSize = 1024
19+
pingPeriod = 5 * time.Minute
20+
)
21+
22+
var upgrader = websocket.Upgrader{
23+
ReadBufferSize: 1024,
24+
WriteBufferSize: maxMessageSize,
25+
CheckOrigin: func(r *http.Request) bool { // WARNING! That's probably messed up https://stackoverflow.com/questions/33323337/update-send-the-origin-header-with-js-websockets
26+
return true
27+
},
28+
}
29+
30+
type Handler struct {
31+
connected int64
32+
failed int64
33+
wsPort int
34+
}
35+
36+
func New(options ...func(*Handler)) func(*siphon.Server) {
37+
return func(server *siphon.Server) {
38+
h := &Handler{
39+
wsPort: defaultWsPort,
40+
}
41+
42+
for _, option := range options {
43+
option(h)
44+
}
45+
46+
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
47+
ws, err := upgrader.Upgrade(w, r, nil)
48+
if err != nil {
49+
fmt.Println(err)
50+
return
51+
}
52+
// channel := r.URL.Path[1:]
53+
// launch a new goroutine so that this function can return and the http server can free up
54+
// buffers associated with this connection
55+
go h.handleConnection(ws, server)
56+
})
57+
58+
go func() {
59+
log.Println("Initializing Websocket Handler at localhost:" + strconv.Itoa(h.wsPort) + "/ws")
60+
if err := http.ListenAndServe(":"+strconv.Itoa(h.wsPort), nil); err != nil {
61+
log.Fatal(err)
62+
}
63+
}()
64+
}
65+
}
66+
67+
func Port(port int) func(*Handler) error {
68+
return func(h *Handler) error {
69+
h.wsPort = port
70+
return nil
71+
}
72+
}
73+
74+
func (h *Handler) handleConnection(ws *websocket.Conn, s *siphon.Server) {
75+
atomic.AddInt64(&h.connected, 1)
76+
sub := s.Subscribe(siphon.DefaultChannel)
77+
t := time.NewTicker(pingPeriod)
78+
79+
var message []byte
80+
81+
for {
82+
select {
83+
case <-t.C:
84+
message = nil
85+
case message = <-sub:
86+
}
87+
88+
ws.SetWriteDeadline(time.Now().Add(30 * time.Second))
89+
if err := ws.WriteMessage(websocket.TextMessage, message); err != nil {
90+
break
91+
}
92+
}
93+
atomic.AddInt64(&h.connected, -1)
94+
atomic.AddInt64(&h.failed, 1)
95+
96+
t.Stop()
97+
ws.Close()
98+
s.Unsubscribe(siphon.DefaultChannel, sub)
99+
}

0 commit comments

Comments
 (0)