diff --git a/performance/config/alerts.yaml b/performance/config/alerts.yaml index 778ff282..95ab6f16 100644 --- a/performance/config/alerts.yaml +++ b/performance/config/alerts.yaml @@ -63,6 +63,28 @@ groups: description: "Cache hit rate is {{ $value | humanizePercentage }}, below 80% threshold" runbook_url: "https://docs.predictiq.com/runbooks/low-cache-hit-rate" + - alert: CacheCircuitBreakerOpen + expr: cache_circuit_breaker_state{state="open"} == 1 + for: 2m + labels: + severity: critical + component: cache + annotations: + summary: "Redis cache circuit breaker has been open for 2+ minutes" + description: "The Redis cache circuit breaker is open, causing cache bypass and degraded performance. Check Redis connectivity and health." + runbook_url: "https://docs.predictiq.com/runbooks/cache-circuit-breaker-open" + + - alert: CacheCircuitBreakerHalfOpen + expr: cache_circuit_breaker_state{state="half_open"} == 1 + for: 1m + labels: + severity: warning + component: cache + annotations: + summary: "Redis cache circuit breaker is in half-open state" + description: "The Redis cache circuit breaker is probing for recovery. If this persists, Redis may be unstable." + runbook_url: "https://docs.predictiq.com/runbooks/cache-circuit-breaker-half-open" + - name: database_performance interval: 30s rules: diff --git a/performance/config/grafana-dashboard.json b/performance/config/grafana-dashboard.json index 3810f27c..5d42cd2b 100644 --- a/performance/config/grafana-dashboard.json +++ b/performance/config/grafana-dashboard.json @@ -242,11 +242,218 @@ "notifications": [] } }, + { + "id": 11, + "title": "Cache Circuit Breaker State", + "type": "stat", + "gridPos": { "x": 12, "y": 16, "w": 6, "h": 4 }, + "targets": [ + { + "expr": "cache_circuit_breaker_state{state=\"closed\"}", + "legendFormat": "Closed", + "refId": "A" + }, + { + "expr": "cache_circuit_breaker_state{state=\"open\"}", + "legendFormat": "Open", + "refId": "B" + }, + { + "expr": "cache_circuit_breaker_state{state=\"half_open\"}", + "legendFormat": "Half-Open", + "refId": "C" + } + ], + "options": { + "colorMode": "background", + "graphMode": "none", + "justifyMode": "auto", + "orientation": "horizontal", + "reduceOptions": { + "calcs": ["lastNotNull"], + "fields": "", + "values": false + }, + "textMode": "value_and_name" + }, + "fieldConfig": { + "defaults": { + "mappings": [ + { + "type": "value", + "options": { + "0": { + "text": "Inactive", + "color": "transparent" + }, + "1": { + "text": "Active", + "color": "green" + } + } + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "value": null, + "color": "transparent" + }, + { + "value": 1, + "color": "green" + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "Open" + }, + "properties": [ + { + "id": "thresholds", + "value": { + "mode": "absolute", + "steps": [ + { + "value": null, + "color": "transparent" + }, + { + "value": 1, + "color": "red" + } + ] + } + }, + { + "id": "mappings", + "value": [ + { + "type": "value", + "options": { + "0": { + "text": "Inactive", + "color": "transparent" + }, + "1": { + "text": "OPEN - Cache Bypassed", + "color": "red" + } + } + } + ] + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Half-Open" + }, + "properties": [ + { + "id": "thresholds", + "value": { + "mode": "absolute", + "steps": [ + { + "value": null, + "color": "transparent" + }, + { + "value": 1, + "color": "yellow" + } + ] + } + }, + { + "id": "mappings", + "value": [ + { + "type": "value", + "options": { + "0": { + "text": "Inactive", + "color": "transparent" + }, + "1": { + "text": "Half-Open - Probing", + "color": "yellow" + } + } + } + ] + } + ] + } + ] + } + }, + { + "id": 12, + "title": "Cache Circuit Breaker State Timeline", + "type": "graph", + "gridPos": { "x": 18, "y": 16, "w": 6, "h": 4 }, + "targets": [ + { + "expr": "cache_circuit_breaker_state{state=\"open\"}", + "legendFormat": "Open", + "refId": "A" + }, + { + "expr": "cache_circuit_breaker_state{state=\"half_open\"}", + "legendFormat": "Half-Open", + "refId": "B" + } + ], + "yaxes": [ + { + "format": "short", + "label": "State", + "max": 1, + "min": 0 + } + ], + "alert": { + "name": "Cache Circuit Breaker Open", + "conditions": [ + { + "evaluator": { + "params": [1], + "type": "gt" + }, + "operator": { + "type": "and" + }, + "query": { + "params": ["A", "2m", "now"] + }, + "reducer": { + "params": [], + "type": "avg" + }, + "type": "query" + } + ], + "executionErrorState": "alerting", + "frequency": "1m", + "handler": 1, + "message": "Redis cache circuit breaker has been open for more than 2 minutes", + "noDataState": "no_data", + "notifications": [] + } + }, { "id": 6, "title": "Database Query Time (p95)", "type": "graph", - "gridPos": { "x": 12, "y": 16, "w": 12, "h": 8 }, + "gridPos": { "x": 0, "y": 20, "w": 12, "h": 8 }, "targets": [ { "expr": "histogram_quantile(0.95, rate(db_query_duration_seconds_bucket[5m]))", @@ -293,7 +500,7 @@ "id": 7, "title": "Database Connection Pool — Active vs Idle", "type": "graph", - "gridPos": { "x": 0, "y": 24, "w": 12, "h": 8 }, + "gridPos": { "x": 12, "y": 20, "w": 12, "h": 8 }, "targets": [ { "expr": "db_pool_connections_active{pool=\"main\"}", @@ -345,7 +552,7 @@ "id": 10, "title": "Database Pool Acquire Duration (p95)", "type": "graph", - "gridPos": { "x": 12, "y": 24, "w": 12, "h": 8 }, + "gridPos": { "x": 0, "y": 28, "w": 12, "h": 8 }, "targets": [ { "expr": "histogram_quantile(0.95, rate(db_pool_acquire_duration_seconds_bucket{pool=\"main\"}[5m]))", @@ -397,7 +604,7 @@ "id": 8, "title": "Contract Gas Costs", "type": "graph", - "gridPos": { "x": 12, "y": 24, "w": 12, "h": 8 }, + "gridPos": { "x": 12, "y": 28, "w": 12, "h": 8 }, "targets": [ { "expr": "contract_gas_used{operation=\"create_market\"}", @@ -454,7 +661,7 @@ "id": 9, "title": "System Health Overview", "type": "stat", - "gridPos": { "x": 0, "y": 32, "w": 24, "h": 4 }, + "gridPos": { "x": 0, "y": 36, "w": 24, "h": 4 }, "targets": [ { "expr": "up{job=\"predictiq-api\"}", diff --git a/services/api/src/cache/mod.rs b/services/api/src/cache/mod.rs index 2600d86f..3a1c2e6e 100644 --- a/services/api/src/cache/mod.rs +++ b/services/api/src/cache/mod.rs @@ -62,12 +62,25 @@ impl CircuitBreaker { } } - fn record_success(&self) { + fn record_success(&self, metrics: &Option) { + let prev_state = self.state(); self.failure_count.store(0, Ordering::Release); self.opened_at_ms.store(0, Ordering::Release); + let new_state = self.state(); + + // Only update metrics if state actually changed + if prev_state != new_state { + if let Some(m) = metrics { + m.set_cache_circuit_breaker_state(new_state as i64); + } + if prev_state != CircuitState::Closed { + tracing::info!("Redis circuit breaker closed after successful operation"); + } + } } - fn record_failure(&self) { + fn record_failure(&self, metrics: &Option) { + let prev_state = self.state(); let prev = self.failure_count.fetch_add(1, Ordering::AcqRel); if prev + 1 >= self.threshold && self.opened_at_ms.load(Ordering::Acquire) == 0 { let now_ms = std::time::SystemTime::now() @@ -80,15 +93,32 @@ impl CircuitBreaker { "Redis circuit breaker opened after {} failures", prev + 1 ); + + // Update metrics to reflect open state + if let Some(m) = metrics { + m.set_cache_circuit_breaker_state(CircuitState::Open as i64); + } } } /// Returns `true` if the call is allowed (Closed or HalfOpen). - fn allow(&self) -> bool { - match self.state() { + fn allow(&self, metrics: &Option) -> bool { + let prev_state = self.state(); + let allowed = match prev_state { CircuitState::Closed | CircuitState::HalfOpen => true, CircuitState::Open => false, + }; + + // Update metrics when transitioning to HalfOpen + let current_state = self.state(); + if prev_state != current_state && current_state == CircuitState::HalfOpen { + if let Some(m) = metrics { + m.set_cache_circuit_breaker_state(CircuitState::HalfOpen as i64); + } + tracing::info!("Redis circuit breaker transitioned to half-open, allowing probe request"); } + + allowed } } @@ -168,6 +198,7 @@ pub struct RedisCache { cb: Arc, cfg: RedisCacheConfig, tag_cfg: TagStoreConfig, + metrics: Option, } @@ -327,7 +358,19 @@ impl RedisCache { } pub async fn new_with_config(redis_url: &str, cfg: RedisCacheConfig) -> anyhow::Result { + Self::new_with_config_and_metrics(redis_url, cfg, None).await + } + pub async fn new_with_metrics(redis_url: &str, metrics: crate::metrics::Metrics) -> anyhow::Result { + let cfg = RedisCacheConfig::from_env(); + Self::new_with_config_and_metrics(redis_url, cfg, Some(metrics)).await + } + + pub async fn new_with_config_and_metrics( + redis_url: &str, + cfg: RedisCacheConfig, + metrics: Option, + ) -> anyhow::Result { let pool_cfg = PoolConfig::from_url(redis_url); let pool = pool_cfg .builder() @@ -339,7 +382,15 @@ impl RedisCache { let cb = Arc::new(CircuitBreaker::new(cfg.cb_threshold, cfg.cb_reset_timeout)); let tag_cfg = TagStoreConfig::from_env(); - Ok(Self { pool, cb, cfg, tag_cfg }) + + let cache = Self { pool, cb, cfg, tag_cfg, metrics: metrics.clone() }; + + // Initialize circuit breaker state metric to closed (0) + if let Some(ref m) = metrics { + m.set_cache_circuit_breaker_state(0); + } + + Ok(cache) } @@ -362,7 +413,7 @@ impl RedisCache { F: Fn(deadpool_redis::Connection) -> Fut, Fut: Future>, { - if !self.cb.allow() { + if !self.cb.allow(&self.metrics) { anyhow::bail!("Redis circuit breaker is open"); } @@ -375,16 +426,16 @@ impl RedisCache { match self.pool.get().await { Err(e) => { last_err = anyhow::anyhow!("pool acquire: {e}"); - self.cb.record_failure(); + self.cb.record_failure(&self.metrics); } Ok(conn) => match op(conn).await { Ok(v) => { - self.cb.record_success(); + self.cb.record_success(&self.metrics); return Ok(v); } Err(e) => { last_err = e; - self.cb.record_failure(); + self.cb.record_failure(&self.metrics); } }, } @@ -451,7 +502,7 @@ impl RedisCache { /// The circuit breaker is checked once before the loop; individual batch /// errors are propagated immediately. pub async fn del_by_pattern(&self, pattern: &str) -> anyhow::Result { - if !self.cb.allow() { + if !self.cb.allow(&self.metrics) { anyhow::bail!("Redis circuit breaker is open"); } @@ -515,7 +566,7 @@ impl RedisCache { Fut: Future>, { // If circuit is open, skip cache entirely and call fetcher directly. - if !self.cb.allow() { + if !self.cb.allow(&self.metrics) { tracing::warn!(key, "Redis unavailable, bypassing cache"); let value = fetcher().await?; return Ok((value, false)); @@ -621,7 +672,7 @@ impl RedisCache { /// Prefer `exec` for most use cases; use this only when you need to hold /// a connection across multiple commands (e.g. pipelined operations). pub async fn get_connection(&self) -> anyhow::Result { - if !self.cb.allow() { + if !self.cb.allow(&self.metrics) { anyhow::bail!("Redis circuit breaker is open"); } self.pool.get().await.context("failed to acquire Redis connection") diff --git a/services/api/src/metrics.rs b/services/api/src/metrics.rs index e9c8b542..ce076567 100644 --- a/services/api/src/metrics.rs +++ b/services/api/src/metrics.rs @@ -18,6 +18,7 @@ pub struct Metrics { db_pool_connections_idle: IntGaugeVec, db_pool_acquire_duration: HistogramVec, rate_limit_rejections: IntCounterVec, + cache_circuit_breaker_state: IntGaugeVec, } impl Metrics { @@ -120,6 +121,15 @@ impl Metrics { ) .context("rate_limit_rejections metric")?; + let cache_circuit_breaker_state = IntGaugeVec::new( + prometheus::Opts::new( + "cache_circuit_breaker_state", + "Redis cache circuit breaker state (0=closed, 1=open, 2=half-open)", + ), + &["state"], + ) + .context("cache_circuit_breaker_state metric")?; + registry.register(Box::new(cache_hits.clone()))?; registry.register(Box::new(cache_misses.clone()))?; registry.register(Box::new(invalidations.clone()))?; @@ -132,6 +142,7 @@ impl Metrics { registry.register(Box::new(db_pool_connections_idle.clone()))?; registry.register(Box::new(db_pool_acquire_duration.clone()))?; registry.register(Box::new(rate_limit_rejections.clone()))?; + registry.register(Box::new(cache_circuit_breaker_state.clone()))?; Ok(Self { registry, @@ -147,6 +158,7 @@ impl Metrics { db_pool_connections_idle, db_pool_acquire_duration, rate_limit_rejections, + cache_circuit_breaker_state, }) } @@ -226,6 +238,42 @@ impl Metrics { .inc(); } + /// Update the cache circuit breaker state gauge. + /// Call this whenever the circuit breaker transitions state. + /// state: 0=closed, 1=open, 2=half-open + pub fn set_cache_circuit_breaker_state(&self, state: i64) { + // Reset all states to 0 first + self.cache_circuit_breaker_state + .with_label_values(&["closed"]) + .set(0); + self.cache_circuit_breaker_state + .with_label_values(&["open"]) + .set(0); + self.cache_circuit_breaker_state + .with_label_values(&["half_open"]) + .set(0); + + // Set the current state to 1 + match state { + 0 => { + self.cache_circuit_breaker_state + .with_label_values(&["closed"]) + .set(1); + } + 1 => { + self.cache_circuit_breaker_state + .with_label_values(&["open"]) + .set(1); + } + 2 => { + self.cache_circuit_breaker_state + .with_label_values(&["half_open"]) + .set(1); + } + _ => {} + } + } + pub fn render(&self) -> anyhow::Result { let mut buffer = vec![]; let encoder = TextEncoder::new();