Skip to content

Implement circuit breaker #129

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions circuitbreaker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Circuit Breaker Middleware for Echo

This package provides a custom Circuit Breaker middleware for the Echo framework in Golang. It helps protect your application from cascading failures by limiting requests to failing services and resetting based on configurable timeouts and success criteria.

## Features

- Configurable failure handling
- Timeout-based state reset
- Automatic transition between states: Closed, Open, and Half-Open
- Easy integration with Echo framework

## Usage

```go
package main

import (
"net/http"
"time"

"github.com/labstack/echo-contrib/circuitbreaker"

"github.com/labstack/echo/v4"
)

func main() {

cbConfig := circuitbreaker.CircuitBreakerConfig{
Threshold: 5, // Number of failures before opening circuit
Timeout: 10 * time.Second, // Time to stay open before transitioning to half-open
ResetTimeout: 5 * time.Second, // Time before allowing a test request in half-open state
SuccessReset: 3, // Number of successes needed to move back to closed state
}

e := echo.New()
e.Use(circuitbreaker.CircuitBreakerMiddleware(cbConfig))

e.GET("/example", func(c echo.Context) error {
return c.String(http.StatusOK, "Success")
})

// Start server
e.Logger.Fatal(e.Start(":8081"))
}
```

### Circuit Breaker States

1. **Closed**: Requests pass through normally. If failures exceed the threshold, it transitions to Open.
2. **Open**: Requests are blocked. After the timeout period, it moves to Half-Open.
3. **Half-Open**: Allows a limited number of test requests. If successful, it resets to Closed, otherwise, it goes back to Open.







170 changes: 170 additions & 0 deletions circuitbreaker/circuit_breaker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: © 2017 LabStack and Echo contributors

// Package circuitbreaker provides a circuit breaker middleware for Echo.
package circuitbreaker

import (
"net/http"
"sync"
"time"

"github.com/labstack/echo/v4"
)

// CircuitBreakerState represents the state of the circuit breaker
type CircuitBreakerState string

const (
StateClosed CircuitBreakerState = "closed" // Normal operation
StateOpen CircuitBreakerState = "open" // Requests are blocked
StateHalfOpen CircuitBreakerState = "half-open" // Limited requests allowed to check recovery
)

// CircuitBreaker controls the flow of requests based on failure thresholds
type CircuitBreaker struct {
failureCount int
successCount int
state CircuitBreakerState
mutex sync.Mutex
threshold int
timeout time.Duration
resetTimeout time.Duration
successReset int
lastFailure time.Time
exitChan chan struct{}
}

// CircuitBreakerConfig holds configuration options for the circuit breaker
type CircuitBreakerConfig struct {
Threshold int // Maximum failures before switching to open state
Timeout time.Duration // Time window before attempting recovery
ResetTimeout time.Duration // Interval for monitoring the circuit state
SuccessReset int // Number of successful attempts to move to closed state
OnOpen func(ctx echo.Context) error // Callback for open state
OnHalfOpen func(ctx echo.Context) error // Callback for half-open state
OnClose func(ctx echo.Context) error // Callback for closed state
}

// Default configuration values for the circuit breaker
var DefaultCircuitBreakerConfig = CircuitBreakerConfig{
Threshold: 5,
Timeout: 30 * time.Second,
ResetTimeout: 10 * time.Second,
SuccessReset: 3,
OnOpen: func(ctx echo.Context) error {
return ctx.JSON(http.StatusServiceUnavailable, map[string]string{"error": "service unavailable"})
},
OnHalfOpen: func(ctx echo.Context) error {
return ctx.JSON(http.StatusTooManyRequests, map[string]string{"error": "service under recovery"})
},
OnClose: nil,
}

// NewCircuitBreaker initializes a circuit breaker with the given configuration
func NewCircuitBreaker(config CircuitBreakerConfig) *CircuitBreaker {
if config.Threshold <= 0 {
config.Threshold = DefaultCircuitBreakerConfig.Threshold
}
if config.Timeout == 0 {
config.Timeout = DefaultCircuitBreakerConfig.Timeout
}
if config.ResetTimeout == 0 {
config.ResetTimeout = DefaultCircuitBreakerConfig.ResetTimeout
}
if config.SuccessReset <= 0 {
config.SuccessReset = DefaultCircuitBreakerConfig.SuccessReset
}
if config.OnOpen == nil {
config.OnOpen = DefaultCircuitBreakerConfig.OnOpen
}
if config.OnHalfOpen == nil {
config.OnHalfOpen = DefaultCircuitBreakerConfig.OnHalfOpen
}

cb := &CircuitBreaker{
threshold: config.Threshold,
timeout: config.Timeout,
resetTimeout: config.ResetTimeout,
successReset: config.SuccessReset,
state: StateClosed,
exitChan: make(chan struct{}),
}
go cb.monitorReset()
return cb
}

// monitorReset periodically checks if the circuit should move from open to half-open state
func (cb *CircuitBreaker) monitorReset() {
for {
select {
case <-time.After(cb.resetTimeout):
cb.mutex.Lock()
if cb.state == StateOpen && time.Since(cb.lastFailure) > cb.timeout {
cb.state = StateHalfOpen
cb.successCount = 0
cb.failureCount = 0 // Reset failure count
}
cb.mutex.Unlock()
case <-cb.exitChan:
return
}
}
}

// AllowRequest checks if requests are allowed based on the circuit state
func (cb *CircuitBreaker) AllowRequest() bool {

cb.mutex.Lock()
defer cb.mutex.Unlock()

return cb.state != StateOpen
}

// ReportSuccess updates the circuit breaker on a successful request
func (cb *CircuitBreaker) ReportSuccess() {
cb.mutex.Lock()
defer cb.mutex.Unlock()

cb.successCount++
if cb.state == StateHalfOpen && cb.successCount >= cb.successReset {
cb.state = StateClosed
cb.failureCount = 0
cb.successCount = 0
}
}

// ReportFailure updates the circuit breaker on a failed request
func (cb *CircuitBreaker) ReportFailure() {
cb.mutex.Lock()
defer cb.mutex.Unlock()

cb.failureCount++
cb.lastFailure = time.Now()

if cb.failureCount >= cb.threshold {
cb.state = StateOpen
}
}

// CircuitBreakerMiddleware applies the circuit breaker to Echo requests
func CircuitBreakerMiddleware(config CircuitBreakerConfig) echo.MiddlewareFunc {
cb := NewCircuitBreaker(config)

return func(next echo.HandlerFunc) echo.HandlerFunc {
return func(ctx echo.Context) error {
if !cb.AllowRequest() {
return config.OnOpen(ctx)
}

err := next(ctx)
if err != nil {
cb.ReportFailure()
return err
}

cb.ReportSuccess()
return nil
}
}
}
76 changes: 76 additions & 0 deletions circuitbreaker/circuit_breaker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package circuitbreaker

import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/labstack/echo/v4"
"github.com/stretchr/testify/assert"
)

// TestNewCircuitBreaker ensures circuit breaker initializes with correct defaults
func TestNewCircuitBreaker(t *testing.T) {
cb := NewCircuitBreaker(CircuitBreakerConfig{})
assert.Equal(t, StateClosed, cb.state)
assert.Equal(t, DefaultCircuitBreakerConfig.Threshold, cb.threshold)
assert.Equal(t, DefaultCircuitBreakerConfig.Timeout, cb.timeout)
assert.Equal(t, DefaultCircuitBreakerConfig.ResetTimeout, cb.resetTimeout)
assert.Equal(t, DefaultCircuitBreakerConfig.SuccessReset, cb.successReset)
}

// TestAllowRequest checks request allowance in different states
func TestAllowRequest(t *testing.T) {
cb := NewCircuitBreaker(CircuitBreakerConfig{Threshold: 3})

assert.True(t, cb.AllowRequest())
cb.ReportFailure()
cb.ReportFailure()
cb.ReportFailure()
assert.False(t, cb.AllowRequest())
}

// TestReportSuccess verifies state transitions after successful requests
func TestReportSuccess(t *testing.T) {
cb := NewCircuitBreaker(CircuitBreakerConfig{Threshold: 2, SuccessReset: 2})
cb.state = StateHalfOpen
cb.ReportSuccess()
assert.Equal(t, StateHalfOpen, cb.state)
cb.ReportSuccess()
assert.Equal(t, StateClosed, cb.state)
}

// TestReportFailure checks state transitions after failures
func TestReportFailure(t *testing.T) {
cb := NewCircuitBreaker(CircuitBreakerConfig{Threshold: 2})
cb.ReportFailure()
assert.Equal(t, StateClosed, cb.state)
cb.ReportFailure()
assert.Equal(t, StateOpen, cb.state)
}

// TestMonitorReset ensures circuit moves to half-open after timeout
func TestMonitorReset(t *testing.T) {
cb := NewCircuitBreaker(CircuitBreakerConfig{Threshold: 1, Timeout: 1 * time.Second, ResetTimeout: 500 * time.Millisecond})
cb.ReportFailure()
time.Sleep(2 * time.Second) // Wait for reset logic
assert.Equal(t, StateHalfOpen, cb.state)
}

// TestCircuitBreakerMiddleware verifies middleware behavior
func TestCircuitBreakerMiddleware(t *testing.T) {
e := echo.New()
req := httptest.NewRequest(http.MethodGet, "/", nil)
rec := httptest.NewRecorder()
ctx := e.NewContext(req, rec)

handler := CircuitBreakerMiddleware(DefaultCircuitBreakerConfig)(func(c echo.Context) error {
return c.String(http.StatusOK, "success")
})

err := handler(ctx)
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, rec.Code)
assert.Equal(t, "success", rec.Body.String())
}