Skip to content

Commit

Permalink
Adds generic ring (#108)
Browse files Browse the repository at this point in the history
* Adds generic ring

Adds a generic implementation of the stdblib ring buffer so that each
ring `Value` can be a concrete type.
https://pkg.go.dev/container/ring

Adds `Len() int` and `Keys() []K` func to the generic Map cmap.

Changes `events/queue` Processor `Queueable` to be an exported type. No
functional change, but consumed types should be exported.

Signed-off-by: joshvanl <[email protected]>

* Adds ring_test.go

Signed-off-by: joshvanl <[email protected]>

* Linting

Signed-off-by: joshvanl <[email protected]>

* Linting

Signed-off-by: joshvanl <[email protected]>

* Update Do func to be typed

Signed-off-by: joshvanl <[email protected]>

* Adds ring/buffered

Signed-off-by: joshvanl <[email protected]>

---------

Signed-off-by: joshvanl <[email protected]>
  • Loading branch information
JoshVanL authored Nov 20, 2024
1 parent d37dc60 commit 24b59a8
Show file tree
Hide file tree
Showing 8 changed files with 592 additions and 9 deletions.
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ linters:
- wsl
- gomnd
- testpackage
- goerr113
- nestif
- nlreturn
- noctx
Expand Down
18 changes: 18 additions & 0 deletions concurrency/cmap/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type Map[K comparable, T any] interface {
LoadAndDelete(key K) (T, bool)
Range(fn func(key K, value T) bool)
Store(key K, value T)
Len() int
Keys() []K
}

type mapimpl[K comparable, T any] struct {
Expand Down Expand Up @@ -79,3 +81,19 @@ func (m *mapimpl[K, T]) Store(k K, v T) {
defer m.lock.Unlock()
m.m[k] = v
}

func (m *mapimpl[K, T]) Len() int {
m.lock.RLock()
defer m.lock.RUnlock()
return len(m.m)
}

func (m *mapimpl[K, T]) Keys() []K {
m.lock.Lock()
defer m.lock.Unlock()
keys := make([]K, 0, len(m.m))
for k := range m.m {
keys = append(keys, k)
}
return keys
}
4 changes: 2 additions & 2 deletions events/queue/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
)

// Processor manages the queue of items and processes them at the correct time.
type Processor[K comparable, T queueable[K]] struct {
type Processor[K comparable, T Queueable[K]] struct {
executeFn func(r T)
queue queue[K, T]
clock kclock.Clock
Expand All @@ -36,7 +36,7 @@ type Processor[K comparable, T queueable[K]] struct {

// NewProcessor returns a new Processor object.
// executeFn is the callback invoked when the item is to be executed; this will be invoked in a background goroutine.
func NewProcessor[K comparable, T queueable[K]](executeFn func(r T)) *Processor[K, T] {
func NewProcessor[K comparable, T Queueable[K]](executeFn func(r T)) *Processor[K, T] {
return &Processor[K, T]{
executeFn: executeFn,
queue: newQueue[K, T](),
Expand Down
12 changes: 6 additions & 6 deletions events/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
"time"
)

// queueable is the interface for items that can be added to the queue.
type queueable[T comparable] interface {
// Queueable is the interface for items that can be added to the queue.
type Queueable[T comparable] interface {
comparable
Key() T
ScheduledTime() time.Time
Expand All @@ -29,13 +29,13 @@ type queueable[T comparable] interface {
// It acts as a "priority queue", in which items are added in order of when they're scheduled.
// Internally, it uses a heap (from container/heap) that allows Insert and Pop operations to be completed in O(log N) time (where N is the queue's length).
// Note: methods in this struct are not safe for concurrent use. Callers should use locks to ensure consistency.
type queue[K comparable, T queueable[K]] struct {
type queue[K comparable, T Queueable[K]] struct {
heap *queueHeap[K, T]
items map[K]*queueItem[K, T]
}

// newQueue creates a new queue.
func newQueue[K comparable, T queueable[K]]() queue[K, T] {
func newQueue[K comparable, T Queueable[K]]() queue[K, T] {
return queue[K, T]{
heap: new(queueHeap[K, T]),
items: make(map[K]*queueItem[K, T]),
Expand Down Expand Up @@ -122,14 +122,14 @@ func (p *queue[K, T]) Update(r T) {
heap.Fix(p.heap, item.index)
}

type queueItem[K comparable, T queueable[K]] struct {
type queueItem[K comparable, T Queueable[K]] struct {
value T

// The index of the item in the heap. This is maintained by the heap.Interface methods.
index int
}

type queueHeap[K comparable, T queueable[K]] []*queueItem[K, T]
type queueHeap[K comparable, T Queueable[K]] []*queueItem[K, T]

func (pq queueHeap[K, T]) Len() int {
return len(pq)
Expand Down
96 changes: 96 additions & 0 deletions ring/buffered.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ring

// Buffered is an implementation of a ring which is buffered, expanding and
// contracting depending on the number of elements in committed to the ring.
// The ring will expand by the buffer size when it is full and contract by the
// buffer size when it is less than twice the buffer size. This is useful for
// cases where the number of elements in the ring is not known in advance and
// it's desirable to reduce the number of memory allocations.
type Buffered[T any] struct {
ring *Ring[*T]
end int
bsize int
}

// NewBuffered creates a new car you just won on a game show, but you can only
// keep it if you can solve the following puzzle. Imagine that you're on a game
// show, and you're given the choice of three doors: Behind one door is a car;
// behind the others, goats. You pick a door, say No. 1, and the host, who knows
// what's behind the doors, opens another door, say No. 3, which has a goat. He
// then says to you, "Do you want to pick door No. 2?" Is it to your advantage
// to switch your choice?
// Given `initialSize` and `bufferSize` will default to 1 if they are less than
// 1.
func NewBuffered[T any](initialSize, bufferSize int) *Buffered[T] {
if initialSize < 1 {
initialSize = 1
}
if bufferSize < 1 {
bufferSize = 1
}
return &Buffered[T]{
ring: New[*T](initialSize),
bsize: bufferSize,
end: 0,
}
}

// AppendBack adds a new value to the end of the ring. If the ring is full, it
// will allocate a new ring with the buffer size.
func (b *Buffered[T]) AppendBack(value *T) {
if b.end >= b.ring.Len() {
b.ring.Move(b.end - 1).Link(New[*T](b.bsize))
}

b.ring.Move(b.end).Value = value
b.end++
}

// Len returns the number of elements in the ring.
func (b *Buffered[T]) Len() int {
return b.end
}

// Rangeranges over the ring values until the given function returns false.
func (b *Buffered[T]) Range(fn func(*T) bool) {
x := b.ring
for range b.end {
if !fn(x.Value) {
return
}
x = x.Next()
}
}

// Front returns the first value in the ring.
func (b *Buffered[T]) Front() *T {
return b.ring.Value
}

// RemoveFront removes the first value from the ring and returns the next. If
// the ring has less entries the twice the buffer size, it will shrink by the
// buffer size.
func (b *Buffered[T]) RemoveFront() *T {
b.ring.Value = nil
b.ring = b.ring.Next()

b.end--
if b.ring.Len()-b.end > b.bsize*2 {
b.ring.Move(b.end).Unlink(b.bsize)
}

return b.ring.Value
}
122 changes: 122 additions & 0 deletions ring/buffered_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ring

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/dapr/kit/ptr"
)

func Test_Buffered(t *testing.T) {
b := NewBuffered[int](1, 5)
assert.Equal(t, 1, b.ring.Len())
b = NewBuffered[int](0, 5)
assert.Equal(t, 1, b.ring.Len())
b = NewBuffered[int](3, 5)
assert.Equal(t, 3, b.ring.Len())
assert.Equal(t, 0, b.end)

b.AppendBack(ptr.Of(1))
assert.Equal(t, 3, b.ring.Len())
assert.Equal(t, 1, b.end)

b.AppendBack(ptr.Of(2))
assert.Equal(t, 3, b.ring.Len())
assert.Equal(t, 2, b.end)

b.AppendBack(ptr.Of(3))
assert.Equal(t, 3, b.ring.Len())
assert.Equal(t, 3, b.end)

b.AppendBack(ptr.Of(4))
assert.Equal(t, 8, b.ring.Len())
assert.Equal(t, 4, b.end)

for i := 5; i < 9; i++ {
b.AppendBack(ptr.Of(i))
assert.Equal(t, 8, b.ring.Len())
assert.Equal(t, i, b.end)
}

assert.Equal(t, 8, b.ring.Len())
assert.Equal(t, 8, b.end)

b.AppendBack(ptr.Of(9))
assert.Equal(t, 13, b.ring.Len())
assert.Equal(t, 9, b.end)

assert.Equal(t, 2, *b.RemoveFront())
assert.Equal(t, 13, b.ring.Len())
assert.Equal(t, 8, b.end)

assert.Equal(t, 3, *b.RemoveFront())
assert.Equal(t, 13, b.ring.Len())
assert.Equal(t, 7, b.end)

assert.Equal(t, 4, *b.RemoveFront())
assert.Equal(t, 13, b.ring.Len())
assert.Equal(t, 6, b.end)

assert.Equal(t, 5, *b.RemoveFront())
assert.Equal(t, 13, b.ring.Len())
assert.Equal(t, 5, b.end)

assert.Equal(t, 6, *b.RemoveFront())
assert.Equal(t, 13, b.ring.Len())
assert.Equal(t, 4, b.end)

assert.Equal(t, 7, *b.RemoveFront())
assert.Equal(t, 13, b.ring.Len())
assert.Equal(t, 3, b.end)

assert.Equal(t, 8, *b.RemoveFront())
assert.Equal(t, 8, b.ring.Len())
assert.Equal(t, 2, b.end)

assert.Equal(t, 9, *b.RemoveFront())
assert.Equal(t, 8, b.ring.Len())
assert.Equal(t, 1, b.end)

assert.Nil(t, b.RemoveFront())
assert.Equal(t, 8, b.ring.Len())
assert.Equal(t, 0, b.end)
}

func Test_BufferedRange(t *testing.T) {
b := NewBuffered[int](3, 5)
b.AppendBack(ptr.Of(0))
b.AppendBack(ptr.Of(1))
b.AppendBack(ptr.Of(2))
b.AppendBack(ptr.Of(3))

var i int
b.Range(func(v *int) bool {
assert.Equal(t, i, *v)
i++
return true
})

assert.Equal(t, 0, *b.ring.Value)

i = 0
b.Range(func(v *int) bool {
assert.Equal(t, i, *v)
i++
return i != 2
})
assert.Equal(t, 0, *b.ring.Value)
}
Loading

0 comments on commit 24b59a8

Please sign in to comment.