Skip to content

feature: implement max log entries and replay per stream #162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 39 additions & 12 deletions event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,39 +5,66 @@
package sse

import (
"container/list"
"strconv"
"time"
)

// EventLog holds all of previous events
type EventLog []*Event
// Events holds all of previous events
type Events []*Event

// Add event to eventlog
// EventLog holds the log of all of previous events
type EventLog struct {
MaxEntries int
log *list.List
}

func newEventLog(maxEntries int) *EventLog {
return &EventLog{
MaxEntries: maxEntries,
log: list.New(),
}
}

// Add event to log
func (e *EventLog) Add(ev *Event) {
if !ev.hasContent() {
return
}

ev.ID = []byte(e.currentindex())
ev.ID = []byte(e.currentIndex())
ev.timestamp = time.Now()
*e = append(*e, ev)

// if MaxEntries is set to greater than 0 (no limit) check entries
if e.MaxEntries > 0 {
// if we are at max entries limit
// then remove the item at the back
if e.log.Len() >= e.MaxEntries {
e.log.Remove(e.log.Back())
}
}
e.log.PushFront(ev)
}

// Clear events from eventlog
// Clear events from log
func (e *EventLog) Clear() {
*e = nil
e.log.Init()
}

// Replay events to a subscriber
func (e *EventLog) Replay(s *Subscriber) {
for i := 0; i < len(*e); i++ {
id, _ := strconv.Atoi(string((*e)[i].ID))
for l := e.log.Back(); l != nil; l = l.Prev() {
id, _ := strconv.Atoi(string(l.Value.(*Event).ID))
if id >= s.eventid {
s.connection <- (*e)[i]
s.connection <- l.Value.(*Event)
}
}
}

func (e *EventLog) currentindex() string {
return strconv.Itoa(len(*e))
func (e *EventLog) currentIndex() string {
return strconv.Itoa(e.log.Len())
}

func (e *EventLog) Len() int {
return e.log.Len()
}
17 changes: 14 additions & 3 deletions event_log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,27 @@ import (
)

func TestEventLog(t *testing.T) {
ev := make(EventLog, 0)
ev := newEventLog(0)
testEvent := &Event{Data: []byte("test")}

ev.Add(testEvent)
ev.Clear()

assert.Equal(t, 0, len(ev))
assert.Equal(t, 0, ev.Len())

ev.Add(testEvent)
ev.Add(testEvent)

assert.Equal(t, 2, len(ev))
assert.Equal(t, 2, ev.Len())
}

func TestEventLogMaxEntries(t *testing.T) {
ev := newEventLog(2)
testEvent := &Event{Data: []byte("test")}

ev.Add(testEvent)
ev.Add(testEvent)
ev.Add(testEvent)

assert.Equal(t, 2, ev.Len())
}
10 changes: 8 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
module github.com/r3labs/sse/v2

go 1.13
go 1.20

require (
github.com/stretchr/testify v1.7.0
golang.org/x/net v0.0.0-20191116160921-f9c825593386 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0
)

require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/net v0.0.0-20210610124326-52da8fb2a613 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
7 changes: 2 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20191116160921-f9c825593386 h1:ktbWvQrW08Txdxno1PiDpSxPXG6ndGsfnJjRRtkM0LQ=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/net v0.0.0-20210610124326-52da8fb2a613 h1:SqvqnUCcwFhyyRueFOEFTBaWeXYwK+CL/767809IlbQ=
golang.org/x/net v0.0.0-20210610124326-52da8fb2a613/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
19 changes: 18 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,24 @@ func (s *Server) CreateStream(id string) *Stream {
return s.streams[id]
}

str := newStream(id, s.BufferSize, s.AutoReplay, s.AutoStream, s.OnSubscribe, s.OnUnsubscribe)
str := newStream(id, s.BufferSize, 0, s.AutoReplay, s.AutoStream, s.OnSubscribe, s.OnUnsubscribe)
str.run()

s.streams[id] = str

return str
}

// CreateStreamWithOpts will create a new stream with options and register it
func (s *Server) CreateStreamWithOpts(id string, opts StreamOpts) *Stream {
s.muStreams.Lock()
defer s.muStreams.Unlock()

if s.streams[id] != nil {
return s.streams[id]
}

str := newStream(id, s.BufferSize, opts.MaxEntries, opts.AutoReplay, opts.AutoStream, s.OnSubscribe, s.OnUnsubscribe)
str.run()

s.streams[id] = str
Expand Down
15 changes: 12 additions & 3 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Stream struct {
register chan *Subscriber
deregister chan *Subscriber
subscribers []*Subscriber
Eventlog EventLog
Eventlog *EventLog
subscriberCount int32
// Enables replaying of eventlog to newly added subscribers
AutoReplay bool
Expand All @@ -30,8 +30,17 @@ type Stream struct {
OnUnsubscribe func(streamID string, sub *Subscriber)
}

type StreamOpts struct {
// Max amount of log entries per stream
MaxEntries int
// Enables creation of a stream when a client connects
AutoStream bool
// Enables automatic replay for each new subscriber that connects
AutoReplay bool
}

// newStream returns a new stream
func newStream(id string, buffSize int, replay, isAutoStream bool, onSubscribe, onUnsubscribe func(string, *Subscriber)) *Stream {
func newStream(id string, buffSize, maxEntries int, replay, isAutoStream bool, onSubscribe, onUnsubscribe func(string, *Subscriber)) *Stream {
return &Stream{
ID: id,
AutoReplay: replay,
Expand All @@ -41,7 +50,7 @@ func newStream(id string, buffSize int, replay, isAutoStream bool, onSubscribe,
deregister: make(chan *Subscriber),
event: make(chan *Event, buffSize),
quit: make(chan struct{}),
Eventlog: make(EventLog, 0),
Eventlog: newEventLog(maxEntries),
OnSubscribe: onSubscribe,
OnUnsubscribe: onUnsubscribe,
}
Expand Down
10 changes: 5 additions & 5 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
// Maybe fix this in the future so we can test with -race enabled

func TestStreamAddSubscriber(t *testing.T) {
s := newStream("test", 1024, true, false, nil, nil)
s := newStream("test", 1024, 0, true, false, nil, nil)
s.run()
defer s.close()

Expand All @@ -34,7 +34,7 @@ func TestStreamAddSubscriber(t *testing.T) {
}

func TestStreamRemoveSubscriber(t *testing.T) {
s := newStream("test", 1024, true, false, nil, nil)
s := newStream("test", 1024, 0, true, false, nil, nil)
s.run()
defer s.close()

Expand All @@ -47,7 +47,7 @@ func TestStreamRemoveSubscriber(t *testing.T) {
}

func TestStreamSubscriberClose(t *testing.T) {
s := newStream("test", 1024, true, false, nil, nil)
s := newStream("test", 1024, 0, true, false, nil, nil)
s.run()
defer s.close()

Expand All @@ -59,7 +59,7 @@ func TestStreamSubscriberClose(t *testing.T) {
}

func TestStreamDisableAutoReplay(t *testing.T) {
s := newStream("test", 1024, true, false, nil, nil)
s := newStream("test", 1024, 0, true, false, nil, nil)
s.run()
defer s.close()

Expand All @@ -74,7 +74,7 @@ func TestStreamDisableAutoReplay(t *testing.T) {
func TestStreamMultipleSubscribers(t *testing.T) {
var subs []*Subscriber

s := newStream("test", 1024, true, false, nil, nil)
s := newStream("test", 1024, 0, true, false, nil, nil)
s.run()

for i := 0; i < 10; i++ {
Expand Down