From 167143e38d488da9cfd218b0d5cf12b0dc8e54f2 Mon Sep 17 00:00:00 2001 From: Kane Date: Tue, 11 Apr 2017 00:46:49 -0700 Subject: [PATCH 1/3] remember last error and keep internal buffer of last N errors. fixed unreliable timeout test --- circuitbreaker.go | 71 +++++++++++++++++++++++++++++-------- circuitbreaker_test.go | 79 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 121 insertions(+), 29 deletions(-) diff --git a/circuitbreaker.go b/circuitbreaker.go index 620cd94..96aeaa0 100644 --- a/circuitbreaker.go +++ b/circuitbreaker.go @@ -31,6 +31,7 @@ package circuit import ( + "container/ring" "context" "errors" "sync" @@ -75,12 +76,14 @@ const ( var ( defaultInitialBackOffInterval = 500 * time.Millisecond defaultBackoffMaxElapsedTime = 0 * time.Second + defaultErrorHistoryDepth = 10 ) // Error codes returned by Call var ( - ErrBreakerOpen = errors.New("breaker open") - ErrBreakerTimeout = errors.New("breaker time out") + ErrBreakerOpen = errors.New("breaker open") + ErrBreakerTimeout = errors.New("breaker time out") + ErrBreakerNoErrorRecorded = errors.New("no error in breaker history") ) // TripFunc is a function called by a Breaker's Fail() function and determines whether @@ -115,15 +118,19 @@ type Breaker struct { eventReceivers []chan BreakerEvent listeners []chan ListenerEvent backoffLock sync.Mutex + + //ring buffer for last N errors + errorsBuffer *ring.Ring } // Options holds breaker configuration options. 
type Options struct { - BackOff backoff.BackOff - Clock clock.Clock - ShouldTrip TripFunc - WindowTime time.Duration - WindowBuckets int + BackOff backoff.BackOff + Clock clock.Clock + ShouldTrip TripFunc + WindowTime time.Duration + WindowBuckets int + ErrorHistoryDepth int } // NewBreakerWithOptions creates a base breaker with a specified backoff, clock and TripFunc @@ -153,12 +160,17 @@ func NewBreakerWithOptions(options *Options) *Breaker { options.WindowBuckets = DefaultWindowBuckets } + if options.ErrorHistoryDepth <= 0 { + options.ErrorHistoryDepth = defaultErrorHistoryDepth + } + return &Breaker{ - BackOff: options.BackOff, - Clock: options.Clock, - ShouldTrip: options.ShouldTrip, - nextBackOff: options.BackOff.NextBackOff(), - counts: newWindow(options.WindowTime, options.WindowBuckets), + BackOff: options.BackOff, + Clock: options.Clock, + ShouldTrip: options.ShouldTrip, + nextBackOff: options.BackOff.NextBackOff(), + counts: newWindow(options.WindowTime, options.WindowBuckets), + errorsBuffer: ring.New(options.ErrorHistoryDepth), } } @@ -293,6 +305,35 @@ func (cb *Breaker) Fail() { } } +// FailWithError is the same as Fail, but keeps history of errors in internal ring buffer +func (cb *Breaker) FailWithError(err error) { + cb.errorsBuffer = cb.errorsBuffer.Next() + cb.errorsBuffer.Value = err + cb.Fail() +} + +// LastError returns last error from internal buffer +func (cb *Breaker) LastError() error { + if cb.errorsBuffer.Value == nil { + return ErrBreakerNoErrorRecorded + } + return cb.errorsBuffer.Value.(error) +} + +// Errors returns all errors from internal buffer +func (cb *Breaker) Errors() (errors []error) { + // reserve capacity to move last error to the end of slice without realloc + errors = make([]error, 0, cb.errorsBuffer.Len()+1) + cb.errorsBuffer.Do(func(x interface{}) { + if x != nil { + errors = append(errors, x.(error)) + } + }) + // move last error to the end + errors = append(errors[1:], errors[0]) + return errors +} + // Success is 
used to indicate a success condition the Breaker should record. If // the success was triggered by a retry attempt, the breaker will be Reset(). func (cb *Breaker) Success() { @@ -302,7 +343,9 @@ func (cb *Breaker) Success() { cb.backoffLock.Unlock() state := cb.state() - if state == halfopen { + // If the breaker was half-open and this is the successful request, the state read here is `open`, + // because cb.halfOpens is 1 at this point (request grouping). + if state == halfopen || state == open { cb.Reset() } atomic.StoreInt64(&cb.consecFailures, 0) @@ -362,7 +405,7 @@ func (cb *Breaker) CallContext(ctx context.Context, circuit func() error, timeou if err != nil { if ctx.Err() != context.Canceled { - cb.Fail() + cb.FailWithError(err) } return err } diff --git a/circuitbreaker_test.go b/circuitbreaker_test.go index 4bf9305..60938e6 100644 --- a/circuitbreaker_test.go +++ b/circuitbreaker_test.go @@ -3,7 +3,7 @@ package circuit import ( "context" "fmt" - "sync/atomic" + "runtime" "testing" "time" @@ -253,6 +253,52 @@ func TestThresholdBreakerCalling(t *testing.T) { } } +func TestThresholdBreakerErrorHistory(t *testing.T) { + cb := NewThresholdBreaker(2) + err := fmt.Errorf("error 1") + cb.FailWithError(err) + if cb.LastError() != err { + t.Fatal("expected last error to be `error 1`") + } + + cb = NewThresholdBreaker(1) + if cb.LastError() != nil { + t.Fatalf("expected last error to be `nil`, got %s", cb.LastError()) + } + + err = cb.Call(func() error { + return fmt.Errorf("circuit error") + }, 0) + if err == nil { + t.Fatal("expected threshold breaker to error") + } + if !cb.Tripped() { + t.Fatal("expected threshold breaker to be open") + } + if cb.LastError().Error() != "circuit error" { + t.Fatalf("expected last error to be `circut error`, got %s", cb.LastError()) + } + + cb.Success() + cb.Call(func() error { + return fmt.Errorf("circuit error 1") + }, 0) + if cb.LastError().Error() != "circuit error 1" { + t.Fatalf("expected last error to be `circut error 1`, got %s", 
cb.LastError()) + } + + errs := cb.Errors() + if len(errs) != 2 { + t.Fatalf("expected `%d` errors, got %d", 2, len(errs)) + } + if errs[0].Error() != "circuit error" { + t.Fatalf("expected `%s` error, got %s", "circuit error", errs[0].Error()) + } + if errs[1].Error() != "circuit error 1" { + t.Fatalf("expected `%s` error, got %s", "circuit error 1", errs[0].Error()) + } +} + func TestThresholdBreakerCallingContext(t *testing.T) { circuit := func() error { return fmt.Errorf("error") @@ -323,15 +369,10 @@ func TestThresholdBreakerResets(t *testing.T) { } func TestTimeoutBreaker(t *testing.T) { - wait := make(chan struct{}) - c := clock.NewMock() - called := int32(0) circuit := func() error { - wait <- struct{}{} - atomic.AddInt32(&called, 1) - <-wait + time.Sleep(100000000 * time.Millisecond) return nil } @@ -339,21 +380,29 @@ func TestTimeoutBreaker(t *testing.T) { cb.Clock = c errc := make(chan error) - go func() { errc <- cb.Call(circuit, time.Millisecond) }() - + wait := make(chan struct{}) + go func() { wait <- struct{}{}; errc <- cb.Call(circuit, time.Millisecond) }() <-wait - c.Add(time.Millisecond * 3) - wait <- struct{}{} + // yield and advance the clock + runtime.Gosched() + c.Add(time.Millisecond * 1000) err := <-errc - if err == nil { - t.Fatal("expected timeout breaker to return an error") + if err != ErrBreakerTimeout { + t.Fatalf("expected timeout breaker to return an error `%s`, got %s", ErrBreakerTimeout, err) } - go cb.Call(circuit, time.Millisecond) + cb.Clock = clock.NewMock() + go func() { wait <- struct{}{}; errc <- cb.Call(circuit, time.Millisecond) }() <-wait + // yield and advance the clock + runtime.Gosched() c.Add(time.Millisecond * 3) - wait <- struct{}{} + + err = <-errc + if err != ErrBreakerOpen { + t.Fatalf("expected timeout breaker to return an error `%s`, got %s", ErrBreakerOpen, err) + } if !cb.Tripped() { t.Fatal("expected timeout breaker to be open") From b584fb2bd8c30a46aaf3685ce62524a2fdec3c25 Mon Sep 17 00:00:00 2001 From: 
Kane Date: Wed, 3 May 2017 17:21:11 -0700 Subject: [PATCH 2/3] fix data race --- circuitbreaker.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/circuitbreaker.go b/circuitbreaker.go index 96aeaa0..e2ca98d 100644 --- a/circuitbreaker.go +++ b/circuitbreaker.go @@ -121,6 +121,8 @@ type Breaker struct { //ring buffer for last N errors errorsBuffer *ring.Ring + // errorsBufferLock guards errorsBuffer against data races (RWMutex was slower and atomic.Value gave the same performance) + errorsBufferLock sync.Mutex } // Options holds breaker configuration options. @@ -307,6 +309,9 @@ func (cb *Breaker) Fail() { // FailWithError is the same as Fail, but keeps history of errors in internal ring buffer func (cb *Breaker) FailWithError(err error) { + cb.errorsBufferLock.Lock() + defer cb.errorsBufferLock.Unlock() + cb.errorsBuffer = cb.errorsBuffer.Next() cb.errorsBuffer.Value = err cb.Fail() @@ -314,6 +319,9 @@ func (cb *Breaker) FailWithError(err error) { // LastError returns last error from internal buffer func (cb *Breaker) LastError() error { + cb.errorsBufferLock.Lock() + defer cb.errorsBufferLock.Unlock() + if cb.errorsBuffer.Value == nil { return ErrBreakerNoErrorRecorded } @@ -322,6 +330,9 @@ func (cb *Breaker) LastError() error { // Errors returns all errors from internal buffer func (cb *Breaker) Errors() (errors []error) { + cb.errorsBufferLock.Lock() + defer cb.errorsBufferLock.Unlock() + // reserve capacity to move last error to the end of slice without realloc errors = make([]error, 0, cb.errorsBuffer.Len()+1) cb.errorsBuffer.Do(func(x interface{}) { From 64c7f9a8d132ec5c186679c9323430f3fb824e1a Mon Sep 17 00:00:00 2001 From: Sam Nguyen <118077+dtjm@users.noreply.github.com> Date: Mon, 6 Nov 2017 10:52:41 -0800 Subject: [PATCH 3/3] Add test for race conditions (#1) --- circuitbreaker_test.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/circuitbreaker_test.go b/circuitbreaker_test.go index 60938e6..9c16bb9 100644 --- 
a/circuitbreaker_test.go +++ b/circuitbreaker_test.go @@ -2,8 +2,10 @@ package circuit import ( "context" + "errors" "fmt" "runtime" + "sync" "testing" "time" @@ -523,3 +525,20 @@ func TestPartialSecondBackoff(t *testing.T) { t.Fatalf("expected breaker to be ready after more than nextBackoff time had passed") } } + +// TestGoroutineSafe verifies that the circuit breaker can be used concurrently +// without race conditions +func TestGoroutineSafe(t *testing.T) { + cb := NewBreaker() + wg := sync.WaitGroup{} + for i := 0; i < 2; i++ { + wg.Add(1) + go func() { + defer wg.Done() + cb.FailWithError(errors.New("x")) + cb.LastError() + cb.Errors() + }() + } + wg.Wait() +}