From 44954a43cabae55e345e5aaafdd6a34d9caaf972 Mon Sep 17 00:00:00 2001 From: OSEH-svg Date: Sat, 27 Jun 2026 13:28:32 +0100 Subject: [PATCH] feat(backend): : improve backend reliability, observability, and event processing --- backend/src/lib.rs | 2 + backend/src/main.rs | 378 +--------------------------------------- backend/src/server.rs | 2 +- backend/src/shutdown.rs | 54 +++--- 4 files changed, 26 insertions(+), 410 deletions(-) diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 85f1ab81..b5dbe2c0 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -21,7 +21,9 @@ pub mod routes; pub mod seed; pub mod server; pub mod session; +pub mod shutdown; pub mod telemetry; +pub mod tracing_context; pub mod worker; pub mod ws; diff --git a/backend/src/main.rs b/backend/src/main.rs index aeb70a1d..94ff502e 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -5,382 +5,8 @@ //! `predifi-seed`). This file only wires environment loading to //! [`predifi_backend::run_server`]. -pub mod config; -pub mod constants; -pub mod db; -pub mod errors; -pub mod jwt; -pub mod metrics; -pub mod openapi; -pub mod price_cache; -pub mod redis_cache; -pub mod referrals; -pub mod request_logger; -pub mod response; -pub mod routes; -pub mod server; -pub mod session; -pub mod shutdown; -pub mod telemetry; -pub mod tracing_context; -pub mod worker; -pub mod ws; - -use crate::config::Config; -use crate::metrics::Metrics; -use crate::request_logger::LoggingLayer; -use axum::response::IntoResponse; -use axum::routing::get; -use axum::Json; -use axum::Router; -use http::header::HeaderValue; -use sentry_tracing::layer as sentry_tracing_layer; -use serde_json::json; -use std::sync::Arc; -use std::time::Duration as TokioDuration; -use tokio::time::sleep; -#[cfg(not(test))] -use tower_governor::governor::GovernorConfigBuilder; -use tower_http::cors::{AllowOrigin, CorsLayer}; -use tracing::info; -use tracing_subscriber::prelude::*; -use tracing_subscriber::EnvFilter; - -/// Build the CORS middleware layer from the validated origin list in `config`. -/// -/// Only the origins listed in [`Config::cors_allowed_origins`] are permitted. -/// The list is validated at startup (see `config::parse_cors_origins`), so any -/// entry that reaches this function is already a well-formed `http://` or -/// `https://` origin. Entries that cannot be parsed into a [`HeaderValue`] are -/// silently skipped (this should never happen in practice given the prior -/// validation). -pub fn build_cors(config: &Config) -> CorsLayer { - let origins: Vec = config - .cors_allowed_origins - .iter() - .filter_map(|origin| origin.parse().ok()) - .collect(); - - CorsLayer::new() - .allow_origin(AllowOrigin::list(origins)) - .allow_methods([ - http::Method::GET, - http::Method::POST, - http::Method::PUT, - http::Method::DELETE, - http::Method::OPTIONS, - ]) - .allow_headers([ - http::header::CONTENT_TYPE, - http::header::AUTHORIZATION, - http::header::ACCEPT, - ]) -} - -use axum::extract::State; - -/// Check database health with a simple query. -async fn check_db_health(db: &Option) -> (String, String) { - if let Some(pool) = db { - match sqlx::query("SELECT 1").execute(pool).await { - Ok(_) => ("ok".to_string(), String::new()), - Err(e) => ("unreachable".to_string(), e.to_string()), - } - } else { - ("not_configured".to_string(), String::new()) - } -} - -/// Check RPC health with retry logic and exponential backoff. -async fn check_rpc_health(rpc_url: &str, timeout_secs: u64, retry_count: u8) -> (String, String) { - let client = reqwest::Client::builder() - .timeout(std::time::Duration::from_secs(timeout_secs)) - .build() - .unwrap_or_else(|_| reqwest::Client::new()); - - let max_attempts = retry_count as usize; - let mut last_error = String::new(); - - for attempt in 0..max_attempts { - let rpc_req = client - .post(rpc_url) - .json(&serde_json::json!({ - "jsonrpc": "2.0", - "id": 1, - "method": "getHealth" - })) - .send() - .await; - - match rpc_req { - Ok(res) if res.status().is_success() => { - return ("ok".to_string(), String::new()); - } - Ok(res) => { - last_error = format!("HTTP {} response", res.status()); - } - Err(e) => { - last_error = e.to_string(); - } - } - - // Exponential backoff: 2^(attempt) seconds, capped at 5 seconds - if attempt < max_attempts - 1 { - let backoff_duration = std::cmp::min(2u64.pow(attempt as u32), 5); - sleep(TokioDuration::from_secs(backoff_duration)).await; - } - } - - ("unreachable".to_string(), last_error) -} - -/// Health-check handler. -async fn health(State(state): State) -> axum::response::Response { - use axum::http::StatusCode; - - let mut all_healthy = true; - let (db_status, db_error) = check_db_health(&state.db).await; - if db_status == "unreachable" { - all_healthy = false; - } - - let (rpc_status, _rpc_error) = check_rpc_health( - &state.config.stellar_rpc_url, - state.config.rpc_health_timeout_secs, - state.config.rpc_health_retry_count, - ) - .await; - if rpc_status == "unreachable" { - all_healthy = false; - } - - /// Check Redis health and availability. - async fn check_redis_health(redis: &redis_cache::RedisCache) -> (String, String) { - if !redis.is_available() { - return ("not_configured".to_string(), String::new()); - } - if !redis.ping().await { - return ("unreachable".to_string(), "Redis ping failed".to_string()); - } - ("ok".to_string(), String::new()) - } - - /// Check price cache health. - fn check_price_cache_health(cache: &price_cache::PriceCache) -> (String, String) { - if cache.snapshot().is_empty() { - return ("not_ready".to_string(), "price cache is empty".to_string()); - } - ("ok".to_string(), String::new()) - } - - let (redis_status, redis_error) = check_redis_health(&state.redis).await; - if redis_status == "unreachable" || redis_status == "not_configured" { - all_healthy = false; - } - - let (price_cache_status, _price_cache_error) = check_price_cache_health(&state.cache); - if price_cache_status == "not_ready" { - all_healthy = false; - } - - let body = json!({ - "status": if all_healthy { "ok" } else { "error" }, - "service": "predifi-backend", - "version": env!("CARGO_PKG_VERSION"), - "dependencies": { - "db": db_status, - "rpc": rpc_status, - "redis": redis_status, - "price_cache": price_cache_status - }, - "errors": { - "db": if db_status == "unreachable" { Some(db_error.clone()) } else { None }, - "rpc": if rpc_status == "unreachable" { Some("rpc unreachable".to_string()) } else { None }, - "redis": if redis_status == "unreachable" { Some(redis_error.clone()) } else { None }, - "price_cache": if price_cache_status == "not_ready" { Some("price cache is empty".to_string()) } else { None } - } - }); - - if all_healthy { - (StatusCode::OK, Json(body)).into_response() - } else { - (StatusCode::SERVICE_UNAVAILABLE, Json(body)).into_response() - } -} - -/// Root handler — returns a welcome message. -async fn root() -> Json { - Json(json!({ - "message": "Welcome to the PrediFi backend", - "api": "/api/v1" - })) -} - -/// Metrics endpoint exposed to Prometheus. -async fn metrics(State(state): State) -> impl IntoResponse { - match state.metrics.gather_text() { - Ok(body) => ( - axum::http::StatusCode::OK, - [(http::header::CONTENT_TYPE, "text/plain; version=0.0.4")], - body, - ), - Err(error) => ( - axum::http::StatusCode::INTERNAL_SERVER_ERROR, - [(http::header::CONTENT_TYPE, "text/plain; charset=utf-8")], - format!("failed to gather metrics: {error}"), - ), - } -} - -// async fn metrics_middleware( -// State(metrics): State, -// request: axum::http::Request, -// next: Next, -// ) -> axum::response::Response { -// let method = request.method().to_string(); -// let path = request.uri().path().to_string(); -// -// let response = next.run(request).await; -// let status = response.status().as_u16().to_string(); -// -// metrics -// .http_requests_total -// .with_label_values(&[&method, &path, &status]) -// .inc(); -// -// response -// } - -/// Build the Axum router with CORS, logging, and rate limiting middleware. -pub fn build_router( - config: Config, - cache: price_cache::PriceCache, - redis: redis_cache::RedisCache, - event_bus: ws::EventBus, -) -> Router { - let prometheus_metrics = Arc::new(Metrics::new().unwrap_or_else(|error| { - eprintln!("failed to initialize Prometheus metrics: {error}"); - std::process::exit(1); - })); - - let state = routes::v1::AppState { - config: Arc::new(config.clone()), - cache: cache.clone(), - redis: redis.clone(), - db: None, - metrics: prometheus_metrics.clone(), - event_bus: event_bus.clone(), - }; - - let router = Router::new() - .route("/", get(root)) - .route("/health", get(health)) - .route("/metrics", get(metrics)) - .with_state(state) - .nest( - "/api", - routes::router( - Arc::new(config.clone()), - cache, - redis, - None, - prometheus_metrics.clone(), - event_bus, - ), - ) - .merge(openapi::swagger_router()) - .layer(build_cors(&config)) - .layer(LoggingLayer); - - #[cfg(not(test))] - let router = { - let governor_conf = Arc::new( - GovernorConfigBuilder::default() - .per_second(5) - .burst_size(50) - .error_handler(|_| { - ( - axum::http::StatusCode::TOO_MANY_REQUESTS, - "Too Many Requests", - ) - .into_response() - }) - .finish() - .unwrap(), - ); - router.layer(tower_governor::GovernorLayer { - config: governor_conf, - }) - }; - - router -} - -/// Build the Axum router with a live database pool. -pub fn build_router_with_db( - config: Config, - cache: price_cache::PriceCache, - redis: redis_cache::RedisCache, - pool: sqlx::PgPool, - event_bus: ws::EventBus, -) -> Router { - let prometheus_metrics = Arc::new(Metrics::new().unwrap_or_else(|error| { - eprintln!("failed to initialize Prometheus metrics: {error}"); - std::process::exit(1); - })); - - let state = routes::v1::AppState { - config: Arc::new(config.clone()), - cache: cache.clone(), - redis: redis.clone(), - db: Some(pool.clone()), - metrics: prometheus_metrics.clone(), - event_bus: event_bus.clone(), - }; - - #[cfg(not(test))] - let governor_conf = Arc::new( - GovernorConfigBuilder::default() - .per_second(5) - .burst_size(50) - .error_handler(|_| { - ( - axum::http::StatusCode::TOO_MANY_REQUESTS, - "Too Many Requests", - ) - .into_response() - }) - .finish() - .unwrap(), - ); - - let router = Router::new() - .route("/", get(root)) - .route("/health", get(health)) - .route("/metrics", get(metrics)) - .with_state(state) - .nest( - "/api", - routes::router_with_db( - Arc::new(config.clone()), - cache, - redis, - pool, - prometheus_metrics.clone(), - event_bus, - ), - ) - .merge(openapi::swagger_router()) - .layer(build_cors(&config)) - .layer(LoggingLayer); - - #[cfg(not(test))] - let router = router.layer(tower_governor::GovernorLayer { - config: governor_conf, - }); - - router -} +use predifi_backend::config::Config; +use predifi_backend::run_server; #[tokio::main] async fn main() { diff --git a/backend/src/server.rs b/backend/src/server.rs index 409f0c46..e2dc437c 100644 --- a/backend/src/server.rs +++ b/backend/src/server.rs @@ -498,7 +498,7 @@ where warn!("Redis cache unavailable - running without caching"); } - let app = build_router_with_db(config.clone(), cache, redis, pool, event_bus); + let app = build_router_with_db(config.clone(), cache, redis, pool.clone(), event_bus); let bind_addr = config.bind_address(); diff --git a/backend/src/shutdown.rs b/backend/src/shutdown.rs index 1a43a1c3..dbc730ee 100644 --- a/backend/src/shutdown.rs +++ b/backend/src/shutdown.rs @@ -52,26 +52,18 @@ pub async fn wait_for_signal() { // resolves, leaving the surviving handlers in charge of triggering the // `select!`. No early-return paths are taken because a single failed // installation must never prevent the other signals from working. - let ctrl_c_arm = match tokio::signal::ctrl_c() { - Ok(future) => Some(future), - Err(error) => { - warn!(error = %error, "failed to install Ctrl+C handler; relying on SIGTERM/SIGHUP"); - None - } - }; - - let terminate_arm = match tokio::signal::unix::signal( + let mut terminate_signal = match tokio::signal::unix::signal( tokio::signal::unix::SignalKind::terminate(), ) { - Ok(mut signal) => Some(signal.recv()), + Ok(signal) => Some(signal), Err(error) => { warn!(error = %error, "failed to install SIGTERM handler; skipping"); None } }; - let hangup_arm = match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup()) { - Ok(mut signal) => Some(signal.recv()), + let mut hangup_signal = match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup()) { + Ok(signal) => Some(signal), Err(error) => { warn!(error = %error, "failed to install SIGHUP handler; skipping"); None @@ -79,20 +71,16 @@ pub async fn wait_for_signal() { }; let ctrl_c_block = async { - match ctrl_c_arm { - Some(future) => { - if future.await.is_ok() { - info!("received Ctrl+C, beginning graceful shutdown"); - } - } - None => std::future::pending::<()>().await, + match tokio::signal::ctrl_c().await { + Ok(()) => info!("received Ctrl+C, beginning graceful shutdown"), + Err(error) => warn!(error = %error, "Ctrl+C handler failed; relying on SIGTERM/SIGHUP"), } }; let terminate_block = async { - match terminate_arm { - Some(future) => { - future.await; + match terminate_signal.as_mut() { + Some(signal) => { + signal.recv().await; info!("received SIGTERM, beginning graceful shutdown"); } None => std::future::pending::<()>().await, @@ -100,9 +88,9 @@ pub async fn wait_for_signal() { }; let hangup_block = async { - match hangup_arm { - Some(future) => { - future.await; + match hangup_signal.as_mut() { + Some(signal) => { + signal.recv().await; info!("received SIGHUP, beginning graceful shutdown"); } None => std::future::pending::<()>().await, @@ -188,23 +176,23 @@ mod tests { /// `with_shutdown_timeout` does not panic and does not block forever when /// the future would take longer than the deadline. - #[tokio::test(start_paused = true)] + #[tokio::test] async fn shutdown_timeout_returns_after_deadline_when_future_is_slow() { - // tokio::time::sleep respects the paused clock, so a 10-second sleep - // completes instantly from the test's perspective while still - // exceeding the 1-second deadline we hand to `with_shutdown_timeout`. + // The inner future sleeps for 2 s; the timeout deadline is 100 ms. + // The helper must return after ~100 ms, well before the 2-second sleep + // would complete. let start = tokio::time::Instant::now(); - with_shutdown_timeout(Duration::from_secs(1), "slow-unit", async { - tokio::time::sleep(Duration::from_secs(10)).await; + with_shutdown_timeout(Duration::from_millis(100), "slow-unit", async { + tokio::time::sleep(Duration::from_secs(2)).await; }) .await; let elapsed = start.elapsed(); assert!( - elapsed >= Duration::from_secs(1), + elapsed >= Duration::from_millis(100), "helper should have waited at least the deadline (got {elapsed:?})" ); assert!( - elapsed < Duration::from_secs(10), + elapsed < Duration::from_secs(2), "helper should not have waited for the full future (got {elapsed:?})" ); }