Skip to content

Commit

Permalink
fix: avoid duplicate sequence numbers and fix late subscriptions (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhabedinpour authored Dec 11, 2023
1 parent 2b1d5af commit ea67f96
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 13 deletions.
3 changes: 2 additions & 1 deletion eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (b *Bus[T]) subscribe(topic string, once bool) (<-chan T, func()) {

if _, ok := b.subscriptions[topic]; !ok {
b.subscriptions[topic] = make([]subscription[T], 0)
b.publishSequences[topic] = 1
}

sub := subscription[T]{
Expand All @@ -50,7 +51,7 @@ func (b *Bus[T]) subscribe(topic string, once bool) (<-chan T, func()) {
}
// once subscriptions don't need serializer
if !once {
sub.serializer = NewSerializer()
sub.serializer = NewSerializer(b.publishSequences[topic] - 1)
}

b.subscriptions[topic] = append(b.subscriptions[topic], sub)
Expand Down
18 changes: 16 additions & 2 deletions eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestBus_Subscribe(t *testing.T) {
cA1, uA1 := bus.Subscribe(topicA)
cA2, _ := bus.Subscribe(topicA)
cB1, _ := bus.Subscribe(topicB)
a1Counter, a2Counter, b1Counter := 0, 0, 0
a1Counter, a2Counter, b1Counter, b2Counter := 0, 0, 0, 0

ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()
Expand Down Expand Up @@ -54,11 +54,25 @@ func TestBus_Subscribe(t *testing.T) {
bus.Publish(topicA, eventsA[1])
bus.Publish(topicB, eventsB[0])

cB2, _ := bus.Subscribe(topicB)
go func() {
for {
select {
case <-ctx.Done():
return
case <-cB2:
b2Counter++
}
}
}()
bus.Publish(topicB, eventsB[1])

time.Sleep(1 * time.Second)

assert.Equal(t, 1, a1Counter)
assert.Equal(t, 2, a2Counter)
assert.Equal(t, 1, b1Counter)
assert.Equal(t, 2, b1Counter)
assert.Equal(t, 1, b2Counter)

assert.Equal(t, 2, len(bus.subscriptions))
assert.Equal(t, 2, len(bus.publishSequences))
Expand Down
13 changes: 10 additions & 3 deletions serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ func (s *Serializer) Execute(cb callback, sequence uint64) {

item := s.queue[0]

if item.sequence != s.lastExecutedSequence && item.sequence != s.lastExecutedSequence+1 {
if item.sequence <= s.lastExecutedSequence {
s.queue = s.queue[1:]

continue
}

if item.sequence != s.lastExecutedSequence+1 {
break
}

Expand All @@ -58,8 +64,9 @@ func (s *Serializer) Execute(cb callback, sequence uint64) {
}

// NewSerializer creates a new serializer.
func NewSerializer() *Serializer {
func NewSerializer(lastExecutedSequence uint64) *Serializer {
return &Serializer{
queue: make([]queueItem, 0),
queue: make([]queueItem, 0),
lastExecutedSequence: lastExecutedSequence,
}
}
14 changes: 7 additions & 7 deletions serializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func TestSerializer_Execute(t *testing.T) {
serializer := eventbus.NewSerializer()
serializer := eventbus.NewSerializer(0)
b1 := false
b2 := false
b3 := false
Expand All @@ -16,7 +16,7 @@ func TestSerializer_Execute(t *testing.T) {

serializer.Execute(func() {
b1 = true
}, 0)
}, 1)
assert.Equal(t, true, b1)
assert.Equal(t, false, b2)
assert.Equal(t, false, b3)
Expand All @@ -25,7 +25,7 @@ func TestSerializer_Execute(t *testing.T) {

serializer.Execute(func() {
b3 = true
}, 2)
}, 3)
assert.Equal(t, true, b1)
assert.Equal(t, false, b2)
assert.Equal(t, false, b3)
Expand All @@ -34,7 +34,7 @@ func TestSerializer_Execute(t *testing.T) {

serializer.Execute(func() {
b4 = true
}, 2)
}, 3)
assert.Equal(t, true, b1)
assert.Equal(t, false, b2)
assert.Equal(t, false, b3)
Expand All @@ -43,7 +43,7 @@ func TestSerializer_Execute(t *testing.T) {

serializer.Execute(func() {
b5 = true
}, 3)
}, 4)
assert.Equal(t, true, b1)
assert.Equal(t, false, b2)
assert.Equal(t, false, b3)
Expand All @@ -52,10 +52,10 @@ func TestSerializer_Execute(t *testing.T) {

serializer.Execute(func() {
b2 = true
}, 1)
}, 2)
assert.Equal(t, true, b1)
assert.Equal(t, true, b2)
assert.Equal(t, true, b3)
assert.Equal(t, false, b3)
assert.Equal(t, true, b4)
assert.Equal(t, true, b5)
}

0 comments on commit ea67f96

Please sign in to comment.