Skip to content

Commit 3fe065f

Browse files
committed
fix: merge signature CTEs with user WITH clauses
1 parent 2452451 commit 3fe065f

6 files changed

Lines changed: 190 additions & 11 deletions

File tree

src/api/views.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -178,10 +178,12 @@ pub async fn create_view(
178178
)));
179179
}
180180

181-
// Validate SQL is SELECT only
182-
let sql_upper = req.sql.trim().to_uppercase();
183-
if !sql_upper.starts_with("SELECT") {
184-
return Err(ApiError::BadRequest("SQL must be a SELECT statement".to_string()));
181+
// Validate SQL is query-only (SELECT or WITH ... SELECT)
182+
let sql_upper = req.sql.trim_start().to_uppercase();
183+
if !sql_upper.starts_with("SELECT") && !sql_upper.starts_with("WITH") {
184+
return Err(ApiError::BadRequest(
185+
"SQL must be a SELECT statement (CTEs with WITH are allowed)".to_string(),
186+
));
185187
}
186188

187189
// Parse signature if provided
@@ -209,8 +211,8 @@ pub async fn create_view(
209211
let sql = if let Some(ref sig) = signature {
210212
let sql = sig.normalize_table_references(&req.sql);
211213
let sql = sig.rewrite_filters_for_pushdown(&sql);
212-
let cte = sig.to_cte_sql_clickhouse();
213-
format!("WITH {} {}", cte, sql)
214+
let ctes = vec![sig.to_cte_sql_clickhouse()];
215+
crate::query::merge_ctes_into_query(&sql, &ctes)
214216
} else {
215217
req.sql.clone()
216218
};

src/clickhouse.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ impl ClickHouseEngine {
103103
}
104104

105105
let ctes: Vec<String> = sigs.iter().map(|sig| sig.to_cte_sql_clickhouse()).collect();
106-
format!("WITH {} {sql}", ctes.join(", "))
106+
crate::query::merge_ctes_into_query(&sql, &ctes)
107107
} else {
108108
sql.to_string()
109109
};

src/query/mod.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,76 @@ pub fn convert_hex_literals_postgres(sql: &str) -> String {
2424
.replace_all(sql, r"'\x$1'")
2525
.into_owned()
2626
}
27+
28+
/// Merge generated CTE definitions into a user query.
29+
///
30+
/// If query already starts with a WITH clause, generated CTEs are prepended
31+
/// to the existing CTE list. Otherwise, a new WITH clause is added.
32+
pub fn merge_ctes_into_query(sql: &str, generated_ctes: &[String]) -> String {
33+
if generated_ctes.is_empty() {
34+
return sql.to_string();
35+
}
36+
37+
let ctes = generated_ctes.join(", ");
38+
let trimmed = sql.trim_start();
39+
let leading_ws = &sql[..sql.len() - trimmed.len()];
40+
41+
if starts_with_keyword(trimmed, "WITH RECURSIVE") {
42+
let rest = trimmed["WITH RECURSIVE".len()..].trim_start();
43+
return format!("{leading_ws}WITH RECURSIVE {ctes}, {rest}");
44+
}
45+
46+
if starts_with_keyword(trimmed, "WITH") {
47+
let rest = trimmed["WITH".len()..].trim_start();
48+
return format!("{leading_ws}WITH {ctes}, {rest}");
49+
}
50+
51+
format!("{leading_ws}WITH {ctes} {trimmed}")
52+
}
53+
54+
fn starts_with_keyword(input: &str, keyword: &str) -> bool {
55+
input.len() >= keyword.len()
56+
&& input[..keyword.len()].eq_ignore_ascii_case(keyword)
57+
&& input[keyword.len()..]
58+
.chars()
59+
.next()
60+
.is_none_or(|c| c.is_ascii_whitespace())
61+
}
62+
63+
#[cfg(test)]
64+
mod tests {
65+
use super::merge_ctes_into_query;
66+
67+
#[test]
68+
fn test_merge_ctes_into_plain_select() {
69+
let merged = merge_ctes_into_query(
70+
"SELECT n FROM numbers",
71+
&["Transfer AS (SELECT 1)".to_string()],
72+
);
73+
assert_eq!(merged, "WITH Transfer AS (SELECT 1) SELECT n FROM numbers");
74+
}
75+
76+
#[test]
77+
fn test_merge_ctes_into_existing_with() {
78+
let merged = merge_ctes_into_query(
79+
"WITH numbers AS (SELECT 1 AS n) SELECT n FROM numbers",
80+
&["Transfer AS (SELECT 1)".to_string()],
81+
);
82+
assert_eq!(
83+
merged,
84+
"WITH Transfer AS (SELECT 1), numbers AS (SELECT 1 AS n) SELECT n FROM numbers"
85+
);
86+
}
87+
88+
#[test]
89+
fn test_merge_ctes_into_existing_with_recursive() {
90+
let merged = merge_ctes_into_query(
91+
"WITH RECURSIVE r AS (SELECT 1) SELECT * FROM r",
92+
&["Transfer AS (SELECT 1)".to_string()],
93+
);
94+
assert_eq!(
95+
merged,
96+
"WITH RECURSIVE Transfer AS (SELECT 1), r AS (SELECT 1) SELECT * FROM r"
97+
);
98+
}
99+
}

src/service/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ pub async fn execute_query_postgres(
174174
.iter()
175175
.map(|sig| sig.to_cte_sql_postgres_filtered(filter))
176176
.collect();
177-
format!("WITH {} {sql}", ctes.join(", "))
177+
crate::query::merge_ctes_into_query(&sql, &ctes)
178178
} else {
179179
sql.to_string()
180180
};
@@ -502,4 +502,3 @@ mod tests {
502502
}
503503

504504
}
505-

tests/api_live_test.rs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,73 @@ async fn test_query_with_signature_cte() {
235235
}
236236
}
237237

238+
#[tokio::test]
239+
#[serial(db)]
240+
async fn test_query_with_user_cte_succeeds_without_signature() {
241+
let db = TestDb::empty().await;
242+
let broadcaster = Arc::new(Broadcaster::new());
243+
let (pools, chain_id) = make_pools(db.pool.clone());
244+
let mut app = make_test_service(pools, chain_id, broadcaster).await;
245+
246+
let response = app
247+
.call(
248+
Request::builder()
249+
.method("GET")
250+
.uri("/query?sql=WITH%20numbers%20AS%20%28SELECT%201%20AS%20n%29%20SELECT%20n%20FROM%20numbers&chainId=1")
251+
.body(Body::empty())
252+
.unwrap(),
253+
)
254+
.await
255+
.unwrap();
256+
257+
assert_eq!(response.status(), StatusCode::OK);
258+
259+
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
260+
.await
261+
.unwrap();
262+
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
263+
264+
assert_eq!(json["ok"], true);
265+
assert_eq!(json["columns"], serde_json::json!(["n"]));
266+
assert_eq!(json["row_count"], 1);
267+
}
268+
269+
#[tokio::test]
270+
#[serial(db)]
271+
async fn test_query_with_signature_and_user_cte_succeeds() {
272+
let db = TestDb::empty().await;
273+
let broadcaster = Arc::new(Broadcaster::new());
274+
let (pools, chain_id) = make_pools(db.pool.clone());
275+
let mut app = make_test_service(pools, chain_id, broadcaster).await;
276+
277+
let sig = "Transfer(address%20indexed%20from%2Caddress%20indexed%20to%2Cuint256%20value)";
278+
let uri = format!(
279+
"/query?sql=WITH%20numbers%20AS%20%28SELECT%201%20AS%20n%29%20SELECT%20n%20FROM%20numbers&chainId=1&signature={sig}"
280+
);
281+
282+
let response = app
283+
.call(
284+
Request::builder()
285+
.method("GET")
286+
.uri(&uri)
287+
.body(Body::empty())
288+
.unwrap(),
289+
)
290+
.await
291+
.unwrap();
292+
293+
assert_eq!(response.status(), StatusCode::OK);
294+
295+
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
296+
.await
297+
.unwrap();
298+
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
299+
300+
assert_eq!(json["ok"], true);
301+
assert_eq!(json["columns"], serde_json::json!(["n"]));
302+
assert_eq!(json["row_count"], 1);
303+
}
304+
238305
#[tokio::test]
239306
#[serial(db)]
240307
async fn test_query_rejects_non_select() {

tests/smoke_test.rs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -931,6 +931,46 @@ async fn test_query_logs_with_event_signature() {
931931
assert!(result.columns.contains(&"value".to_string()));
932932
}
933933

934+
#[tokio::test]
935+
#[serial(db)]
936+
async fn test_query_with_user_cte_succeeds_without_signature() {
937+
let db = TestDb::empty().await;
938+
let opts = default_options();
939+
940+
let result = execute_query_postgres(
941+
&db.pool,
942+
"WITH numbers AS (SELECT 1 AS n) SELECT n FROM numbers",
943+
&[],
944+
&opts,
945+
)
946+
.await
947+
.expect("Query with user CTE failed");
948+
949+
assert_eq!(result.engine.as_deref(), Some("postgres"));
950+
assert_eq!(result.row_count, 1);
951+
assert_eq!(result.columns, vec!["n".to_string()]);
952+
}
953+
954+
#[tokio::test]
955+
#[serial(db)]
956+
async fn test_query_with_signature_and_user_cte_succeeds() {
957+
let db = TestDb::empty().await;
958+
let opts = default_options();
959+
960+
let result = execute_query_postgres(
961+
&db.pool,
962+
"WITH numbers AS (SELECT 1 AS n) SELECT n FROM numbers",
963+
&["Transfer(address indexed from, address indexed to, uint256 value)"],
964+
&opts,
965+
)
966+
.await
967+
.expect("Query with signature + user CTE should succeed");
968+
969+
assert_eq!(result.engine.as_deref(), Some("postgres"));
970+
assert_eq!(result.row_count, 1);
971+
assert_eq!(result.columns, vec!["n".to_string()]);
972+
}
973+
934974
#[tokio::test]
935975
#[serial(db)]
936976
async fn test_query_receipts() {
@@ -1284,5 +1324,3 @@ async fn test_query_daily_stats_pattern() {
12841324
assert!(result.columns.contains(&"day".to_string()));
12851325
assert!(result.columns.contains(&"transfer_count".to_string()));
12861326
}
1287-
1288-

0 commit comments

Comments
 (0)