diff --git a/circuitbreaker.go b/circuitbreaker.go index 620cd94..e2ca98d 100644 --- a/circuitbreaker.go +++ b/circuitbreaker.go @@ -31,6 +31,7 @@ package circuit import ( + "container/ring" "context" "errors" "sync" @@ -75,12 +76,14 @@ const ( var ( defaultInitialBackOffInterval = 500 * time.Millisecond defaultBackoffMaxElapsedTime = 0 * time.Second + defaultErrorHistoryDepth = 10 ) // Error codes returned by Call var ( - ErrBreakerOpen = errors.New("breaker open") - ErrBreakerTimeout = errors.New("breaker time out") + ErrBreakerOpen = errors.New("breaker open") + ErrBreakerTimeout = errors.New("breaker time out") + ErrBreakerNoErrorRecorded = errors.New("no error in breaker history") ) // TripFunc is a function called by a Breaker's Fail() function and determines whether @@ -115,15 +118,21 @@ type Breaker struct { eventReceivers []chan BreakerEvent listeners []chan ListenerEvent backoffLock sync.Mutex + + //ring buffer for last N errors + errorsBuffer *ring.Ring + // errorsBufferLock used to prevent race accessing errorsBuffer (RWMutex is slower and atomic.Value same performance) + errorsBufferLock sync.Mutex } // Options holds breaker configuration options. type Options struct { - BackOff backoff.BackOff - Clock clock.Clock - ShouldTrip TripFunc - WindowTime time.Duration - WindowBuckets int + BackOff backoff.BackOff + Clock clock.Clock + ShouldTrip TripFunc + WindowTime time.Duration + WindowBuckets int + ErrorHistoryDepth int } // NewBreakerWithOptions creates a base breaker with a specified backoff, clock and TripFunc @@ -153,12 +162,17 @@ func NewBreakerWithOptions(options *Options) *Breaker { options.WindowBuckets = DefaultWindowBuckets } + if options.ErrorHistoryDepth <= 0 { + options.ErrorHistoryDepth = defaultErrorHistoryDepth + } + return &Breaker{ - BackOff: options.BackOff, - Clock: options.Clock, - ShouldTrip: options.ShouldTrip, - nextBackOff: options.BackOff.NextBackOff(), - counts: newWindow(options.WindowTime, options.WindowBuckets), + BackOff: options.BackOff, + Clock: options.Clock, + ShouldTrip: options.ShouldTrip, + nextBackOff: options.BackOff.NextBackOff(), + counts: newWindow(options.WindowTime, options.WindowBuckets), + errorsBuffer: ring.New(options.ErrorHistoryDepth), } } @@ -293,6 +307,44 @@ func (cb *Breaker) Fail() { } } +// FailWithError is the same as Fail, but keeps history of errors in internal ring buffer +func (cb *Breaker) FailWithError(err error) { + cb.errorsBufferLock.Lock() + defer cb.errorsBufferLock.Unlock() + + cb.errorsBuffer = cb.errorsBuffer.Next() + cb.errorsBuffer.Value = err + cb.Fail() +} + +// LastError returns last error from internal buffer +func (cb *Breaker) LastError() error { + cb.errorsBufferLock.Lock() + defer cb.errorsBufferLock.Unlock() + + if cb.errorsBuffer.Value == nil { + return ErrBreakerNoErrorRecorded + } + return cb.errorsBuffer.Value.(error) +} + +// Errors returns all errors from internal buffer +func (cb *Breaker) Errors() (errors []error) { + cb.errorsBufferLock.Lock() + defer cb.errorsBufferLock.Unlock() + + // reserve capacity to move last error to the end of slice without realloc + errors = make([]error, 0, cb.errorsBuffer.Len()+1) + cb.errorsBuffer.Do(func(x interface{}) { + if x != nil { + errors = append(errors, x.(error)) + } + }) + // move last error to the end + errors = append(errors[1:], errors[0]) + return errors +} + // Success is used to indicate a success condition the Breaker should record. If // the success was triggered by a retry attempt, the breaker will be Reset(). func (cb *Breaker) Success() { @@ -302,7 +354,9 @@ func (cb *Breaker) Success() { cb.backoffLock.Unlock() state := cb.state() - if state == halfopen { + // if state was halfopen and it's successful request this state will be `open`. + // due to cb.halfOpens is 1 at this point (request grouping) + if state == halfopen || state == open { cb.Reset() } atomic.StoreInt64(&cb.consecFailures, 0) @@ -362,7 +416,7 @@ func (cb *Breaker) CallContext(ctx context.Context, circuit func() error, timeou if err != nil { if ctx.Err() != context.Canceled { - cb.Fail() + cb.FailWithError(err) } return err } diff --git a/circuitbreaker_test.go b/circuitbreaker_test.go index 4bf9305..9c16bb9 100644 --- a/circuitbreaker_test.go +++ b/circuitbreaker_test.go @@ -2,8 +2,10 @@ package circuit import ( "context" + "errors" "fmt" - "sync/atomic" + "runtime" + "sync" "testing" "time" @@ -253,6 +255,52 @@ func TestThresholdBreakerCalling(t *testing.T) { } } +func TestThresholdBreakerErrorHistory(t *testing.T) { + cb := NewThresholdBreaker(2) + err := fmt.Errorf("error 1") + cb.FailWithError(err) + if cb.LastError() != err { + t.Fatal("expected last error to be `error 1`") + } + + cb = NewThresholdBreaker(1) + if cb.LastError() != nil { + t.Fatalf("expected last error to be `nil`, got %s", cb.LastError()) + } + + err = cb.Call(func() error { + return fmt.Errorf("circuit error") + }, 0) + if err == nil { + t.Fatal("expected threshold breaker to error") + } + if !cb.Tripped() { + t.Fatal("expected threshold breaker to be open") + } + if cb.LastError().Error() != "circuit error" { + t.Fatalf("expected last error to be `circut error`, got %s", cb.LastError()) + } + + cb.Success() + cb.Call(func() error { + return fmt.Errorf("circuit error 1") + }, 0) + if cb.LastError().Error() != "circuit error 1" { + t.Fatalf("expected last error to be `circut error 1`, got %s", cb.LastError()) + } + + errs := cb.Errors() + if len(errs) != 2 { + t.Fatalf("expected `%d` errors, got %d", 2, len(errs)) + } + if errs[0].Error() != "circuit error" { + t.Fatalf("expected `%s` error, got %s", "circuit error", errs[0].Error()) + } + if errs[1].Error() != "circuit error 1" { + t.Fatalf("expected `%s` error, got %s", "circuit error 1", errs[0].Error()) + } +} + func TestThresholdBreakerCallingContext(t *testing.T) { circuit := func() error { return fmt.Errorf("error") @@ -323,15 +371,10 @@ func TestThresholdBreakerResets(t *testing.T) { } func TestTimeoutBreaker(t *testing.T) { - wait := make(chan struct{}) - c := clock.NewMock() - called := int32(0) circuit := func() error { - wait <- struct{}{} - atomic.AddInt32(&called, 1) - <-wait + time.Sleep(100000000 * time.Millisecond) return nil } @@ -339,21 +382,29 @@ func TestTimeoutBreaker(t *testing.T) { cb.Clock = c errc := make(chan error) - go func() { errc <- cb.Call(circuit, time.Millisecond) }() - + wait := make(chan struct{}) + go func() { wait <- struct{}{}; errc <- cb.Call(circuit, time.Millisecond) }() <-wait - c.Add(time.Millisecond * 3) - wait <- struct{}{} + // yield and advance the clock + runtime.Gosched() + c.Add(time.Millisecond * 1000) err := <-errc - if err == nil { - t.Fatal("expected timeout breaker to return an error") + if err != ErrBreakerTimeout { + t.Fatalf("expected timeout breaker to return an error `%s`, got %s", ErrBreakerTimeout, err) } - go cb.Call(circuit, time.Millisecond) + cb.Clock = clock.NewMock() + go func() { wait <- struct{}{}; errc <- cb.Call(circuit, time.Millisecond) }() <-wait + // yield and advance the clock + runtime.Gosched() c.Add(time.Millisecond * 3) - wait <- struct{}{} + + err = <-errc + if err != ErrBreakerOpen { + t.Fatalf("expected timeout breaker to return an error `%s`, got %s", ErrBreakerOpen, err) + } if !cb.Tripped() { t.Fatal("expected timeout breaker to be open") @@ -474,3 +525,20 @@ func TestPartialSecondBackoff(t *testing.T) { t.Fatalf("expected breaker to be ready after more than nextBackoff time had passed") } } + +// TestGoroutineSafe verifies that the circuit breaker can be used concurrently +// without race conditions +func TestGoroutineSafe(t *testing.T) { + cb := NewBreaker() + wg := sync.WaitGroup{} + for i := 0; i < 2; i++ { + wg.Add(1) + go func() { + defer wg.Done() + cb.FailWithError(errors.New("x")) + cb.LastError() + cb.Errors() + }() + } + wg.Wait() +}