Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ Thumbs.db
# Seeds
*.dump
docker/postgres-pgduckdb/tidx-abi/target/
.worktrees/
36 changes: 36 additions & 0 deletions db/api_role.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
-- Create a read-only role for API query connections.
-- The API should connect as this role to provide defense-in-depth
-- against SQL injection, even if the query validator is bypassed.
--
-- Modeled after golden-axe's uapi role:
-- https://github.com/indexsupply/golden-axe/blob/master/be/src/sql/roles.sql
--
-- Idempotent: safe to re-run on every migration pass (the DO block skips
-- role creation when the role already exists; GRANT/REVOKE/ALTER are
-- naturally re-runnable).
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'tidx_api') THEN
-- NOTE(review): plaintext password matching the role name is only
-- acceptable for local/dev. Confirm production deployments rotate it
-- (ALTER ROLE tidx_api PASSWORD ...) from a secret store.
CREATE ROLE tidx_api WITH LOGIN PASSWORD 'tidx_api' NOSUPERUSER NOCREATEDB NOCREATEROLE;
END IF;
END $$;

-- Revoke all default privileges first (defense-in-depth)
-- NOTE(review): PostgreSQL grants EXECUTE on functions to PUBLIC by
-- default, and membership in PUBLIC cannot be revoked — so revoking
-- EXECUTE from tidx_api alone does not stop it from calling functions
-- that are still executable via PUBLIC. Verify the intended lockdown
-- (e.g. REVOKE ... FROM PUBLIC on these functions, or ALTER DEFAULT
-- PRIVILEGES) is handled elsewhere. Same caveat applies to CREATE on
-- the public schema for PostgreSQL < 15.
REVOKE ALL ON ALL TABLES IN SCHEMA public FROM tidx_api;
REVOKE EXECUTE ON ALL FUNCTIONS IN SCHEMA public FROM tidx_api;

-- Grant read-only access to indexed tables only
-- (No ALTER DEFAULT PRIVILEGES here, so tables created after this runs
-- are intentionally NOT visible to tidx_api until re-granted.)
GRANT USAGE ON SCHEMA public TO tidx_api;
GRANT SELECT ON blocks, txs, logs, receipts TO tidx_api;

-- Grant execute only on ABI decode helper functions
GRANT EXECUTE ON FUNCTION abi_uint(bytea) TO tidx_api;
GRANT EXECUTE ON FUNCTION abi_int(bytea) TO tidx_api;
GRANT EXECUTE ON FUNCTION abi_address(bytea) TO tidx_api;
GRANT EXECUTE ON FUNCTION abi_bool(bytea) TO tidx_api;
GRANT EXECUTE ON FUNCTION abi_bytes(bytea, int) TO tidx_api;
GRANT EXECUTE ON FUNCTION abi_string(bytea, int) TO tidx_api;
GRANT EXECUTE ON FUNCTION format_address(bytea) TO tidx_api;
GRANT EXECUTE ON FUNCTION format_uint(bytea) TO tidx_api;

-- Resource limits to prevent DoS even if validator is bypassed
ALTER ROLE tidx_api SET statement_timeout = '30s';
-- NOTE(review): work_mem is per sort/hash operation, not per connection —
-- a single query can use several multiples of it. With 64 allowed
-- connections, 256MB may be a large aggregate ceiling; confirm against
-- the host's memory budget.
ALTER ROLE tidx_api SET work_mem = '256MB';
ALTER ROLE tidx_api SET temp_file_limit = '512MB';
ALTER ROLE tidx_api CONNECTION LIMIT 64;
141 changes: 99 additions & 42 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,16 @@ async fn handle_query_live(
} else {
let catch_up_start = last_block_num + 1;
for block_num in catch_up_start..=end {
let filtered_sql = inject_block_filter(&sql, block_num);
let filtered_sql = match inject_block_filter(&sql, block_num) {
Ok(s) => s,
Err(e) => {
yield Ok(SseEvent::default()
.event("error")
.json_data(serde_json::json!({ "ok": false, "error": e.to_string() }))
.unwrap());
return;
}
};
match crate::service::execute_query_postgres(&pool, &filtered_sql, signature.as_deref(), &options).await {
Ok(result) => {
yield Ok(SseEvent::default()
Expand Down Expand Up @@ -516,50 +525,85 @@ async fn handle_query_live(
/// Inject a block number filter into SQL query for live streaming.
/// Transforms queries to only return data for the specific block.
/// Uses 'num' for blocks table, 'block_num' for txs/logs tables.
///
/// Uses sqlparser AST manipulation to safely add the filter condition,
/// avoiding SQL injection risks from string-based splicing.
#[doc(hidden)]
pub fn inject_block_filter(sql: &str, block_num: u64) -> String {
let sql_upper = sql.to_uppercase();

// Determine column name based on table being queried
let col = if sql_upper.contains("FROM BLOCKS") || sql_upper.contains("FROM \"BLOCKS\"") {
"num"
} else {
"block_num"
pub fn inject_block_filter(sql: &str, block_num: u64) -> Result<String, ApiError> {
use sqlparser::ast::{
BinaryOperator, Expr, Ident, SetExpr, Statement, Value,
};

// Find WHERE clause position
if let Some(where_pos) = sql_upper.find("WHERE") {
// Insert after WHERE
let insert_pos = where_pos + 5;
format!(
"{} {} = {} AND {}",
&sql[..insert_pos],
col,
block_num,
&sql[insert_pos..]
)
} else if let Some(order_pos) = sql_upper.find("ORDER BY") {
// Insert WHERE before ORDER BY
format!(
"{} WHERE {} = {} {}",
&sql[..order_pos],
col,
block_num,
&sql[order_pos..]
)
} else if let Some(limit_pos) = sql_upper.find("LIMIT") {
// Insert WHERE before LIMIT
format!(
"{} WHERE {} = {} {}",
&sql[..limit_pos],
col,
block_num,
&sql[limit_pos..]
)
} else {
// Append WHERE at end
format!("{sql} WHERE {col} = {block_num}")
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;

let dialect = GenericDialect {};
let mut statements = Parser::parse_sql(&dialect, sql)
.map_err(|e| ApiError::BadRequest(format!("SQL parse error: {e}")))?;

if statements.len() != 1 {
return Err(ApiError::BadRequest(
"Live mode requires exactly one SQL statement".to_string(),
));
}

let stmt = &mut statements[0];
let query = match stmt {
Statement::Query(q) => q,
_ => {
return Err(ApiError::BadRequest(
"Live mode requires a SELECT query".to_string(),
))
}
};

let select = match query.body.as_mut() {
SetExpr::Select(s) => s,
_ => {
return Err(ApiError::BadRequest(
"Live mode requires a simple SELECT query (UNION/INTERSECT not supported)"
.to_string(),
))
}
};

let table_name: String = select
.from
.first()
.and_then(|twj| match &twj.relation {
sqlparser::ast::TableFactor::Table { name, .. } => {
name.0.last().and_then(|part| part.as_ident()).map(|ident| ident.value.to_lowercase())
}
_ => None,
})
.ok_or_else(|| {
ApiError::BadRequest(
"Live mode requires a query with a FROM table clause".to_string(),
)
})?;

let col_name = if table_name == "blocks" { "num" } else { "block_num" };

let col_expr = Expr::CompoundIdentifier(vec![
Ident::new(&table_name),
Ident::new(col_name),
]);

let block_filter = Expr::BinaryOp {
left: Box::new(col_expr),
op: BinaryOperator::Eq,
right: Box::new(Expr::Value(Value::Number(block_num.to_string(), false).into())),
};

select.selection = Some(match select.selection.take() {
Some(existing) => Expr::BinaryOp {
left: Box::new(Expr::Nested(Box::new(existing))),
op: BinaryOperator::And,
right: Box::new(block_filter),
},
None => block_filter,
});

Ok(stmt.to_string())
}

/// Rewrite analytics table references to include chain-specific database prefix.
Expand Down Expand Up @@ -599,6 +643,19 @@ pub enum ApiError {
NotFound(String),
}

/// Human-readable form of an API error; `Timeout` carries a fixed message,
/// every other variant passes its payload string through unchanged.
impl std::fmt::Display for ApiError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text: &str = match self {
            ApiError::Timeout => "Query timeout",
            ApiError::BadRequest(msg)
            | ApiError::QueryError(msg)
            | ApiError::Internal(msg)
            | ApiError::Forbidden(msg)
            | ApiError::NotFound(msg) => msg,
        };
        f.write_str(text)
    }
}

impl IntoResponse for ApiError {
fn into_response(self) -> axum::response::Response {
let (status, message) = match self {
Expand Down
3 changes: 3 additions & 0 deletions src/db/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ pub async fn run_migrations(pool: &Pool) -> Result<()> {
// Load any optional extensions
conn.batch_execute(include_str!("../../db/extensions.sql")).await?;

// Create read-only API role with SELECT-only access to indexed tables
conn.batch_execute(include_str!("../../db/api_role.sql")).await?;

Ok(())
}

Expand Down
Loading