Skip to content

Commit a82d479

Browse files
committed
fix: parallelize runner config endpoints (#3253)
1 parent edd820d commit a82d479

File tree

6 files changed

+142
-107
lines changed

6 files changed

+142
-107
lines changed

engine/packages/api-peer/src/runner_configs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,14 @@ pub async fn upsert(
124124
})
125125
}
126126

127-
#[derive(Debug, Serialize, Deserialize, IntoParams)]
127+
#[derive(Debug, Serialize, Clone, Deserialize, IntoParams)]
128128
#[serde(deny_unknown_fields)]
129129
#[into_params(parameter_in = Query)]
130130
pub struct DeleteQuery {
131131
pub namespace: String,
132132
}
133133

134-
#[derive(Deserialize)]
134+
#[derive(Deserialize, Clone)]
135135
#[serde(deny_unknown_fields)]
136136
pub struct DeletePath {
137137
pub runner_name: String,

engine/packages/api-public/src/runner_configs/delete.rs

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use anyhow::Result;
1+
use anyhow::{Context, Result};
22
use axum::response::{IntoResponse, Response};
3+
use futures_util::{StreamExt, TryStreamExt};
34
use rivet_api_builder::{
45
ApiError,
56
extract::{Extension, Json, Path, Query},
@@ -38,30 +39,43 @@ pub async fn delete(
3839
async fn delete_inner(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result<DeleteResponse> {
3940
ctx.auth().await?;
4041

41-
for dc in &ctx.config().topology().datacenters {
42-
if ctx.config().dc_label() == dc.datacenter_label {
43-
rivet_api_peer::runner_configs::delete(
44-
ctx.clone().into(),
45-
DeletePath {
46-
runner_name: path.runner_name.clone(),
47-
},
48-
DeleteQuery {
49-
namespace: query.namespace.clone(),
50-
},
51-
)
52-
.await?;
53-
} else {
54-
request_remote_datacenter::<DeleteResponse>(
55-
ctx.config(),
56-
dc.datacenter_label,
57-
&format!("/runner-configs/{}", path.runner_name),
58-
axum::http::Method::DELETE,
59-
Some(&query),
60-
Option::<&()>::None,
61-
)
62-
.await?;
63-
}
64-
}
42+
let dcs = ctx.config().topology().datacenters.clone();
43+
futures_util::stream::iter(dcs)
44+
.map(|dc| {
45+
let ctx = ctx.clone();
46+
let query = query.clone();
47+
let path = path.clone();
48+
async move {
49+
if ctx.config().dc_label() == dc.datacenter_label {
50+
rivet_api_peer::runner_configs::delete(
51+
ctx.clone().into(),
52+
DeletePath {
53+
runner_name: path.runner_name.clone(),
54+
},
55+
DeleteQuery {
56+
namespace: query.namespace.clone(),
57+
},
58+
)
59+
.await?;
60+
} else {
61+
request_remote_datacenter::<DeleteResponse>(
62+
ctx.config(),
63+
dc.datacenter_label,
64+
&format!("/runner-configs/{}", path.runner_name),
65+
axum::http::Method::DELETE,
66+
Some(&query),
67+
Option::<&()>::None,
68+
)
69+
.await?;
70+
}
71+
72+
anyhow::Ok(())
73+
}
74+
})
75+
.buffer_unordered(16)
76+
.try_collect::<Vec<_>>()
77+
// NOTE: We must error when any peer request fails, not all
78+
.await?;
6579

6680
// Resolve namespace
6781
let namespace = ctx

engine/packages/api-public/src/runner_configs/upsert.rs

Lines changed: 68 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use std::collections::HashMap;
22

33
use anyhow::Result;
44
use axum::response::{IntoResponse, Response};
5+
use futures_util::{StreamExt, TryStreamExt};
56
use rivet_api_builder::{
67
ApiError,
78
extract::{Extension, Json, Path, Query},
@@ -85,58 +86,75 @@ async fn upsert_inner(
8586
})
8687
.next();
8788

88-
// Apply config
89-
let mut any_endpoint_config_changed = false;
90-
for dc in &ctx.config().topology().datacenters {
91-
if let Some(runner_config) = body.datacenters.remove(&dc.name) {
92-
let response = if ctx.config().dc_label() == dc.datacenter_label {
93-
rivet_api_peer::runner_configs::upsert(
94-
ctx.clone().into(),
95-
path.clone(),
96-
query.clone(),
97-
rivet_api_peer::runner_configs::UpsertRequest(runner_config),
98-
)
99-
.await?
100-
} else {
101-
request_remote_datacenter::<UpsertResponse>(
102-
ctx.config(),
103-
dc.datacenter_label,
104-
&format!("/runner-configs/{}", path.runner_name),
105-
axum::http::Method::PUT,
106-
Some(&query),
107-
Some(&runner_config),
108-
)
109-
.await?
110-
};
89+
let dcs = ctx
90+
.config()
91+
.topology()
92+
.datacenters
93+
.iter()
94+
.map(|dc| (dc.clone(), body.datacenters.remove(&dc.name)))
95+
.collect::<Vec<_>>();
96+
let any_endpoint_config_changed = futures_util::stream::iter(dcs)
97+
.map(|(dc, runner_config)| {
98+
let ctx = ctx.clone();
99+
let query = query.clone();
100+
let path = path.clone();
101+
async move {
102+
if let Some(runner_config) = runner_config {
103+
let response = if ctx.config().dc_label() == dc.datacenter_label {
104+
rivet_api_peer::runner_configs::upsert(
105+
ctx.clone().into(),
106+
path.clone(),
107+
query.clone(),
108+
rivet_api_peer::runner_configs::UpsertRequest(runner_config),
109+
)
110+
.await?
111+
} else {
112+
request_remote_datacenter::<UpsertResponse>(
113+
ctx.config(),
114+
dc.datacenter_label,
115+
&format!("/runner-configs/{}", path.runner_name),
116+
axum::http::Method::PUT,
117+
Some(&query),
118+
Some(&runner_config),
119+
)
120+
.await?
121+
};
111122

112-
if response.endpoint_config_changed {
113-
any_endpoint_config_changed = true;
114-
}
115-
} else {
116-
if ctx.config().dc_label() == dc.datacenter_label {
117-
rivet_api_peer::runner_configs::delete(
118-
ctx.clone().into(),
119-
DeletePath {
120-
runner_name: path.runner_name.clone(),
121-
},
122-
DeleteQuery {
123-
namespace: query.namespace.clone(),
124-
},
125-
)
126-
.await?;
127-
} else {
128-
request_remote_datacenter::<DeleteResponse>(
129-
ctx.config(),
130-
dc.datacenter_label,
131-
&format!("/runner-configs/{}", path.runner_name),
132-
axum::http::Method::DELETE,
133-
Some(&query),
134-
Option::<&()>::None,
135-
)
136-
.await?;
123+
anyhow::Ok(response.endpoint_config_changed)
124+
} else {
125+
if ctx.config().dc_label() == dc.datacenter_label {
126+
rivet_api_peer::runner_configs::delete(
127+
ctx.clone().into(),
128+
DeletePath {
129+
runner_name: path.runner_name.clone(),
130+
},
131+
DeleteQuery {
132+
namespace: query.namespace.clone(),
133+
},
134+
)
135+
.await?;
136+
} else {
137+
request_remote_datacenter::<DeleteResponse>(
138+
ctx.config(),
139+
dc.datacenter_label,
140+
&format!("/runner-configs/{}", path.runner_name),
141+
axum::http::Method::DELETE,
142+
Some(&query),
143+
Option::<&()>::None,
144+
)
145+
.await?;
146+
}
147+
148+
Ok(false)
149+
}
137150
}
138-
}
139-
}
151+
})
152+
.buffer_unordered(16)
153+
.try_collect::<Vec<_>>()
154+
// NOTE: We must error when any peer request fails, not all
155+
.await?
156+
.into_iter()
157+
.any(|endpoint_config_changed| endpoint_config_changed);
140158

141159
// Update runner metadata
142160
//

engine/packages/api-public/src/runner_configs/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ pub async fn fetch_serverless_runner_metadata(
8989
.headers(header_map)
9090
.timeout(Duration::from_secs(10))
9191
.send()
92+
.custom_instrument(tracing::info_span!("fetch_metadata_request"))
9293
.await
9394
.map_err(|err| {
9495
if err.is_timeout() {

engine/packages/api-util/src/lib.rs

Lines changed: 31 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -109,38 +109,39 @@ where
109109
A: Fn(u16, I, &mut R),
110110
R: Default + Send + 'static,
111111
{
112-
let dcs = &ctx.config().topology().datacenters;
113-
114-
let results = futures_util::stream::iter(dcs.clone().into_iter().map(|dc| {
115-
let ctx = ctx.clone();
116-
let query = query.clone();
117-
let endpoint = endpoint.to_string();
118-
let local_handler = local_handler.clone();
119-
120-
async move {
121-
if dc.datacenter_label == ctx.config().dc_label() {
122-
// Local datacenter - use direct API call
123-
(dc.datacenter_label, local_handler(ctx, query).await)
124-
} else {
125-
// Remote datacenter - HTTP request
126-
(
127-
dc.datacenter_label,
128-
request_remote_datacenter::<I>(
129-
ctx.config(),
112+
let dcs = ctx.config().topology().datacenters.clone();
113+
114+
let results = futures_util::stream::iter(dcs)
115+
.map(|dc| {
116+
let ctx = ctx.clone();
117+
let query = query.clone();
118+
let endpoint = endpoint.to_string();
119+
let local_handler = local_handler.clone();
120+
121+
async move {
122+
if dc.datacenter_label == ctx.config().dc_label() {
123+
// Local datacenter - use direct API call
124+
(dc.datacenter_label, local_handler(ctx, query).await)
125+
} else {
126+
// Remote datacenter - HTTP request
127+
(
130128
dc.datacenter_label,
131-
&endpoint,
132-
Method::GET,
133-
Some(&query),
134-
Option::<&()>::None,
129+
request_remote_datacenter::<I>(
130+
ctx.config(),
131+
dc.datacenter_label,
132+
&endpoint,
133+
Method::GET,
134+
Some(&query),
135+
Option::<&()>::None,
136+
)
137+
.await,
135138
)
136-
.await,
137-
)
139+
}
138140
}
139-
}
140-
}))
141-
.buffer_unordered(16)
142-
.collect::<Vec<_>>()
143-
.await;
141+
})
142+
.buffer_unordered(16)
143+
.collect::<Vec<_>>()
144+
.await;
144145

145146
// Aggregate results
146147
let result_count = results.len();
@@ -159,7 +160,7 @@ where
159160
// Error only if all requests failed
160161
if result_count == errors.len() {
161162
if let Some(res) = errors.into_iter().next() {
162-
return Err(res).with_context(|| "all datacenter requests failed");
163+
return Err(res).context("all datacenter requests failed");
163164
}
164165
}
165166

engine/packages/namespace/src/ops/resolve_for_name_global.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ pub async fn namespace_resolve_for_name_global(
3535
.get(url)
3636
.query(&[("name", &input.name)])
3737
.send()
38+
.custom_instrument(tracing::info_span!("namespaces_http_request"))
3839
.await?;
3940

4041
let res = rivet_api_util::parse_response::<

0 commit comments

Comments
 (0)