Commit aa1b501 (parent 2c26467)

Fix linter complaints, test coverage, improve DiscardAfterThreshold API

4 files changed
Lines changed: 67 additions & 38 deletions

group_options.go

Lines changed: 12 additions & 13 deletions
@@ -3,11 +3,11 @@ package syncs
 import "context"

 type options struct {
-	ctx             context.Context
-	preLock         bool
-	termOnError     bool
-	discardIfFull   bool
-	tresholdDiscard int
+	ctx           context.Context
+	preLock       bool
+	termOnError   bool
+	discardIfFull bool
+	tresholdSize  int
 }

 // GroupOption functional option type

@@ -36,18 +36,17 @@ func Discard(o *options) {
 	o.preLock = true // discard implies preemptive
 }

-// DiscardAfterTreshold works similarly to Discard, but buffers task until buffer treshold reach
-// For example, if 10 gouroutines are allowed and bufferTreshold is equal to 5, then 10 tasks
-// can run simultaneously in gouroutines and 5 tasks can be kept in buffer until gouroutines become
-// available.
-func DiscardAfterTreshold(bufferSize int) GroupOption {
+// DiscardAfterTreshold works similarly to Discard, but buffers tasks if all goroutines are busy,
+// until the threshold number of "active" tasks (i.e. executing and scheduled for execution) is reached.
+// If this value is not above size, it is ignored and the common Discard mode is used.
+func DiscardAfterTreshold(tresholdSize int) GroupOption {
 	return func(o *options) {
 		o.discardIfFull = true
 		o.preLock = true

-		if bufferSize < 1 {
-			bufferSize = 0
+		if tresholdSize < 1 {
+			tresholdSize = 0
 		}
-		o.tresholdDiscard = bufferSize
+		o.tresholdSize = tresholdSize
 	}
 }
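
For context, a minimal usage sketch of the renamed option after this change. It assumes the package is imported from github.com/go-pkgz/syncs; the task body and counts are illustrative, mirroring the committed tests rather than being part of the commit itself:

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/go-pkgz/syncs"
)

func main() {
	// 10 workers; up to 20 tasks may be "active" (executing or queued),
	// and submissions beyond that are discarded instead of blocking
	swg := syncs.NewSizedGroup(10, syncs.DiscardAfterTreshold(20))

	var done uint32
	for i := 0; i < 100; i++ {
		swg.Go(func(context.Context) {
			time.Sleep(5 * time.Millisecond)
			atomic.AddUint32(&done, 1)
		})
	}
	swg.Wait()
	fmt.Println(done) // 20 per the committed tests: the other 80 submissions were discarded
}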

semaphore_test.go

Lines changed: 4 additions & 4 deletions
@@ -41,16 +41,16 @@ func TestSemaphore(t *testing.T) {

 			// if number of locks are less than capacity, all should be acquired
 			if tt.lockTimes <= tt.capacity {
-				assert.Equal(t, int32(tt.lockTimes), atomic.LoadInt32(&locks))
+				assert.Equal(t, tt.lockTimes, int(atomic.LoadInt32(&locks)))
 				wg.Wait()
 				return
 			}
 			// if number of locks exceed capacity, it should hang after reaching the capacity
-			assert.Equal(t, int32(tt.capacity), atomic.LoadInt32(&locks))
+			assert.Equal(t, tt.capacity, int(atomic.LoadInt32(&locks)))
 			sema.Unlock()
 			time.Sleep(10 * time.Millisecond)
 			// after unlock, it should be able to acquire another lock
-			assert.Equal(t, int32(tt.capacity+1), atomic.LoadInt32(&locks))
+			assert.Equal(t, tt.capacity+1, int(atomic.LoadInt32(&locks)))
 			wg.Wait()
 		})
 	}

@@ -81,7 +81,7 @@ func TestSemaphore_TryLock(t *testing.T) {
 			}

 			// Check the acquired locks, it should not exceed capacity.
-			assert.Equal(t, int32(tt.expectedLocks), atomic.LoadInt32(&locks))
+			assert.Equal(t, tt.expectedLocks, int(atomic.LoadInt32(&locks)))
 		})
 	}
 }
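
A note on these assertion changes: they flip the conversion direction. Converting an int expectation to int32 can overflow on large values, which overflow-conversion linters flag (gosec's G115 is a plausible candidate, though the commit doesn't name the rule); converting the loaded int32 to int is always safe because Go's int is at least 32 bits wide. A minimal sketch of the pattern, with a hypothetical test name and values:

package syncs

import (
	"sync/atomic"
	"testing"

	"github.com/stretchr/testify/assert"
)

// TestAssertConversionDirection is a hypothetical test illustrating the pattern.
func TestAssertConversionDirection(t *testing.T) {
	capacity := 5       // plain int, as in the table-driven cases
	var locks int32 = 5 // the atomically updated counter

	// int32 -> int never loses data, so convert the loaded value once and
	// compare plain ints instead of casting the expectation to int32
	assert.Equal(t, capacity, int(atomic.LoadInt32(&locks)))
}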

sizedgroup.go

Lines changed: 2 additions & 2 deletions
@@ -30,8 +30,8 @@ func NewSizedGroup(size int, opts ...GroupOption) *SizedGroup {

 	// queue size either equal to number of workers or larger, otherwise does not make sense
 	queueSize := size
-	if res.tresholdDiscard > 0 {
-		queueSize += res.tresholdDiscard
+	if res.tresholdSize > size {
+		queueSize = res.tresholdSize
 	}

 	res.jobQueue = make(chan func(ctx context.Context), queueSize)
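
This is the substance of the API improvement named in the commit title: the option's parameter used to be an extra buffer added on top of the worker count, and now it expresses the total number of "active" (executing plus queued) tasks. A sketch of the new sizing rule; queueSize here is a free-standing helper for illustration, not the library's code:

package main

import "fmt"

// queueSize mirrors the sizing logic above: a threshold not above the
// worker count adds no buffer, so the group degrades to plain Discard behavior.
func queueSize(size, tresholdSize int) int {
	if tresholdSize > size {
		return tresholdSize
	}
	return size
}

func main() {
	fmt.Println(queueSize(10, 20)) // 20: 10 running + 10 queued
	fmt.Println(queueSize(10, 10)) // 10: threshold ignored, same as Discard
	fmt.Println(queueSize(10, 0))  // 10: negative inputs are zeroed by the option
}

Under the old API the same capacity was spelled DiscardAfterTreshold(10) (size 10 plus a buffer of 10); under the new one it is DiscardAfterTreshold(20), which is exactly the change visible in the relocated test below.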

sizedgroup_test.go

Lines changed: 49 additions & 19 deletions
@@ -17,7 +17,7 @@ func TestSizedGroup(t *testing.T) {
 	var c uint32

 	for i := 0; i < 1000; i++ {
-		swg.Go(func(ctx context.Context) {
+		swg.Go(func(context.Context) {
 			time.Sleep(5 * time.Millisecond)
 			atomic.AddUint32(&c, 1)
 		})

@@ -32,7 +32,7 @@ func TestSizedGroup_Discard(t *testing.T) {
 	var c uint32

 	for i := 0; i < 100; i++ {
-		swg.Go(func(ctx context.Context) {
+		swg.Go(func(context.Context) {
 			time.Sleep(5 * time.Millisecond)
 			atomic.AddUint32(&c, 1)
 		})

@@ -42,21 +42,6 @@
 	assert.Equal(t, uint32(10), c, fmt.Sprintf("%d, not all routines have been executed", c))
 }

-func TestSizedGroup_DiscardAfterTreshold(t *testing.T) {
-	swg := NewSizedGroup(10, DiscardAfterTreshold(10))
-	var c uint32
-
-	for i := 0; i < 100; i++ {
-		swg.Go(func(ctx context.Context) {
-			time.Sleep(5 * time.Millisecond)
-			atomic.AddUint32(&c, 1)
-		})
-	}
-	assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine())
-	swg.Wait()
-	assert.Equal(t, uint32(20), c, fmt.Sprintf("%d, wrong number of routines have been executed", c))
-}
-
 func TestSizedGroup_Preemptive(t *testing.T) {
 	swg := NewSizedGroup(10, Preemptive)
 	var c uint32

@@ -79,7 +64,7 @@ func TestSizedGroup_Canceled(t *testing.T) {
 	var c uint32

 	for i := 0; i < 100; i++ {
-		swg.Go(func(ctx context.Context) {
+		swg.Go(func(context.Context) {
 			select {
 			case <-ctx.Done():
 				return

@@ -92,14 +77,59 @@
 	assert.True(t, c < 100)
 }

+func TestSizedGroup_DiscardAfterTreshold(t *testing.T) {
+	swg := NewSizedGroup(10, DiscardAfterTreshold(20))
+	var c uint32
+
+	for i := 0; i < 100; i++ {
+		swg.Go(func(context.Context) {
+			time.Sleep(5 * time.Millisecond)
+			atomic.AddUint32(&c, 1)
+		})
+	}
+	assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine())
+	swg.Wait()
+	assert.Equal(t, uint32(20), c, fmt.Sprintf("%d, wrong number of routines have been executed", c))
+}
+
+func TestSizedGroup_DiscardAfterTreshold_WithNegativeTreshold(t *testing.T) {
+	swg := NewSizedGroup(10, DiscardAfterTreshold(-1))
+	var c uint32
+
+	for i := 0; i < 100; i++ {
+		swg.Go(func(context.Context) {
+			time.Sleep(5 * time.Millisecond)
+			atomic.AddUint32(&c, 1)
+		})
+	}
+	assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine())
+	swg.Wait()
+	assert.Equal(t, uint32(10), c, fmt.Sprintf("%d, wrong number of routines have been executed", c))
+}
+
+func TestSizedGroup_DiscardAfterTreshold_WithTresholdNotAboveSize(t *testing.T) {
+	swg := NewSizedGroup(10, DiscardAfterTreshold(10))
+	var c uint32
+
+	for i := 0; i < 100; i++ {
+		swg.Go(func(context.Context) {
+			time.Sleep(5 * time.Millisecond)
+			atomic.AddUint32(&c, 1)
+		})
+	}
+	assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine())
+	swg.Wait()
+	assert.Equal(t, uint32(10), c, fmt.Sprintf("%d, wrong number of routines have been executed", c))
+}
+
 // illustrates the use of a SizedGroup for concurrent, limited execution of goroutines.
 func ExampleSizedGroup_go() {

 	grp := NewSizedGroup(10) // create sized waiting group allowing maximum 10 goroutines

 	var c uint32
 	for i := 0; i < 1000; i++ {
-		grp.Go(func(ctx context.Context) { // Go call is non-blocking, like regular go statement
+		grp.Go(func(context.Context) { // Go call is non-blocking, like regular go statement
 			// do some work in 10 goroutines in parallel
 			atomic.AddUint32(&c, 1)
 			time.Sleep(10 * time.Millisecond)
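
Finally, the recurring func(ctx context.Context) to func(context.Context) edits address the linter complaints from the commit title: the parameter is unused in these closures, and Go allows dropping a parameter's name while keeping its type (an unused-parameter rule such as revive's is a plausible trigger, though the commit doesn't say which linter). A minimal sketch; run is a hypothetical stand-in for SizedGroup.Go:

package main

import (
	"context"
	"fmt"
)

// run stands in (hypothetically) for SizedGroup.Go: it invokes a context-accepting callback.
func run(fn func(ctx context.Context)) { fn(context.Background()) }

func main() {
	// name the parameter only when the body actually uses it...
	run(func(ctx context.Context) { fmt.Println(ctx.Err()) })
	// ...otherwise keep only the type, which satisfies unused-parameter linters
	run(func(context.Context) { fmt.Println("ctx not needed") })
}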
