Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ PREDIFI_DB_ACQUIRE_TIMEOUT_SECS=30
PREDIFI_DB_CONNECT_MAX_ATTEMPTS=5
PREDIFI_DB_CONNECT_BASE_DELAY_MS=200
PREDIFI_DB_CONNECT_MAX_DELAY_MS=5000
# Database connect timeout in seconds (issue #1174).
# Bounds how long pool creation waits for the database connection to be established
# (TCP/TLS handshake included); enforced by a timeout wrapped around the pool's connect call.
# Distinct from PREDIFI_DB_ACQUIRE_TIMEOUT_SECS which limits waiting for a pool slot.
PREDIFI_DB_CONNECT_TIMEOUT_SECS=10

# Redis (for caching hot data - issue #714)
PREDIFI_REDIS_URL=redis://localhost:6379
Expand Down
15 changes: 15 additions & 0 deletions backend/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const DEFAULT_DB_ACQUIRE_TIMEOUT_SECS: u64 = 30;
const DEFAULT_DB_CONNECT_MAX_ATTEMPTS: u32 = 5;
const DEFAULT_DB_CONNECT_BASE_DELAY_MS: u64 = 200;
const DEFAULT_DB_CONNECT_MAX_DELAY_MS: u64 = 5_000;
const DEFAULT_DB_CONNECT_TIMEOUT_SECS: u64 = 10;
const DEFAULT_RPC_HEALTH_TIMEOUT_SECS: u64 = 2;
const DEFAULT_RPC_HEALTH_RETRY_COUNT: u8 = 3;
const DEFAULT_RPC_TIMEOUT_SECS: u64 = 30;
Expand Down Expand Up @@ -55,6 +56,13 @@ pub struct Config {
pub db_connect_base_delay_ms: u64,
/// Maximum backoff delay in ms between startup connection attempts (default `5000`).
pub db_connect_max_delay_ms: u64,
/// Per-connection TCP/TLS handshake timeout in seconds for each individual
/// connection the pool creates (default `10`). This is distinct from
/// `db_acquire_timeout_secs`, which governs how long a caller waits for a
/// slot in an already-open pool; this field controls how long sqlx waits
/// for the underlying TCP connect + TLS handshake to complete when
/// establishing a brand-new connection.
pub db_connect_timeout_secs: u64,
/// Per-attempt timeout in seconds for the Stellar RPC health check (default `2`).
pub rpc_health_timeout_secs: u64,
/// Number of times to retry the Stellar RPC health check before reporting failure (default `3`).
Expand Down Expand Up @@ -136,6 +144,11 @@ impl Config {
"PREDIFI_DB_CONNECT_MAX_DELAY_MS",
DEFAULT_DB_CONNECT_MAX_DELAY_MS,
)?;
let db_connect_timeout_secs = get_u64(
vars,
"PREDIFI_DB_CONNECT_TIMEOUT_SECS",
DEFAULT_DB_CONNECT_TIMEOUT_SECS,
)?;
let rpc_health_timeout_secs = get_u64(
vars,
"PREDIFI_RPC_HEALTH_TIMEOUT_SECS",
Expand Down Expand Up @@ -194,6 +207,7 @@ impl Config {
db_connect_max_attempts,
db_connect_base_delay_ms,
db_connect_max_delay_ms,
db_connect_timeout_secs,
rpc_health_timeout_secs,
rpc_health_retry_count,
rpc_timeout_secs,
Expand Down Expand Up @@ -333,6 +347,7 @@ impl Config {
db_connect_max_attempts: 1,
db_connect_base_delay_ms: 0,
db_connect_max_delay_ms: 0,
db_connect_timeout_secs: 5,
rpc_health_timeout_secs: 2,
rpc_health_retry_count: 3,
rpc_timeout_secs: 30,
Expand Down
82 changes: 80 additions & 2 deletions backend/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,17 @@ pub async fn create_pool(config: &Config) -> Result<PgPool, sqlx::Error> {
.acquire_timeout(Duration::from_secs(config.db_acquire_timeout_secs))
.connect(&config.database_url);

match tokio::time::timeout(Duration::from_secs(config.db_acquire_timeout_secs), future)
.await
// `db_connect_timeout_secs` bounds the time spent on the initial TCP/TLS
// handshake for each individual connection attempt. This is distinct from
// `acquire_timeout`, which limits how long a caller waits for a slot in an
// already-healthy pool. sqlx 0.8 does not expose a separate
// `connect_timeout` on `PgPoolOptions`, so we wrap the connect future in a
// `tokio::time::timeout` to achieve the same effect.
match tokio::time::timeout(
Duration::from_secs(config.db_connect_timeout_secs),
future,
)
.await
{
Ok(result) => result,
Err(_) => Err(sqlx::Error::PoolTimedOut),
Expand Down Expand Up @@ -290,6 +299,75 @@ mod tests {
assert_eq!(backoff_delay_ms(3, 200, 5_000), 800);
assert_eq!(backoff_delay_ms(10, 200, 5_000), 5_000);
}

#[test]
fn backoff_delay_with_zero_base_is_zero() {
    // With a base delay of 0 the computed delay is expected to be 0 for
    // every attempt number checked here.
    for attempt in [1, 5] {
        assert_eq!(backoff_delay_ms(attempt, 0, 5_000), 0);
    }
}

#[test]
fn backoff_delay_saturates_at_max() {
    // Each row is (attempt, base_delay_ms, max_delay_ms, expected_delay_ms);
    // for large attempt counts the result is expected to equal the cap.
    let cases = [(30, 100, 1_000, 1_000), (64, 1, 500, 500)];
    for (attempt, base_delay_ms, max_delay_ms, expected) in cases {
        assert_eq!(backoff_delay_ms(attempt, base_delay_ms, max_delay_ms), expected);
    }
}

/// Checks that calling `retry_with_backoff` with `max_attempts = 0` still
/// runs the operation exactly once and surfaces its error, instead of
/// skipping the operation altogether.
#[tokio::test]
async fn retry_with_backoff_treats_zero_max_as_one() {
    // Shared counter so the test can observe how often the operation ran.
    let attempts = Arc::new(AtomicU32::new(0));
    let counter = Arc::clone(&attempts);

    let outcome: Result<(), &'static str> = retry_with_backoff(0, 0, 0, "test", || async {
        counter.fetch_add(1, Ordering::SeqCst);
        Err("fail")
    })
    .await;

    // The failure is returned as-is after a single invocation.
    assert_eq!(outcome, Err("fail"));
    assert_eq!(attempts.load(Ordering::SeqCst), 1);
}

/// Checks the fast path of `retry_with_backoff`: when the operation succeeds
/// on its first invocation, the success value is returned and the operation
/// is not invoked again.
#[tokio::test]
async fn retry_with_backoff_succeeds_on_first_attempt() {
    // Shared counter so the test can observe how often the operation ran.
    let attempts = Arc::new(AtomicU32::new(0));
    let counter = Arc::clone(&attempts);

    let outcome: Result<&'static str, &'static str> =
        retry_with_backoff(5, 0, 0, "test", || async {
            counter.fetch_add(1, Ordering::SeqCst);
            Ok("immediate")
        })
        .await;

    assert_eq!(outcome, Ok("immediate"));
    // Exactly one invocation: a success must not trigger further attempts.
    assert_eq!(attempts.load(Ordering::SeqCst), 1);
}

/// Smoke-checks that the test [`Config`] exposes `db_connect_timeout_secs`
/// and `db_acquire_timeout_secs` as two separately readable fields and that
/// both hold a positive value. The two values may be equal; this test only
/// asserts that each one is non-zero.
#[test]
fn config_connect_timeout_is_independent_from_acquire_timeout() {
    let cfg = crate::config::Config::default_for_test();

    // Read each timeout from its own field.
    let connect_secs = cfg.db_connect_timeout_secs;
    let acquire_secs = cfg.db_acquire_timeout_secs;

    assert!(connect_secs > 0, "connect timeout must be > 0");
    assert!(acquire_secs > 0, "acquire timeout must be > 0");
}
}

/// A single row returned by the active pools query.
Expand Down
175 changes: 127 additions & 48 deletions backend/src/redis_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,8 @@ pub fn user_predictions_cache_key(address: &str, limit: i64, offset: i64) -> Str
mod tests {
use super::*;

// ── Key-generation tests ──────────────────────────────────────────────────

#[test]
fn test_cache_key_generation() {
assert_eq!(
Expand All @@ -333,6 +335,9 @@ mod tests {
);
}

// ── Disabled-cache / cache-miss tests ────────────────────────────────────

/// A disabled cache must never claim to be available.
#[tokio::test]
async fn test_disabled_cache() {
let cache = RedisCache::disabled();
Expand All @@ -346,68 +351,142 @@ mod tests {
cache.delete("test").await;
}

/// GET on any key of a disabled cache is a cache miss — returns `None`.
#[tokio::test]
async fn test_cache_aside_pattern_flow() {
// Simulates the cache-aside pattern: check cache, on miss load from DB, then cache result
async fn cache_miss_on_disabled_cache_returns_none_for_any_key() {
let cache = RedisCache::disabled();
let cache_key = "pools:new:all:active:20:0";

// Step 1: Check cache (should be empty)
let cached: Option<serde_json::Value> = cache.get(cache_key).await;
assert!(cached.is_none());

// Step 2: Simulate DB load and cache result
let test_data = serde_json::json!({
"pools": [],
"total": 0,
"limit": 20,
"offset": 0
});
cache.set(cache_key, &test_data, POOLS_CACHE_TTL).await;

// Step 3: Verify we got None (disabled cache no-ops)
let cached: Option<serde_json::Value> = cache.get(cache_key).await;
assert!(cached.is_none());

let result: Option<String> = cache.get("pools:popular:all:active:10:0").await;
assert!(
result.is_none(),
"disabled cache must always miss on GET (pools key)"
);

let result: Option<serde_json::Value> = cache.get("pool:42:details").await;
assert!(
result.is_none(),
"disabled cache must always miss on GET (pool-details key)"
);

let result: Option<Vec<u32>> = cache.get("user:GABC:predictions:10:0").await;
assert!(
result.is_none(),
"disabled cache must always miss on GET (user predictions key)"
);
}

/// Calling `set` followed by `get` on a disabled cache still returns `None`
/// because no backing store exists.
#[tokio::test]
async fn test_simulate_available_cache() {
let cache = RedisCache::simulate_available();
assert!(cache.is_available());
assert!(cache.ping().await);
async fn cache_miss_after_set_on_disabled_cache() {
let cache = RedisCache::disabled();

// Operations still no-op since there's no actual Redis connection
cache.set("test", &"value", 60).await;
let result: Option<String> = cache.get("test").await;
assert!(result.is_none());
cache.set("some-key", &42u32, 60).await;
let result: Option<u32> = cache.get("some-key").await;
assert!(
result.is_none(),
"GET after SET on a disabled cache must still be a cache miss"
);
}

/// Deleting from a disabled cache, by single key or by pattern, must
/// complete without panicking.
#[tokio::test]
async fn cache_miss_delete_on_disabled_cache_is_noop() {
    let disabled = RedisCache::disabled();

    // The test passes as long as neither call panics.
    disabled.delete("nonexistent-key").await;
    disabled.delete_pattern("pools:*").await;
}

/// A cache built with `RedisCache::disabled()` must report itself as
/// unavailable, so callers can use `is_available()` as a guard.
#[test]
fn disabled_cache_is_not_available() {
    let disabled = RedisCache::disabled();
    let available = disabled.is_available();
    assert!(
        !available,
        "disabled cache must report is_available() == false"
    );
}

/// Pinging a disabled cache must report failure (`false`).
#[tokio::test]
async fn disabled_cache_ping_returns_false() {
    let disabled = RedisCache::disabled();
    let reachable = disabled.ping().await;
    assert!(!reachable, "ping on a disabled cache must return false");
}

/// Cache misses must not be confused with empty-collection hits.
/// `GET` for a key that was never set must return `None`, not `Some(vec![])`.
#[tokio::test]
async fn test_cache_key_uniqueness() {
// Different parameters should produce different keys
let key1 = pools_cache_key("new", None, "active", 20, 0);
let key2 = pools_cache_key("popular", None, "active", 20, 0);
let key3 = pools_cache_key("new", Some("sports"), "active", 20, 0);
let key4 = pools_cache_key("new", None, "closed", 20, 0);
let key5 = pools_cache_key("new", None, "active", 10, 0);
let key6 = pools_cache_key("new", None, "active", 20, 10);

assert_ne!(key1, key2);
assert_ne!(key1, key3);
assert_ne!(key1, key4);
assert_ne!(key1, key5);
assert_ne!(key1, key6);
async fn cache_miss_is_none_not_empty_collection() {
let cache = RedisCache::disabled();
let result: Option<Vec<String>> = cache.get("pools:new:all:active:10:0").await;
assert!(
result.is_none(),
"a cache miss must be None, not Some(empty collection)"
);
}

/// Cache miss on a key with a previously-set TTL (simulated via disabled
/// cache) must also be `None`.
#[tokio::test]
async fn test_cache_pattern_invalidation() {
// Test pattern matching for cache invalidation
async fn cache_miss_after_ttl_simulated_via_disabled_cache() {
let cache = RedisCache::disabled();

// Operations that would invalidate all pool caches
cache.invalidate_pools_cache().await;
// Simulate: set with 1s TTL, then read back immediately (disabled = always miss)
cache.set("ttl-key", &"data", 1).await;
let result: Option<String> = cache.get("ttl-key").await;
assert!(
result.is_none(),
"disabled cache never stores data so always misses even within TTL"
);
}

// ── Cache-key uniqueness tests ────────────────────────────────────────────

// Verify pattern constant
assert_eq!(POOLS_CACHE_PATTERN, "pools:*");
/// Two pool-list requests that differ only in their offset must map to
/// different cache keys, so each page is cached separately.
#[test]
fn pools_cache_keys_differ_by_offset() {
    // Everything except the offset is held constant.
    let key_at = |offset| pools_cache_key("new", None, "active", 10, offset);

    assert_ne!(
        key_at(0),
        key_at(10),
        "different offsets must produce different cache keys"
    );
}

/// Pool-details cache keys must be distinct for distinct pool IDs.
#[test]
fn pool_details_keys_differ_by_id() {
    assert_ne!(
        pool_details_cache_key(1),
        pool_details_cache_key(2),
        "different pool IDs must produce different keys"
    );
}

/// User-prediction cache keys must be distinct for distinct addresses when
/// the pagination parameters are the same.
#[test]
fn user_predictions_keys_differ_by_address() {
    // Limit and offset are held constant; only the address varies.
    let key_for = |address| user_predictions_cache_key(address, 10, 0);

    assert_ne!(
        key_for("ADDR_A"),
        key_for("ADDR_B"),
        "different user addresses must produce different keys"
    );
}

/// Repeated calls with identical parameters must always yield the same
/// pools cache key.
#[test]
fn cache_key_generation_is_deterministic() {
    let expected = "pools:popular:crypto:active:20:40";

    // Generate the key several times and compare each result to the fixed string.
    (0..5).for_each(|_| {
        assert_eq!(
            pools_cache_key("popular", Some("crypto"), "active", 20, 40),
            expected
        );
    });
}
}
Loading