From 2d153a5277395ee234275688fa49245fa13281dc Mon Sep 17 00:00:00 2001 From: ze0s Date: Sat, 20 May 2023 00:57:44 +0200 Subject: [PATCH 1/5] feat: implement max entries limit for event log --- event_log.go | 37 +++++++++++++++++++++++++++++-------- event_log_test.go | 17 ++++++++++++++--- server.go | 19 ++++++++++++++++++- stream.go | 19 ++++++++++++++++--- stream_test.go | 10 +++++----- 5 files changed, 82 insertions(+), 20 deletions(-) diff --git a/event_log.go b/event_log.go index aa17dad..02490d9 100644 --- a/event_log.go +++ b/event_log.go @@ -9,8 +9,21 @@ import ( "time" ) -// EventLog holds all of previous events -type EventLog []*Event +// Events holds all of previous events +type Events []*Event + +// EventLog holds the log of all of previous events +type EventLog struct { + MaxEntries int + Log Events +} + +func newEventLog(maxEntries int) *EventLog { + return &EventLog{ + MaxEntries: maxEntries, + Log: make(Events, 0), + } +} // Add event to eventlog func (e *EventLog) Add(ev *Event) { @@ -20,24 +33,32 @@ func (e *EventLog) Add(ev *Event) { ev.ID = []byte(e.currentindex()) ev.timestamp = time.Now() - *e = append(*e, ev) + + // if MaxEntries is greater than 0, and we are at max entries limit + // then reset the first log item and then pop it + if e.MaxEntries > 0 && len(e.Log) == e.MaxEntries { + e.Log[0] = nil + e.Log = e.Log[1:] + } + + e.Log = append(e.Log, ev) } // Clear events from eventlog func (e *EventLog) Clear() { - *e = nil + e.Log = nil } // Replay events to a subscriber func (e *EventLog) Replay(s *Subscriber) { - for i := 0; i < len(*e); i++ { - id, _ := strconv.Atoi(string((*e)[i].ID)) + for i := 0; i < len(e.Log); i++ { + id, _ := strconv.Atoi(string((e.Log)[i].ID)) if id >= s.eventid { - s.connection <- (*e)[i] + s.connection <- (e.Log)[i] } } } func (e *EventLog) currentindex() string { - return strconv.Itoa(len(*e)) + return strconv.Itoa(len(e.Log)) } diff --git a/event_log_test.go b/event_log_test.go index 66cfaf2..663582b 100644 --- a/event_log_test.go +++ b/event_log_test.go @@ -11,16 +11,27 @@ import ( ) func TestEventLog(t *testing.T) { - ev := make(EventLog, 0) + ev := newEventLog(0) testEvent := &Event{Data: []byte("test")} ev.Add(testEvent) ev.Clear() - assert.Equal(t, 0, len(ev)) + assert.Equal(t, 0, len(ev.Log)) ev.Add(testEvent) ev.Add(testEvent) - assert.Equal(t, 2, len(ev)) + assert.Equal(t, 2, len(ev.Log)) +} + +func TestEventLogMaxEntries(t *testing.T) { + ev := newEventLog(2) + testEvent := &Event{Data: []byte("test")} + + ev.Add(testEvent) + ev.Add(testEvent) + ev.Add(testEvent) + + assert.Equal(t, 2, len(ev.Log)) } diff --git a/server.go b/server.go index d1b27af..25f486c 100644 --- a/server.go +++ b/server.go @@ -82,7 +82,24 @@ func (s *Server) CreateStream(id string) *Stream { return s.streams[id] } - str := newStream(id, s.BufferSize, s.AutoReplay, s.AutoStream, s.OnSubscribe, s.OnUnsubscribe) + str := newStream(id, s.BufferSize, 0, s.AutoReplay, s.AutoStream, s.OnSubscribe, s.OnUnsubscribe) + str.run() + + s.streams[id] = str + + return str +} + +// CreateStreamWithOpts will create a new stream with options and register it +func (s *Server) CreateStreamWithOpts(id string, opts StreamOpts) *Stream { + s.muStreams.Lock() + defer s.muStreams.Unlock() + + if s.streams[id] != nil { + return s.streams[id] + } + + str := newStream(id, s.BufferSize, opts.MaxEntries, opts.AutoReplay, opts.AutoStream, opts.OnSubscribe, opts.OnUnsubscribe) str.run() s.streams[id] = str diff --git a/stream.go b/stream.go index bfbcb9b..f87d8d5 100644 --- a/stream.go +++ b/stream.go @@ -19,7 +19,7 @@ type Stream struct { register chan *Subscriber deregister chan *Subscriber subscribers []*Subscriber - Eventlog EventLog + Eventlog *EventLog subscriberCount int32 // Enables replaying of eventlog to newly added subscribers AutoReplay bool @@ -30,8 +30,21 @@ type Stream struct { OnUnsubscribe func(streamID string, sub *Subscriber) } +type StreamOpts struct { + // Max amount of log entries per stream + MaxEntries int + // Enables creation of a stream when a client connects + AutoStream bool + // Enables automatic replay for each new subscriber that connects + AutoReplay bool + + // Specifies the function to run when client subscribe or un-subscribe + OnSubscribe func(streamID string, sub *Subscriber) + OnUnsubscribe func(streamID string, sub *Subscriber) +} + // newStream returns a new stream -func newStream(id string, buffSize int, replay, isAutoStream bool, onSubscribe, onUnsubscribe func(string, *Subscriber)) *Stream { +func newStream(id string, buffSize, maxEntries int, replay, isAutoStream bool, onSubscribe, onUnsubscribe func(string, *Subscriber)) *Stream { return &Stream{ ID: id, AutoReplay: replay, @@ -41,7 +54,7 @@ func newStream(id string, buffSize int, replay, isAutoStream bool, onSubscribe, deregister: make(chan *Subscriber), event: make(chan *Event, buffSize), quit: make(chan struct{}), - Eventlog: make(EventLog, 0), + Eventlog: newEventLog(maxEntries), OnSubscribe: onSubscribe, OnUnsubscribe: onUnsubscribe, } diff --git a/stream_test.go b/stream_test.go index 1c89a6e..90366c0 100644 --- a/stream_test.go +++ b/stream_test.go @@ -16,7 +16,7 @@ import ( // Maybe fix this in the future so we can test with -race enabled func TestStreamAddSubscriber(t *testing.T) { - s := newStream("test", 1024, true, false, nil, nil) + s := newStream("test", 1024, 0, true, false, nil, nil) s.run() defer s.close() @@ -34,7 +34,7 @@ func TestStreamAddSubscriber(t *testing.T) { } func TestStreamRemoveSubscriber(t *testing.T) { - s := newStream("test", 1024, true, false, nil, nil) + s := newStream("test", 1024, 0, true, false, nil, nil) s.run() defer s.close() @@ -47,7 +47,7 @@ func TestStreamRemoveSubscriber(t *testing.T) { } func TestStreamSubscriberClose(t *testing.T) { - s := newStream("test", 1024, true, false, nil, nil) + s := newStream("test", 1024, 0, true, false, nil, nil) s.run() defer s.close() @@ -59,7 +59,7 @@ func TestStreamSubscriberClose(t *testing.T) { } func TestStreamDisableAutoReplay(t *testing.T) { - s := newStream("test", 1024, true, false, nil, nil) + s := newStream("test", 1024, 0, true, false, nil, nil) s.run() defer s.close() @@ -74,7 +74,7 @@ func TestStreamDisableAutoReplay(t *testing.T) { func TestStreamMultipleSubscribers(t *testing.T) { var subs []*Subscriber - s := newStream("test", 1024, true, false, nil, nil) + s := newStream("test", 1024, 0, true, false, nil, nil) s.run() for i := 0; i < 10; i++ { From f58b04b2c2b7d4d8700de4e7d8d23b62af1392e5 Mon Sep 17 00:00:00 2001 From: ze0s Date: Sat, 20 May 2023 13:15:22 +0200 Subject: [PATCH 2/5] feat: keep subscribe handlers from server --- server.go | 2 +- stream.go | 4 ---- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/server.go b/server.go index 25f486c..bf8efef 100644 --- a/server.go +++ b/server.go @@ -99,7 +99,7 @@ func (s *Server) CreateStreamWithOpts(id string, opts StreamOpts) *Stream { return s.streams[id] } - str := newStream(id, s.BufferSize, opts.MaxEntries, opts.AutoReplay, opts.AutoStream, opts.OnSubscribe, opts.OnUnsubscribe) + str := newStream(id, s.BufferSize, opts.MaxEntries, opts.AutoReplay, opts.AutoStream, s.OnSubscribe, s.OnUnsubscribe) str.run() s.streams[id] = str diff --git a/stream.go b/stream.go index f87d8d5..c7f3640 100644 --- a/stream.go +++ b/stream.go @@ -37,10 +37,6 @@ type StreamOpts struct { AutoStream bool // Enables automatic replay for each new subscriber that connects AutoReplay bool - - // Specifies the function to run when client subscribe or un-subscribe - OnSubscribe func(streamID string, sub *Subscriber) - OnUnsubscribe func(streamID string, sub *Subscriber) } // newStream returns a new stream From 8a8f9cbb07b3c3b19d0c4595d5577028e46de838 Mon Sep 17 00:00:00 2001 From: ze0s Date: Sat, 20 May 2023 13:23:37 +0200 Subject: [PATCH 3/5] refactor: split max entries checks --- event_log.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/event_log.go b/event_log.go index 02490d9..ccf5f13 100644 --- a/event_log.go +++ b/event_log.go @@ -34,11 +34,14 @@ func (e *EventLog) Add(ev *Event) { ev.ID = []byte(e.currentindex()) ev.timestamp = time.Now() - // if MaxEntries is greater than 0, and we are at max entries limit - // then reset the first log item and then pop it - if e.MaxEntries > 0 && len(e.Log) == e.MaxEntries { - e.Log[0] = nil - e.Log = e.Log[1:] + // if MaxEntries is set to greater than 0 (no limit) check entries + if e.MaxEntries > 0 { + // ifa we are at max entries limit + // then reset the first log item and then pop it + if len(e.Log) >= e.MaxEntries { + e.Log[0] = nil + e.Log = e.Log[1:] + } } e.Log = append(e.Log, ev) From 66ab1201482de04f33a73a08de42eed5eb49d3b9 Mon Sep 17 00:00:00 2001 From: ze0s Date: Sat, 20 May 2023 14:55:22 +0200 Subject: [PATCH 4/5] refactor: use std lib container/list as storage for event log --- event_log.go | 39 +++++++++++++++++++++------------------ event_log_test.go | 6 +++--- 2 files changed, 24 insertions(+), 21 deletions(-) diff --git a/event_log.go b/event_log.go index ccf5f13..1394ac9 100644 --- a/event_log.go +++ b/event_log.go @@ -5,6 +5,7 @@ package sse import ( + "container/list" "strconv" "time" ) @@ -15,53 +16,55 @@ type Events []*Event // EventLog holds the log of all of previous events type EventLog struct { MaxEntries int - Log Events + log *list.List } func newEventLog(maxEntries int) *EventLog { return &EventLog{ MaxEntries: maxEntries, - Log: make(Events, 0), + log: list.New(), } } -// Add event to eventlog +// Add event to log func (e *EventLog) Add(ev *Event) { if !ev.hasContent() { return } - ev.ID = []byte(e.currentindex()) + ev.ID = []byte(e.currentIndex()) ev.timestamp = time.Now() // if MaxEntries is set to greater than 0 (no limit) check entries if e.MaxEntries > 0 { - // ifa we are at max entries limit - // then reset the first log item and then pop it - if len(e.Log) >= e.MaxEntries { - e.Log[0] = nil - e.Log = e.Log[1:] + // if we are at max entries limit + // then remove the item at the back + if e.log.Len() >= e.MaxEntries { + e.log.Remove(e.log.Back()) } } - - e.Log = append(e.Log, ev) + e.log.PushFront(ev) } -// Clear events from eventlog +// Clear events from log func (e *EventLog) Clear() { - e.Log = nil + e.log.Init() } // Replay events to a subscriber func (e *EventLog) Replay(s *Subscriber) { - for i := 0; i < len(e.Log); i++ { - id, _ := strconv.Atoi(string((e.Log)[i].ID)) + for l := e.log.Back(); l != nil; l = l.Prev() { + id, _ := strconv.Atoi(string(l.Value.(*Event).ID)) if id >= s.eventid { - s.connection <- (e.Log)[i] + s.connection <- l.Value.(*Event) } } } -func (e *EventLog) currentindex() string { - return strconv.Itoa(len(e.Log)) +func (e *EventLog) currentIndex() string { + return strconv.Itoa(e.log.Len()) +} + +func (e *EventLog) Len() int { + return e.log.Len() } diff --git a/event_log_test.go b/event_log_test.go index 663582b..bc78244 100644 --- a/event_log_test.go +++ b/event_log_test.go @@ -17,12 +17,12 @@ func TestEventLog(t *testing.T) { ev.Add(testEvent) ev.Clear() - assert.Equal(t, 0, len(ev.Log)) + assert.Equal(t, 0, ev.Len()) ev.Add(testEvent) ev.Add(testEvent) - assert.Equal(t, 2, len(ev.Log)) + assert.Equal(t, 2, ev.Len()) } func TestEventLogMaxEntries(t *testing.T) { @@ -33,5 +33,5 @@ func TestEventLogMaxEntries(t *testing.T) { ev.Add(testEvent) ev.Add(testEvent) - assert.Equal(t, 2, len(ev.Log)) + assert.Equal(t, 2, ev.Len()) } From 530e06346d7d287315a50b44985095a01b0c0b30 Mon Sep 17 00:00:00 2001 From: ze0s Date: Sat, 20 May 2023 14:56:37 +0200 Subject: [PATCH 5/5] chore: update go version and deps --- go.mod | 10 ++++++++-- go.sum | 7 ++----- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index aa376f2..a648d02 100644 --- a/go.mod +++ b/go.mod @@ -1,9 +1,15 @@ module github.com/r3labs/sse/v2 -go 1.13 +go 1.20 require ( github.com/stretchr/testify v1.7.0 - golang.org/x/net v0.0.0-20191116160921-f9c825593386 // indirect gopkg.in/cenkalti/backoff.v1 v1.1.0 ) + +require ( + github.com/davecgh/go-spew v1.1.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/net v0.0.0-20210610124326-52da8fb2a613 // indirect + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect +) diff --git a/go.sum b/go.sum index 31187d6..2042974 100644 --- a/go.sum +++ b/go.sum @@ -5,11 +5,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20191116160921-f9c825593386 h1:ktbWvQrW08Txdxno1PiDpSxPXG6ndGsfnJjRRtkM0LQ= -golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/net v0.0.0-20210610124326-52da8fb2a613 h1:SqvqnUCcwFhyyRueFOEFTBaWeXYwK+CL/767809IlbQ= +golang.org/x/net v0.0.0-20210610124326-52da8fb2a613/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=