Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions services/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,17 @@ base64 = "0.22"
subtle = "2.5"
ipnet = "2"

[features]
# Gate tests that require a live Redis instance (testcontainers or external).
# Run with: cargo test --features redis-integration
redis-integration = []

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports"] }
testcontainers = { version = "0.23", features = ["tokio"] }
testcontainers-modules = { version = "0.11", features = ["redis", "tokio"] }
testcontainers = "0.23"
testcontainers-modules = { version = "0.11", features = ["redis"] }
axum = { version = "0.7", features = ["macros"] }
serde_json = "1"

[[bench]]
name = "api_key_auth"
Expand Down
48 changes: 42 additions & 6 deletions services/api/src/blockchain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,20 @@ impl BlockchainClient {
return Ok(cursor_ledger);
}

// A gap of more than one ledger means the worker was behind or events
// were skipped. Emit a metric and a warning so alerts can fire.
let gap = confirmed_tip.saturating_sub(cursor_ledger + 1);
if gap > 0 {
tracing::warn!(
cursor_ledger,
confirmed_tip,
gap,
network = %self.network,
"ledger gap detected during blockchain sync"
);
self.metrics.observe_ledger_gap(&self.network, gap);
}

let events = self.fetch_events_since(cursor_ledger + 1).await?;
for event in events {
let event_key = format!("{}:event:{}", keys::CHAIN_PREFIX, event.id);
Expand Down Expand Up @@ -1027,12 +1041,34 @@ impl BlockchainClient {
Ok(progress)
}

/// Spawn both background workers and return their handles.
/// Each worker holds a child cancellation token and reports completion
/// to the coordinator when it exits.
pub fn start_background_tasks(self: Arc<Self>, coordinator: &ShutdownCoordinator) -> Vec<WorkerHandle> {
let sync_token = coordinator.token();
let sync_coord = coordinator.clone();
/// Test-only constructor that accepts an externally built HTTP client so
/// tests can configure short timeouts and point at a local mock RPC server.
#[cfg(test)]
pub(crate) fn new_for_test(
rpc_url: String,
cache: RedisCache,
metrics: Metrics,
http: Client,
retry_attempts: u32,
) -> Self {
Self {
http,
rpc_url,
network: "testnet".to_string(),
contract_id: "test-contract".to_string(),
retry_attempts,
retry_base_delay_ms: 10,
event_poll_interval: Duration::from_millis(50),
tx_poll_interval: Duration::from_millis(50),
confirmation_ledger_lag: 1,
sync_market_ids: vec![],
cache,
metrics,
monitor: Arc::new(MonitoringState::default()),
}
}

pub fn start_background_tasks(self: Arc<Self>) {
let sync_client = self.clone();
let sync_handle = tokio::spawn(async move {
sync_client.run_sync_worker(sync_token, sync_coord).await;
Expand Down
73 changes: 15 additions & 58 deletions services/api/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@ pub struct Metrics {
rpc_errors: IntCounterVec,
rpc_fallbacks: IntCounterVec,
db_timeouts: IntCounterVec,
email_dlq_size: IntGauge,
db_pool_connections_active: IntGaugeVec,
db_pool_connections_idle: IntGaugeVec,
db_pool_acquire_duration: HistogramVec,
rate_limit_rejections: IntCounterVec,
ledger_gaps: IntCounterVec,
}

impl Metrics {
Expand Down Expand Up @@ -75,50 +71,14 @@ impl Metrics {
)
.context("db_timeouts metric")?;

let email_dlq_size = IntGauge::new(
"email_dlq_size",
"Number of email jobs currently in the dead-letter queue",
)
.context("email_dlq_size metric")?;

let db_pool_connections_active = IntGaugeVec::new(
let ledger_gaps = IntCounterVec::new(
prometheus::Opts::new(
"db_pool_connections_active",
"Number of connections currently checked out from the pool",
"blockchain_ledger_gaps_total",
"Ledger gap events detected during blockchain sync, labelled by network",
),
&["pool"],
&["network"],
)
.context("db_pool_connections_active metric")?;

let db_pool_connections_idle = IntGaugeVec::new(
prometheus::Opts::new(
"db_pool_connections_idle",
"Number of idle connections sitting in the pool",
),
&["pool"],
)
.context("db_pool_connections_idle metric")?;

let db_pool_acquire_duration = HistogramVec::new(
prometheus::HistogramOpts::new(
"db_pool_acquire_duration_seconds",
"Time spent waiting to acquire a connection from the pool",
)
.buckets(vec![
0.0001, 0.0005, 0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0,
]),
&["pool"],
)
.context("db_pool_acquire_duration metric")?;

let rate_limit_rejections = IntCounterVec::new(
prometheus::Opts::new(
"rate_limit_rejections_total",
"Requests rejected by the rate limiter, by route",
),
&["route"],
)
.context("rate_limit_rejections metric")?;
.context("ledger_gaps metric")?;

registry.register(Box::new(cache_hits.clone()))?;
registry.register(Box::new(cache_misses.clone()))?;
Expand All @@ -127,11 +87,7 @@ impl Metrics {
registry.register(Box::new(rpc_errors.clone()))?;
registry.register(Box::new(rpc_fallbacks.clone()))?;
registry.register(Box::new(db_timeouts.clone()))?;
registry.register(Box::new(email_dlq_size.clone()))?;
registry.register(Box::new(db_pool_connections_active.clone()))?;
registry.register(Box::new(db_pool_connections_idle.clone()))?;
registry.register(Box::new(db_pool_acquire_duration.clone()))?;
registry.register(Box::new(rate_limit_rejections.clone()))?;
registry.register(Box::new(ledger_gaps.clone()))?;

Ok(Self {
registry,
Expand All @@ -142,11 +98,7 @@ impl Metrics {
rpc_errors,
rpc_fallbacks,
db_timeouts,
email_dlq_size,
db_pool_connections_active,
db_pool_connections_idle,
db_pool_acquire_duration,
rate_limit_rejections,
ledger_gaps,
})
}

Expand Down Expand Up @@ -187,8 +139,13 @@ impl Metrics {
self.db_timeouts.with_label_values(&[operation]).inc();
}

pub fn set_dlq_size(&self, n: i64) {
self.email_dlq_size.set(n);
/// Record a ledger-gap event on `network`, incrementing the counter by `gap_size` ledgers.
pub fn observe_ledger_gap(&self, network: &str, gap_size: u32) {
if gap_size > 0 {
self.ledger_gaps
.with_label_values(&[network])
.inc_by(u64::from(gap_size));
}
}

pub fn observe_tx_eviction(&self, count: u64) {
Expand Down
137 changes: 110 additions & 27 deletions services/api/src/tracing_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,36 @@ use opentelemetry_sdk::{
use opentelemetry_semantic_conventions::resource::{SERVICE_NAME, SERVICE_VERSION};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};

/// Resolve the trace sampling rate from OTel standard env vars, falling back to `default_rate`.
///
/// Reads `OTEL_TRACES_SAMPLER` and `OTEL_TRACES_SAMPLER_ARG` per the OpenTelemetry
/// environment-variable specification. The default production rate is **10 %** (0.1).
///
/// | `OTEL_TRACES_SAMPLER` | Effect |
/// |-------------------------------------|-------------------------------------|
/// | `always_on` | Sample 100 % |
/// | `always_off` | Sample 0 % |
/// | `traceidratio` *(default)* | Use `OTEL_TRACES_SAMPLER_ARG` ratio |
/// | `parentbased_always_on` | Sample 100 % |
/// | `parentbased_always_off` | Sample 0 % |
/// | `parentbased_traceidratio` | Use `OTEL_TRACES_SAMPLER_ARG` ratio |
pub fn sample_rate_from_env(default_rate: f64) -> f64 {
let sampler = std::env::var("OTEL_TRACES_SAMPLER")
.unwrap_or_else(|_| "traceidratio".to_string());

match sampler.trim() {
"always_on" | "parentbased_always_on" => 1.0,
"always_off" | "parentbased_always_off" => 0.0,
"traceidratio" | "parentbased_traceidratio" => {
std::env::var("OTEL_TRACES_SAMPLER_ARG")
.ok()
.and_then(|v| v.trim().parse::<f64>().ok())
.filter(|r| (0.0..=1.0).contains(r))
.unwrap_or(default_rate)
/// Validates that `raw` is a float in the closed interval [0.0, 1.0].
/// Returns `Err` with a human-readable reason when the value is malformed or out of range.
pub(crate) fn validate_sampler_arg(raw: &str) -> Result<f64, String> {
match raw.trim().parse::<f64>() {
Ok(rate) if (0.0..=1.0).contains(&rate) => Ok(rate),
Ok(rate) => Err(format!("value {rate} is out of range [0.0, 1.0]")),
Err(_) => Err(format!("cannot parse {raw:?} as a float")),
}
}

/// Reads `OTEL_TRACES_SAMPLER_ARG` from the environment and validates it.
/// If the variable is absent, returns `fallback` silently.
/// If the variable is present but invalid, emits `tracing::warn!` and returns `fallback`.
pub(crate) fn resolve_sampler_rate(fallback: f64) -> f64 {
let raw = match std::env::var("OTEL_TRACES_SAMPLER_ARG") {
Ok(v) => v,
Err(_) => return fallback,
};

match validate_sampler_arg(&raw) {
Ok(rate) => rate,
Err(reason) => {
tracing::warn!(
invalid_value = %raw,
fallback_rate = fallback,
reason = %reason,
"OTEL_TRACES_SAMPLER_ARG is invalid; using fallback sample rate"
);
fallback
}
_ => default_rate,
}
}

Expand All @@ -61,6 +63,9 @@ pub fn init_tracing(
KeyValue::new("deployment.environment", std::env::var("ENVIRONMENT").unwrap_or_else(|_| "development".to_string())),
]);

// OTEL_TRACES_SAMPLER_ARG overrides the configured rate when present and valid.
let sample_rate = resolve_sampler_rate(sample_rate);

// Configure sampler based on sample rate
let sampler = if sample_rate >= 1.0 {
Sampler::AlwaysOn
Expand Down Expand Up @@ -213,4 +218,82 @@ mod tests {
assert!(result.is_ok());
shutdown_tracing();
}

// ── validate_sampler_arg ──────────────────────────────────────────────────

#[test]
fn sampler_arg_valid_mid_range() {
assert_eq!(validate_sampler_arg("0.5").unwrap(), 0.5);
}

#[test]
fn sampler_arg_valid_lower_boundary() {
assert_eq!(validate_sampler_arg("0.0").unwrap(), 0.0);
}

#[test]
fn sampler_arg_valid_upper_boundary() {
assert_eq!(validate_sampler_arg("1.0").unwrap(), 1.0);
}

#[test]
fn sampler_arg_rejects_non_float() {
let err = validate_sampler_arg("abc").unwrap_err();
assert!(err.contains("abc"), "error message should quote the invalid value: {err}");
}

#[test]
fn sampler_arg_rejects_out_of_range_high() {
// A value of 1.5 is a valid float but outside [0.0, 1.0].
// The warning IS emitted for this case (Err path → warn! at callsite).
let err = validate_sampler_arg("1.5").unwrap_err();
assert!(err.contains("out of range"), "error should describe range: {err}");
}

#[test]
fn sampler_arg_rejects_out_of_range_low() {
let err = validate_sampler_arg("-0.1").unwrap_err();
assert!(err.contains("out of range"), "error should describe range: {err}");
}

#[test]
fn sampler_arg_rejects_empty_string() {
assert!(validate_sampler_arg("").is_err());
}

#[test]
fn sampler_arg_rejects_whitespace_only() {
assert!(validate_sampler_arg(" ").is_err());
}

#[test]
fn resolve_sampler_rate_returns_fallback_when_env_absent() {
std::env::remove_var("OTEL_TRACES_SAMPLER_ARG");
assert_eq!(resolve_sampler_rate(0.3), 0.3);
}

#[test]
fn resolve_sampler_rate_uses_env_when_valid() {
std::env::set_var("OTEL_TRACES_SAMPLER_ARG", "0.7");
let rate = resolve_sampler_rate(0.1);
std::env::remove_var("OTEL_TRACES_SAMPLER_ARG");
assert_eq!(rate, 0.7);
}

#[test]
fn resolve_sampler_rate_falls_back_on_invalid_env() {
std::env::set_var("OTEL_TRACES_SAMPLER_ARG", "not-a-number");
let rate = resolve_sampler_rate(0.2);
std::env::remove_var("OTEL_TRACES_SAMPLER_ARG");
// Invalid value → fallback; the warning IS emitted (logged via tracing::warn!)
assert_eq!(rate, 0.2);
}

#[test]
fn resolve_sampler_rate_falls_back_on_out_of_range_env() {
std::env::set_var("OTEL_TRACES_SAMPLER_ARG", "2.0");
let rate = resolve_sampler_rate(0.5);
std::env::remove_var("OTEL_TRACES_SAMPLER_ARG");
assert_eq!(rate, 0.5);
}
}
Loading
Loading