Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 68 additions & 14 deletions circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
package circuit

import (
"container/ring"
"context"
"errors"
"sync"
Expand Down Expand Up @@ -75,12 +76,14 @@ const (
var (
defaultInitialBackOffInterval = 500 * time.Millisecond
defaultBackoffMaxElapsedTime = 0 * time.Second
defaultErrorHistoryDepth = 10
)

// Error codes returned by Call
var (
ErrBreakerOpen = errors.New("breaker open")
ErrBreakerTimeout = errors.New("breaker time out")
ErrBreakerOpen = errors.New("breaker open")
ErrBreakerTimeout = errors.New("breaker time out")
ErrBreakerNoErrorRecorded = errors.New("no error in breaker history")
)

// TripFunc is a function called by a Breaker's Fail() function and determines whether
Expand Down Expand Up @@ -115,15 +118,21 @@ type Breaker struct {
eventReceivers []chan BreakerEvent
listeners []chan ListenerEvent
backoffLock sync.Mutex

//ring buffer for last N errors
errorsBuffer *ring.Ring
// errorsBufferLock used to prevent race accessing errorsBuffer (RWMutex is slower and atomic.Value same performance)
errorsBufferLock sync.Mutex
}

// Options holds breaker configuration options.
type Options struct {
BackOff backoff.BackOff
Clock clock.Clock
ShouldTrip TripFunc
WindowTime time.Duration
WindowBuckets int
BackOff backoff.BackOff
Clock clock.Clock
ShouldTrip TripFunc
WindowTime time.Duration
WindowBuckets int
ErrorHistoryDepth int
}

// NewBreakerWithOptions creates a base breaker with a specified backoff, clock and TripFunc
Expand Down Expand Up @@ -153,12 +162,17 @@ func NewBreakerWithOptions(options *Options) *Breaker {
options.WindowBuckets = DefaultWindowBuckets
}

if options.ErrorHistoryDepth <= 0 {
options.ErrorHistoryDepth = defaultErrorHistoryDepth
}

return &Breaker{
BackOff: options.BackOff,
Clock: options.Clock,
ShouldTrip: options.ShouldTrip,
nextBackOff: options.BackOff.NextBackOff(),
counts: newWindow(options.WindowTime, options.WindowBuckets),
BackOff: options.BackOff,
Clock: options.Clock,
ShouldTrip: options.ShouldTrip,
nextBackOff: options.BackOff.NextBackOff(),
counts: newWindow(options.WindowTime, options.WindowBuckets),
errorsBuffer: ring.New(options.ErrorHistoryDepth),
}
}

Expand Down Expand Up @@ -293,6 +307,44 @@ func (cb *Breaker) Fail() {
}
}

// FailWithError is the same as Fail, but keeps history of errors in internal ring buffer
func (cb *Breaker) FailWithError(err error) {
cb.errorsBufferLock.Lock()
defer cb.errorsBufferLock.Unlock()

cb.errorsBuffer = cb.errorsBuffer.Next()
cb.errorsBuffer.Value = err
cb.Fail()
}

// LastError returns last error from internal buffer
func (cb *Breaker) LastError() error {
cb.errorsBufferLock.Lock()
defer cb.errorsBufferLock.Unlock()

if cb.errorsBuffer.Value == nil {
return ErrBreakerNoErrorRecorded
}
return cb.errorsBuffer.Value.(error)
}

// Errors returns all errors from internal buffer
func (cb *Breaker) Errors() (errors []error) {
cb.errorsBufferLock.Lock()
defer cb.errorsBufferLock.Unlock()

// reserve capacity to move last error to the end of slice without realloc
errors = make([]error, 0, cb.errorsBuffer.Len()+1)
cb.errorsBuffer.Do(func(x interface{}) {
if x != nil {
errors = append(errors, x.(error))
}
})
// move last error to the end
errors = append(errors[1:], errors[0])
return errors
}

// Success is used to indicate a success condition the Breaker should record. If
// the success was triggered by a retry attempt, the breaker will be Reset().
func (cb *Breaker) Success() {
Expand All @@ -302,7 +354,9 @@ func (cb *Breaker) Success() {
cb.backoffLock.Unlock()

state := cb.state()
if state == halfopen {
// if state was halfopen and it's successful request this state will be `open`.
// due to cb.halfOpens is 1 at this point (request grouping)
if state == halfopen || state == open {
cb.Reset()
}
atomic.StoreInt64(&cb.consecFailures, 0)
Expand Down Expand Up @@ -362,7 +416,7 @@ func (cb *Breaker) CallContext(ctx context.Context, circuit func() error, timeou

if err != nil {
if ctx.Err() != context.Canceled {
cb.Fail()
cb.FailWithError(err)
}
return err
}
Expand Down
98 changes: 83 additions & 15 deletions circuitbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package circuit

import (
"context"
"errors"
"fmt"
"sync/atomic"
"runtime"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -253,6 +255,52 @@ func TestThresholdBreakerCalling(t *testing.T) {
}
}

func TestThresholdBreakerErrorHistory(t *testing.T) {
cb := NewThresholdBreaker(2)
err := fmt.Errorf("error 1")
cb.FailWithError(err)
if cb.LastError() != err {
t.Fatal("expected last error to be `error 1`")
}

cb = NewThresholdBreaker(1)
if cb.LastError() != nil {
t.Fatalf("expected last error to be `nil`, got %s", cb.LastError())
}

err = cb.Call(func() error {
return fmt.Errorf("circuit error")
}, 0)
if err == nil {
t.Fatal("expected threshold breaker to error")
}
if !cb.Tripped() {
t.Fatal("expected threshold breaker to be open")
}
if cb.LastError().Error() != "circuit error" {
t.Fatalf("expected last error to be `circut error`, got %s", cb.LastError())
}

cb.Success()
cb.Call(func() error {
return fmt.Errorf("circuit error 1")
}, 0)
if cb.LastError().Error() != "circuit error 1" {
t.Fatalf("expected last error to be `circut error 1`, got %s", cb.LastError())
}

errs := cb.Errors()
if len(errs) != 2 {
t.Fatalf("expected `%d` errors, got %d", 2, len(errs))
}
if errs[0].Error() != "circuit error" {
t.Fatalf("expected `%s` error, got %s", "circuit error", errs[0].Error())
}
if errs[1].Error() != "circuit error 1" {
t.Fatalf("expected `%s` error, got %s", "circuit error 1", errs[0].Error())
}
}

func TestThresholdBreakerCallingContext(t *testing.T) {
circuit := func() error {
return fmt.Errorf("error")
Expand Down Expand Up @@ -323,37 +371,40 @@ func TestThresholdBreakerResets(t *testing.T) {
}

func TestTimeoutBreaker(t *testing.T) {
wait := make(chan struct{})

c := clock.NewMock()
called := int32(0)

circuit := func() error {
wait <- struct{}{}
atomic.AddInt32(&called, 1)
<-wait
time.Sleep(100000000 * time.Millisecond)
return nil
}

cb := NewThresholdBreaker(1)
cb.Clock = c

errc := make(chan error)
go func() { errc <- cb.Call(circuit, time.Millisecond) }()

wait := make(chan struct{})
go func() { wait <- struct{}{}; errc <- cb.Call(circuit, time.Millisecond) }()
<-wait
c.Add(time.Millisecond * 3)
wait <- struct{}{}
// yield and advance the clock
runtime.Gosched()
c.Add(time.Millisecond * 1000)

err := <-errc
if err == nil {
t.Fatal("expected timeout breaker to return an error")
if err != ErrBreakerTimeout {
t.Fatalf("expected timeout breaker to return an error `%s`, got %s", ErrBreakerTimeout, err)
}

go cb.Call(circuit, time.Millisecond)
cb.Clock = clock.NewMock()
go func() { wait <- struct{}{}; errc <- cb.Call(circuit, time.Millisecond) }()
<-wait
// yield and advance the clock
runtime.Gosched()
c.Add(time.Millisecond * 3)
wait <- struct{}{}

err = <-errc
if err != ErrBreakerOpen {
t.Fatalf("expected timeout breaker to return an error `%s`, got %s", ErrBreakerOpen, err)
}

if !cb.Tripped() {
t.Fatal("expected timeout breaker to be open")
Expand Down Expand Up @@ -474,3 +525,20 @@ func TestPartialSecondBackoff(t *testing.T) {
t.Fatalf("expected breaker to be ready after more than nextBackoff time had passed")
}
}

// TestGoroutineSafe verifies that the circuit breaker can be used concurrently
// without race conditions
func TestGoroutineSafe(t *testing.T) {
cb := NewBreaker()
wg := sync.WaitGroup{}
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
cb.FailWithError(errors.New("x"))
cb.LastError()
cb.Errors()
}()
}
wg.Wait()
}