Skip to content

Commit aa4e357

Browse files
committed
Update axum and related
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent cdc57d7 commit aa4e357

File tree

25 files changed

+250
-221
lines changed

25 files changed

+250
-221
lines changed

Cargo.lock

Lines changed: 157 additions & 132 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

deny.toml

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ skip = [
8888

8989
{ name = "hyper-timeout", version = "0.4.1" },
9090

91-
{ name = "tungstenite", version = "0.24.0" },
92-
{ name = "tokio-tungstenite", version = "0.24.0" },
91+
{ name = "tungstenite", version = "0.27.0" },
92+
{ name = "tokio-tungstenite", version = "0.27.0" },
9393

9494
# `axum 0.7.5` depends on both `sync_wrapper 1.*` and `axum-core 0.4.3`.
9595
# The latter depends on `sync_wrapper 0.1.*`.
@@ -168,8 +168,11 @@ skip = [
168168
{ name = "phf_shared", version = "0.11.3" },
169169
{ name = "phf_generator", version = "0.11.2" },
170170
{ name = "phf_codegen", version = "0.11.3" },
171-
# multer
172-
{ name = "spin", version = "0.9.8" },
171+
# axum
172+
{ name = "matchit", version = "0.7.0" },
173+
# console-subscriber
174+
{ name = "axum", version = "0.7.9" },
175+
{ name = "axum-core", version = "0.4.5" },
173176
]
174177

175178
[[bans.deny]]

src/balancerd/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ workspace = true
1212
[dependencies]
1313
anyhow = "1.0.100"
1414
async-trait = "0.1.89"
15-
axum = "0.7.5"
15+
axum = "0.8.6"
1616
bytes = "1.10.1"
1717
bytesize = "2.1.0"
1818
chrono = { version = "0.4.39", default-features = false, features = ["std"] }

src/clusterd/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ workspace = true
1111

1212
[dependencies]
1313
anyhow = "1.0.100"
14-
axum = "0.7.5"
14+
axum = "0.8.6"
1515
clap = { version = "4.5.23", features = ["derive", "env"] }
1616
fail = { version = "0.5.1", features = ["failpoints"] }
1717
futures = "0.3.31"

src/environmentd/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ workspace = true
1515
anyhow = "1.0.100"
1616
askama = { version = "0.12.1", default-features = false, features = ["config", "serde-json"] }
1717
async-trait = "0.1.89"
18-
axum = { version = "0.7.5", features = ["ws"] }
19-
axum-extra = { version = "0.9.3", features = ["typed-header"] }
18+
axum = { version = "0.8.6", features = ["ws"] }
19+
axum-extra = { version = "0.10.3", features = ["typed-header"] }
2020
base64 = "0.22.1"
2121
bytes = "1.10.1"
2222
bytesize = "2.1.0"
@@ -124,7 +124,7 @@ tracing = "0.1.37"
124124
tracing-capture = { version = "0.1.0", optional = true }
125125
tracing-opentelemetry = { version = "0.25.0" }
126126
tracing-subscriber = "0.3.19"
127-
tungstenite = { version = "0.24.0" }
127+
tungstenite = { version = "0.28.0" }
128128
url = "2.3.1"
129129
uuid = "1.18.1"
130130
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }

src/environmentd/src/http.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use std::sync::Arc;
2525
use std::time::{Duration, SystemTime};
2626

2727
use anyhow::Context;
28-
use async_trait::async_trait;
2928
use axum::error_handling::HandleErrorLayer;
3029
use axum::extract::ws::{Message, WebSocket};
3130
use axum::extract::{ConnectInfo, DefaultBodyLimit, FromRequestParts, Query, Request, State};
@@ -207,7 +206,7 @@ impl HttpServer {
207206
"/hierarchical-memory",
208207
routing::get(memory::handle_hierarchical_memory),
209208
)
210-
.route("/static/*path", routing::get(root::handle_static));
209+
.route("/static/{*path}", routing::get(root::handle_static));
211210

212211
let mut ws_router = Router::new()
213212
.route("/api/experimental/sql", routing::get(sql::handle_sql_ws))
@@ -230,7 +229,7 @@ impl HttpServer {
230229
if routes_enabled.webhook {
231230
let webhook_router = Router::new()
232231
.route(
233-
"/api/webhook/:database/:schema/:id",
232+
"/api/webhook/{:database}/{:schema}/{:id}",
234233
routing::post(webhook::handle_webhook),
235234
)
236235
.with_state(WebhookState {
@@ -315,7 +314,7 @@ impl HttpServer {
315314
routing::get(|| async { Redirect::temporary("/internal-console/") }),
316315
)
317316
.route(
318-
"/internal-console/*path",
317+
"/internal-console/{*path}",
319318
routing::get(console::handle_internal_console),
320319
)
321320
.route(
@@ -606,7 +605,6 @@ impl AuthedClient {
606605
}
607606
}
608607

609-
#[async_trait]
610608
impl<S> FromRequestParts<S> for AuthedClient
611609
where
612610
S: Send + Sync,

src/environmentd/src/http/sql.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
// the Business Source License, use of this software will be governed
88
// by the Apache License, Version 2.0.
99

10-
use std::borrow::Cow;
1110
use std::collections::BTreeMap;
1211
use std::net::{IpAddr, SocketAddr};
1312
use std::pin::pin;
@@ -354,7 +353,7 @@ async fn run_ws(state: &WsState, user: Option<AuthedUser>, peer_addr: IpAddr, mu
354353
// are safe to return because they're generated after authentication.
355354
debug!("WS request failed init: {}", e);
356355
let reason = match e.downcast_ref::<AdapterError>() {
357-
Some(error) => Cow::Owned(error.to_string()),
356+
Some(error) => error.to_string().into(),
358357
None => "unauthorized".into(),
359358
};
360359
let _ = ws
@@ -385,7 +384,7 @@ async fn run_ws(state: &WsState, user: Option<AuthedUser>, peer_addr: IpAddr, mu
385384
));
386385
for msg in msgs {
387386
let _ = ws
388-
.send(Message::Text(
387+
.send(Message::text(
389388
serde_json::to_string(&msg).expect("must serialize"),
390389
))
391390
.await;
@@ -491,7 +490,7 @@ async fn run_ws_request(
491490
/// Sends a single [`WebSocketResponse`] over the provided [`WebSocket`].
492491
async fn send_ws_response(ws: &mut WebSocket, resp: WebSocketResponse) -> Result<(), Error> {
493492
let msg = serde_json::to_string(&resp).unwrap();
494-
let msg = Message::Text(msg);
493+
let msg = Message::text(msg);
495494
ws.send(msg).await?;
496495

497496
Ok(())
@@ -1104,7 +1103,7 @@ impl ResultSender for WebSocket {
11041103
tick.tick().await;
11051104
loop {
11061105
tick.tick().await;
1107-
if let Err(err) = self.send(Message::Ping(Vec::new())).await {
1106+
if let Err(err) = self.send(Message::Ping(Default::default())).await {
11081107
return err.into();
11091108
}
11101109
}

src/environmentd/src/test_util.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1503,7 +1503,7 @@ pub fn auth_with_ws(
15031503
}
15041504
auth_with_ws_impl(
15051505
ws,
1506-
Message::Text(
1506+
Message::text(
15071507
serde_json::to_string(&WebSocketAuth::Basic {
15081508
user: "materialize".into(),
15091509
password: "".into(),

src/environmentd/tests/auth.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -355,11 +355,11 @@ async fn run_tests<'a>(header: &str, server: &test_util::TestServer, tests: &[Te
355355
let stream = make_ws_tls(&uri, configure);
356356
let (mut ws, _resp) = tungstenite::client(uri, stream).unwrap();
357357

358-
ws.send(Message::Text(serde_json::to_string(&auth).unwrap()))
358+
ws.send(Message::text(serde_json::to_string(&auth).unwrap()))
359359
.unwrap();
360360

361-
ws.send(Message::Text(
362-
r#"{"query": "SELECT pg_catalog.current_user()"}"#.into(),
361+
ws.send(Message::text(
362+
r#"{"query": "SELECT pg_catalog.current_user()"}"#,
363363
))
364364
.unwrap();
365365

@@ -3482,7 +3482,7 @@ async fn test_password_auth_http() {
34823482
.unwrap();
34833483

34843484
let query = r#"{"query":"SELECT current_user"}"#;
3485-
let ws_options_msg = Message::Text(r#"{"options": {}}"#.to_owned());
3485+
let ws_options_msg = Message::text(r#"{"options": {}}"#);
34863486

34873487
let http_client = hyper_util::client::legacy::Client::builder(TokioExecutor::new())
34883488
.pool_idle_timeout(Duration::from_secs(10))
@@ -3511,7 +3511,7 @@ async fn test_password_auth_http() {
35113511
ws.read().unwrap(),
35123512
Message::Close(Some(CloseFrame {
35133513
code: CloseCode::Protocol,
3514-
reason: Cow::Borrowed("unauthorized"),
3514+
reason: "unauthorized".into(),
35153515
})),
35163516
);
35173517

@@ -3577,7 +3577,7 @@ async fn test_password_auth_http() {
35773577
let msg: WebSocketResponse = serde_json::from_str(&msg).unwrap();
35783578
match msg {
35793579
WebSocketResponse::ReadyForQuery(_) => {
3580-
ws.send(Message::Text(query.to_owned())).unwrap();
3580+
ws.send(Message::text(query)).unwrap();
35813581
}
35823582
WebSocketResponse::Row(rows) => {
35833583
assert_eq!(&rows, &[serde_json::Value::from("mz_system".to_owned())]);

src/environmentd/tests/server.rs

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -914,8 +914,8 @@ fn test_http_sql() {
914914

915915
f.run(|tc| {
916916
let msg = match tc.directive.as_str() {
917-
"ws-text" => Message::Text(tc.input.clone()),
918-
"ws-binary" => Message::Binary(tc.input.as_bytes().to_vec()),
917+
"ws-text" => Message::text(&tc.input),
918+
"ws-binary" => Message::Binary(tc.input.clone().into()),
919919
"http" => {
920920
let json: serde_json::Value = serde_json::from_str(&tc.input).unwrap();
921921
let res = Client::new()
@@ -938,12 +938,12 @@ fn test_http_sql() {
938938
loop {
939939
let resp = ws.read().unwrap();
940940
match resp {
941-
Message::Text(mut msg) => {
942-
if fixtimestamp {
943-
msg = fixtimestamp_re
944-
.replace_all(&msg, fixtimestamp_replace)
945-
.into();
946-
}
941+
Message::Text(msg) => {
942+
let msg = if fixtimestamp {
943+
fixtimestamp_re.replace_all(&msg, fixtimestamp_replace)
944+
} else {
945+
msg.as_str().into()
946+
};
947947
let msg: WebSocketResponse = serde_json::from_str(&msg).unwrap();
948948
write!(&mut responses, "{}\n", serde_json::to_string(&msg).unwrap())
949949
.unwrap();
@@ -1742,7 +1742,7 @@ fn test_max_request_size() {
17421742
let json =
17431743
format!("{{\"queries\":[{{\"query\":\"{statement}\",\"params\":[\"{param}\"]}}]}}");
17441744
let json: serde_json::Value = serde_json::from_str(&json).unwrap();
1745-
ws.send(Message::Text(json.to_string())).unwrap();
1745+
ws.send(Message::text(json.to_string())).unwrap();
17461746

17471747
// The specific error isn't forwarded to the client, the connection is just closed.
17481748
let err = ws.read().unwrap_err();
@@ -1819,7 +1819,7 @@ fn test_max_statement_batch_size() {
18191819
test_util::auth_with_ws(&mut ws, BTreeMap::default()).unwrap();
18201820
let json = format!("{{\"query\":\"{statements}\"}}");
18211821
let json: serde_json::Value = serde_json::from_str(&json).unwrap();
1822-
ws.send(Message::Text(json.to_string())).unwrap();
1822+
ws.send(Message::text(json.to_string())).unwrap();
18231823

18241824
// Discard the CommandStarting message
18251825
let _ = ws.read().unwrap();
@@ -1875,7 +1875,7 @@ fn test_ws_passes_options() {
18751875
// set from the options map we passed with the auth.
18761876
let json = "{\"query\":\"SHOW application_name;\"}";
18771877
let json: serde_json::Value = serde_json::from_str(json).unwrap();
1878-
ws.send(Message::Text(json.to_string())).unwrap();
1878+
ws.send(Message::text(json.to_string())).unwrap();
18791879

18801880
let mut read_msg = || -> WebSocketResponse {
18811881
let msg = ws.read().unwrap();
@@ -1930,7 +1930,7 @@ fn test_ws_subscribe_no_crash() {
19301930
let query = "SUBSCRIBE (SELECT 1)";
19311931
let json = format!("{{\"query\":\"{query}\"}}");
19321932
let json: serde_json::Value = serde_json::from_str(&json).unwrap();
1933-
ws.send(Message::Text(json.to_string())).unwrap();
1933+
ws.send(Message::text(json.to_string())).unwrap();
19341934

19351935
// Give the server time to crash, if it's going to.
19361936
std::thread::sleep(Duration::from_secs(1))
@@ -2143,7 +2143,7 @@ fn test_max_connections_on_all_interfaces() {
21432143
test_util::auth_with_ws(&mut ws, BTreeMap::default()).unwrap();
21442144
let json = format!("{{\"query\":\"{query}\"}}");
21452145
let json: serde_json::Value = serde_json::from_str(&json).unwrap();
2146-
ws.send(Message::Text(json.to_string())).unwrap();
2146+
ws.send(Message::text(json.to_string())).unwrap();
21472147

21482148
// The specific error isn't forwarded to the client, the connection is just closed.
21492149
match ws.read() {
@@ -2154,13 +2154,13 @@ fn test_max_connections_on_all_interfaces() {
21542154
);
21552155
assert_eq!(
21562156
ws.read().unwrap(),
2157-
Message::Text(format!(
2157+
Message::text(format!(
21582158
r#"{{"type":"Rows","payload":{{"columns":[{{"name":"{UNKNOWN_COLUMN_NAME}","type_oid":23,"type_len":4,"type_mod":-1}}]}}}}"#
21592159
))
21602160
);
21612161
assert_eq!(
21622162
ws.read().unwrap(),
2163-
Message::Text("{\"type\":\"Row\",\"payload\":[\"1\"]}".to_string())
2163+
Message::text("{\"type\":\"Row\",\"payload\":[\"1\"]}")
21642164
);
21652165
tracing::info!("data: {:?}", ws.read().unwrap());
21662166
}
@@ -2593,15 +2593,15 @@ fn test_internal_ws_auth() {
25932593
// Auth with OptionsOnly
25942594
test_util::auth_with_ws_impl(
25952595
&mut ws,
2596-
Message::Text(serde_json::to_string(&WebSocketAuth::OptionsOnly { options }).unwrap()),
2596+
Message::text(serde_json::to_string(&WebSocketAuth::OptionsOnly { options }).unwrap()),
25972597
)
25982598
.unwrap();
25992599

26002600
// Query to make sure we get back the correct user, which should be
26012601
// set from the headers passed with the websocket request.
26022602
let json = "{\"query\":\"SELECT current_user;\"}";
26032603
let json: serde_json::Value = serde_json::from_str(json).unwrap();
2604-
ws.send(Message::Text(json.to_string())).unwrap();
2604+
ws.send(Message::text(json.to_string())).unwrap();
26052605

26062606
let mut read_msg = || -> WebSocketResponse {
26072607
let msg = ws.read().unwrap();
@@ -2768,7 +2768,7 @@ fn test_cancel_ws() {
27682768
test_util::auth_with_ws(&mut ws, BTreeMap::default()).unwrap();
27692769
let json = r#"{"queries":[{"query":"SUBSCRIBE t"}]}"#;
27702770
let json: serde_json::Value = serde_json::from_str(json).unwrap();
2771-
ws.send(Message::Text(json.to_string())).unwrap();
2771+
ws.send(Message::text(json.to_string())).unwrap();
27722772

27732773
loop {
27742774
let msg = ws.read().unwrap();
@@ -2861,7 +2861,10 @@ async fn smoketest_webhook_source() {
28612861
assert_eq!(total_requests_metric.get_counter().get_value(), 100.0);
28622862

28632863
let path_label = &total_requests_metric.get_label()[0];
2864-
assert_eq!(path_label.value(), "/api/webhook/:database/:schema/:id");
2864+
assert_eq!(
2865+
path_label.value(),
2866+
"/api/webhook/{:database}/{:schema}/{:id}"
2867+
);
28652868

28662869
let status_label = &total_requests_metric.get_label()[2];
28672870
assert_eq!(status_label.value(), "200");
@@ -3057,10 +3060,10 @@ fn test_github_20262() {
30573060

30583061
let (mut ws, _resp) = tungstenite::connect(server.ws_addr()).unwrap();
30593062
test_util::auth_with_ws(&mut ws, BTreeMap::default()).unwrap();
3060-
ws.send(Message::Text(subscribe)).unwrap();
3063+
ws.send(Message::text(subscribe)).unwrap();
30613064
cancel();
3062-
ws.send(Message::Text(commit)).unwrap();
3063-
ws.send(Message::Text(select)).unwrap();
3065+
ws.send(Message::text(commit)).unwrap();
3066+
ws.send(Message::text(select)).unwrap();
30643067

30653068
let mut expect = VecDeque::from([
30663069
r#"{"type":"CommandStarting","payload":{"has_rows":true,"is_streaming":true}}"#.to_string(),
@@ -4353,7 +4356,7 @@ async fn test_double_encoded_json() {
43534356

43544357
let json = "{\"query\":\"SELECT a FROM t1 ORDER BY a;\"}";
43554358
let json: serde_json::Value = serde_json::from_str(json).unwrap();
4356-
ws.send(Message::Text(json.to_string())).unwrap();
4359+
ws.send(Message::text(json.to_string())).unwrap();
43574360

43584361
let mut read_msg = || -> WebSocketResponse {
43594362
let msg = ws.read().unwrap();

0 commit comments

Comments
 (0)