Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 28 additions & 2 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use futures::Stream;
use serde::{Deserialize, Serialize};
use tower_http::cors::CorsLayer;
use tower_http::trace::TraceLayer;
use chrono::Utc;

use crate::broadcast::Broadcaster;
use crate::clickhouse::ClickHouseEngine;
Expand Down Expand Up @@ -198,11 +199,17 @@ const GIT_REV: &str = if let Some(rev) = option_env!("GIT_REV") { rev } else { "
async fn handle_status(State(state): State<AppState>) -> Result<Json<StatusResponse>, ApiError> {
let mut all_chains = Vec::new();
let pools = state.pools.read().await;
for pool in pools.values() {
if let Ok(chains) = crate::service::get_all_status(pool).await {
for (chain_id, pool) in pools.iter() {
let chains = crate::service::get_all_status(pool)
.await
.map_err(|e| ApiError::QueryError(format!("Failed to load status for chain {chain_id}: {e}")))?;
if chains.is_empty() {
all_chains.push(empty_status(*chain_id));
} else {
all_chains.extend(chains);
}
}
all_chains.sort_by_key(|chain| chain.chain_id);

// Populate per-table store status for each chain
let ch_configs = state.clickhouse_configs.read().await;
Expand Down Expand Up @@ -244,6 +251,25 @@ async fn handle_status(State(state): State<AppState>) -> Result<Json<StatusRespo
}))
}

fn empty_status(chain_id: u64) -> SyncStatus {
SyncStatus {
chain_id: chain_id as i64,
head_num: 0,
synced_num: 0,
tip_num: 0,
lag: 0,
gap_blocks: 0,
gaps: Vec::new(),
backfill_num: None,
backfill_remaining: 0,
sync_rate: None,
eta_secs: None,
updated_at: Utc::now(),
postgres: None,
clickhouse: None,
}
}

#[derive(Deserialize)]
pub struct QueryParams {
/// SQL query (SELECT only)
Expand Down
75 changes: 75 additions & 0 deletions tests/status_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use axum::body::Body;
use axum::extract::connect_info::IntoMakeServiceWithConnectInfo;
use axum::http::{Request, StatusCode};
use axum::Router;
use deadpool_postgres::{Config as PgConfig, Runtime};
use tokio_postgres::NoTls;
use tower::Service;

use tidx::api;
Expand Down Expand Up @@ -102,6 +104,79 @@ async fn test_status_includes_postgres_watermarks() {
);
}

#[tokio::test]
#[serial(db)]
async fn test_status_includes_configured_chain_when_sync_state_is_empty() {
let db = TestDb::empty().await;
db.truncate_all().await;
let broadcaster = Arc::new(Broadcaster::new());
let (pools, chain_id) = make_pools(db.pool.clone());
let mut app = make_test_service(pools, chain_id, broadcaster).await;

let response = app
.call(
Request::builder()
.uri("/status")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();

assert_eq!(response.status(), StatusCode::OK);

let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();

let chains = json["chains"].as_array().expect("chains should be array");
assert_eq!(chains.len(), 1, "expected configured chain to be present");
assert_eq!(chains[0]["chain_id"], 1);
}

#[tokio::test]
async fn test_status_surfaces_query_failures() {
let broadcaster = Arc::new(Broadcaster::new());
let mut pools = HashMap::new();
let chain_id = 1u64;

let mut cfg = PgConfig::new();
cfg.url = Some("postgres://tidx:tidx@127.0.0.1:1/tidx?connect_timeout=1".to_string());
let broken_pool = cfg
.create_pool(Some(Runtime::Tokio1), NoTls)
.expect("failed to create broken pool");
pools.insert(chain_id, broken_pool);

let mut app = make_test_service(pools, chain_id, broadcaster).await;

let response = app
.call(
Request::builder()
.uri("/status")
.body(Body::empty())
.unwrap(),
)
.await
.unwrap();

assert_eq!(response.status(), StatusCode::UNPROCESSABLE_ENTITY);

let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();

assert_eq!(json["ok"], false);
assert!(
json["error"]
.as_str()
.unwrap_or_default()
.contains("Failed to load status for chain 1"),
"unexpected error body: {json:?}"
);
}

// ============================================================================
// CLI HTTP proxy: when a real HTTP server is running, status is fetched via HTTP
// ============================================================================
Expand Down
Loading