Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions lib/vmpooler/adaptive_timeout.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# frozen_string_literal: true

module Vmpooler
  # Adaptive timeout that adjusts based on observed connection performance
  # to optimize between responsiveness and reliability.
  #
  # Tracks recent connection durations and adjusts timeout to p95 + buffer,
  # reducing timeout on failures to fail faster during outages.
  class AdaptiveTimeout
    # NOTE: reading this attribute directly bypasses the mutex; prefer the
    # thread-safe #timeout accessor from other threads.
    attr_reader :current_timeout

    # Initialize adaptive timeout
    #
    # @param name [String] Name for logging (e.g., "vsphere_connections")
    # @param logger [Object] Logger instance
    # @param metrics [Object] Metrics instance
    # @param min [Integer] Minimum timeout in seconds
    # @param max [Integer] Maximum timeout in seconds
    # @param initial [Integer] Initial timeout in seconds
    # @param max_samples [Integer] Number of recent samples to track
    def initialize(name:, logger:, metrics:, min: 5, max: 60, initial: 30, max_samples: 100)
      @name = name
      @logger = logger
      @metrics = metrics
      @min_timeout = min
      @max_timeout = max
      # Remember the configured starting point so #reset can return to it
      # (previously #reset hard-coded 30 and ignored this parameter).
      @initial_timeout = initial
      @current_timeout = initial
      @recent_durations = []
      @max_samples = max_samples
      @mutex = Mutex.new
    end

    # Get current timeout value (thread-safe)
    # @return [Integer] Current timeout in seconds
    def timeout
      @mutex.synchronize { @current_timeout }
    end

    # Record a successful operation duration
    # @param duration [Float] Duration in seconds
    def record_success(duration)
      @mutex.synchronize do
        @recent_durations << duration
        # Keep only the most recent @max_samples observations
        @recent_durations.shift if @recent_durations.size > @max_samples

        # Adjust timeout based on recent performance; wait for a minimum
        # sample size so one fast/slow call doesn't swing the timeout
        adjust_timeout if @recent_durations.size >= 10
      end
    end

    # Record a failure (timeout or error)
    # Reduces current timeout to fail faster on subsequent attempts
    def record_failure
      @mutex.synchronize do
        old_timeout = @current_timeout
        # Reduce timeout by 20% on failure, but don't go below minimum
        @current_timeout = [(@current_timeout * 0.8).round, @min_timeout].max

        if old_timeout != @current_timeout
          @logger.log('d', "[*] [adaptive_timeout] '#{@name}' reduced timeout #{old_timeout}s → #{@current_timeout}s after failure")
          @metrics.gauge("adaptive_timeout.current.#{@name}", @current_timeout)
        end
      end
    end

    # Reset to the configured initial timeout (useful after recovery)
    def reset
      @mutex.synchronize do
        @recent_durations.clear
        old_timeout = @current_timeout
        # Restore the configured initial timeout, bounded by the maximum.
        # (Fixed: this previously used a hard-coded 30 instead of the
        # `initial:` value given to the constructor.)
        @current_timeout = [@max_timeout, @initial_timeout].min

        @logger.log('d', "[*] [adaptive_timeout] '#{@name}' reset timeout #{old_timeout}s → #{@current_timeout}s")
        @metrics.gauge("adaptive_timeout.current.#{@name}", @current_timeout)
      end
    end

    # Get statistics about recent durations
    # @return [Hash] Statistics including min, max, avg, p50/p95/p99 and
    #   the current timeout; { samples: 0 } when no data has been recorded
    def stats
      @mutex.synchronize do
        return { samples: 0 } if @recent_durations.empty?

        sorted = @recent_durations.sort
        {
          samples: sorted.size,
          min: sorted.first.round(2),
          max: sorted.last.round(2),
          avg: (sorted.sum / sorted.size.to_f).round(2),
          p50: percentile(sorted, 0.50).round(2),
          p95: percentile(sorted, 0.95).round(2),
          p99: percentile(sorted, 0.99).round(2),
          current_timeout: @current_timeout
        }
      end
    end

    private

    # Recompute @current_timeout from the recent samples.
    # Callers must already hold @mutex.
    def adjust_timeout
      return if @recent_durations.empty?

      sorted = @recent_durations.sort
      p95_duration = percentile(sorted, 0.95)

      # Set timeout to p95 + 50% buffer, bounded by min/max
      new_timeout = (p95_duration * 1.5).round
      new_timeout = new_timeout.clamp(@min_timeout, @max_timeout)

      # Only adjust if change is significant (> 5 seconds) to avoid
      # noisy oscillation around the current value
      if (new_timeout - @current_timeout).abs > 5
        old_timeout = @current_timeout
        @current_timeout = new_timeout

        @logger.log('d', "[*] [adaptive_timeout] '#{@name}' adjusted timeout #{old_timeout}s → #{@current_timeout}s (p95: #{p95_duration.round(2)}s)")
        @metrics.gauge("adaptive_timeout.current.#{@name}", @current_timeout)
        @metrics.gauge("adaptive_timeout.p95.#{@name}", p95_duration)
      end
    end

    # Nearest-rank percentile of a pre-sorted array.
    # @param sorted_array [Array<Float>] durations sorted ascending
    # @param percentile [Float] e.g. 0.95 for p95
    # @return [Float] the value at that rank (0 for an empty array)
    def percentile(sorted_array, percentile)
      return 0 if sorted_array.empty?

      index = (sorted_array.size * percentile).ceil - 1
      index = index.clamp(0, sorted_array.size - 1)
      sorted_array[index]
    end
  end
end
189 changes: 189 additions & 0 deletions lib/vmpooler/circuit_breaker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
# frozen_string_literal: true

module Vmpooler
  # Circuit breaker pattern implementation to prevent cascading failures
  # when a provider becomes unresponsive or experiences repeated failures.
  #
  # State machine:
  # - :closed    — normal operation; requests flow through
  # - :open      — provider is failing; requests are rejected immediately
  # - :half_open — after the timeout, a limited probe of the provider
  class CircuitBreaker
    STATES = %i[closed open half_open].freeze

    class CircuitOpenError < StandardError; end

    attr_reader :state, :failure_count, :success_count

    # Build a circuit breaker.
    #
    # @param name [String] Name for logging/metrics (e.g., "vsphere_provider")
    # @param logger [Object] Logger instance
    # @param metrics [Object] Metrics instance
    # @param failure_threshold [Integer] Failures tolerated before opening
    # @param timeout [Integer] Seconds to stay open before probing (half-open)
    # @param half_open_attempts [Integer] Consecutive probe successes required to close
    def initialize(name:, logger:, metrics:, failure_threshold: 5, timeout: 30, half_open_attempts: 3)
      @name = name
      @logger = logger
      @metrics = metrics
      @failure_threshold = failure_threshold
      @timeout = timeout
      @half_open_attempts = half_open_attempts

      @state = :closed
      @failure_count = 0
      @success_count = 0
      @last_failure_time = nil
      @mutex = Mutex.new
    end

    # Run a block under circuit-breaker protection.
    #
    # @yield Block to execute if the circuit allows it
    # @return Whatever the block returns
    # @raise [CircuitOpenError] when the circuit is open and the retry
    #   timeout has not yet elapsed
    def call
      check_state

      begin
        # tap records the success after the block runs, then passes the
        # block's value straight through to the caller
        yield.tap { on_success }
      rescue StandardError => err
        on_failure(err)
        raise
      end
    end

    # Would a request be admitted right now?
    # @return [Boolean] true when closed, half-open, or open-but-retryable
    def allow_request?
      @mutex.synchronize do
        @state == :open ? should_attempt_reset? : true
      end
    end

    # Snapshot of the breaker's internals for status endpoints/debugging.
    # @return [Hash] name, state, counters and retry timing
    def status
      @mutex.synchronize do
        {
          name: @name,
          state: @state,
          failure_count: @failure_count,
          success_count: @success_count,
          last_failure_time: @last_failure_time,
          next_retry_time: next_retry_time
        }
      end
    end

    private

    # Gatekeeper for #call: transitions open→half_open once the timeout has
    # elapsed, otherwise rejects with CircuitOpenError. Closed and half-open
    # states pass through untouched.
    def check_state
      @mutex.synchronize do
        next unless @state == :open

        if should_attempt_reset?
          transition_to_half_open
        else
          remaining = (@timeout - (Time.now - @last_failure_time)).round(1)
          raise CircuitOpenError, "Circuit breaker '#{@name}' is open (#{@failure_count} failures, retry in #{remaining}s)"
        end
      end
    end

    # Has the open-state cool-down elapsed? Requires @mutex (or call-site care).
    def should_attempt_reset?
      !@last_failure_time.nil? && Time.now - @last_failure_time >= @timeout
    end

    # Earliest time a probe will be admitted, or nil when not open.
    def next_retry_time
      @last_failure_time + @timeout if @last_failure_time && @state == :open
    end

    # Bookkeeping after a successful call; may close the circuit.
    def on_success
      @mutex.synchronize do
        if @state == :half_open
          @success_count += 1
          @failure_count = 0
          @logger.log('d', "[+] [circuit_breaker] '#{@name}' successful test request (#{@success_count}/#{@half_open_attempts})")
          transition_to_closed if @success_count >= @half_open_attempts
        elsif @state == :open
          # Should not happen, but reset if we somehow get a success
          transition_to_closed
        elsif @failure_count.positive?
          # Closed state: any success wipes the accumulated failure streak
          @failure_count = 0
        end
      end
    end

    # Bookkeeping after a failed call; may open the circuit.
    def on_failure(error)
      @mutex.synchronize do
        @failure_count += 1
        @last_failure_time = Time.now

        if @state == :half_open
          @logger.log('d', "[!] [circuit_breaker] '#{@name}' failed during half-open test")
          transition_to_open
        elsif @state == :open
          # Already open, just log
          @logger.log('d', "[!] [circuit_breaker] '#{@name}' additional failure while open")
        else
          @logger.log('d', "[!] [circuit_breaker] '#{@name}' failure #{@failure_count}/#{@failure_threshold}: #{error.class}")
          transition_to_open if @failure_count >= @failure_threshold
        end
      end
    end

    def transition_to_open
      @state = :open
      @success_count = 0
      @logger.log('s', "[!] [circuit_breaker] '#{@name}' OPENED after #{@failure_count} failures (will retry in #{@timeout}s)")
      @metrics.increment("circuit_breaker.opened.#{@name}")
      @metrics.gauge("circuit_breaker.state.#{@name}", 1) # 1 = open
    end

    def transition_to_half_open
      @state = :half_open
      @success_count = 0
      @failure_count = 0
      @logger.log('s', "[*] [circuit_breaker] '#{@name}' HALF-OPEN, testing provider health")
      @metrics.increment("circuit_breaker.half_open.#{@name}")
      @metrics.gauge("circuit_breaker.state.#{@name}", 0.5) # 0.5 = half-open
    end

    def transition_to_closed
      @state = :closed
      @failure_count = 0
      @success_count = 0
      @logger.log('s', "[+] [circuit_breaker] '#{@name}' CLOSED, provider recovered")
      @metrics.increment("circuit_breaker.closed.#{@name}")
      @metrics.gauge("circuit_breaker.state.#{@name}", 0) # 0 = closed
    end
  end
end
28 changes: 28 additions & 0 deletions lib/vmpooler/generic_connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,34 @@ def with_metrics(options = {})
end
end
end

# Get connection pool health status
# @return [Hash] Health status including utilization and queue depth
def health_status
  busy = @size - @available.length
  {
    size: @size,
    available: @available.length,
    in_use: busy,
    utilization: (busy.to_f / @size * 100).round(2),
    waiting_threads: (@queue.respond_to?(:length) ? @queue.length : 0),
    state: determine_health_state
  }
end

private

# Classify current pool pressure from utilization percentage and the
# number of threads waiting on the pool.
def determine_health_state
  in_use_pct = (@size - @available.length).to_f / @size * 100
  waiters = @queue.respond_to?(:length) ? @queue.length : 0

  return :critical if in_use_pct >= 90 || waiters > 5 # Pool exhausted or many waiting threads
  return :warning if in_use_pct >= 70 || waiters > 2  # Pool under stress

  :healthy # Normal operation
end
end
end
end
18 changes: 18 additions & 0 deletions lib/vmpooler/metrics/promstats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,24 @@ def vmpooler_metrics_table
torun: %i[manager],
docstring: 'vmpooler clone metrics',
param_labels: %i[poolname]
},
circuit_breaker: {
mtype: M_GAUGE,
torun: %i[manager],
docstring: 'Circuit breaker state and failure tracking',
param_labels: %i[metric_path]
},
connection_pool: {
mtype: M_GAUGE,
torun: %i[manager],
docstring: 'Connection pool health metrics',
param_labels: %i[metric_path]
},
adaptive_timeout: {
mtype: M_GAUGE,
torun: %i[manager],
docstring: 'Adaptive timeout statistics',
param_labels: %i[metric_path]
}
}
end
Expand Down
Loading