Skip to content

Commit d7564f4

Browse files
committed
feat: implement max/min req/res sizes
we need to prepare for implementation of FCUs deduplication (this avalanche problem still persists). that will require assembly of incoming requests before the decision whether to proxy them or not is made => and for that we will need buffers + reasonable limits.
1 parent 1f40ab5 commit d7564f4

File tree

5 files changed

+251
-8
lines changed

5 files changed

+251
-8
lines changed

crates/rproxy/src/server/proxy/config/authrpc.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,28 @@ pub(crate) struct ConfigAuthrpc {
131131
)]
132132
pub(crate) log_sanitise: bool,
133133

134+
/// max size of authrpc requests
135+
#[arg(
136+
default_value = "16",
137+
env = "RPROXY_AUTHRPC_MAX_REQUEST_SIZE_MB",
138+
help_heading = "authrpc",
139+
long("authrpc-max-request-size-mb"),
140+
name("authrpc_max_request_size_mb"),
141+
value_name = "megabytes"
142+
)]
143+
pub(crate) max_request_size_mb: usize,
144+
145+
/// max size of authrpc responses
146+
#[arg(
147+
default_value = "256",
148+
env = "RPROXY_AUTHRPC_MAX_RESPONSE_SIZE_MB",
149+
help_heading = "authrpc",
150+
long("authrpc-max-response-size-mb"),
151+
name("authrpc_max_response_size_mb"),
152+
value_name = "megabytes"
153+
)]
154+
pub(crate) max_response_size_mb: usize,
155+
134156
/// list of authrpc peers urls to mirror the requests to
135157
#[arg(
136158
env="RPROXY_AUTHRPC_MIRRORING_PEERS",
@@ -153,6 +175,28 @@ pub(crate) struct ConfigAuthrpc {
153175
#[clap(value_enum)]
154176
pub(crate) mirroring_strategy: ConfigProxyHttpMirroringStrategy,
155177

178+
/// size of preallocated authrpc request buffers
179+
#[arg(
180+
default_value = "1",
181+
env = "RPROXY_AUTHRPC_PREALLOCATED_RESPONSE_BUFFER_SIZE_KB",
182+
help_heading = "authrpc",
183+
long("authrpc-preallocated-request-buffer-size-kb"),
184+
name("authrpc_preallocated_request_buffer_size_kb"),
185+
value_name = "kilobytes"
186+
)]
187+
pub(crate) prealloacated_request_buffer_size_kb: usize,
188+
189+
/// size of preallocated authrpc response buffers
190+
#[arg(
191+
default_value = "1",
192+
env = "RPROXY_AUTHRPC_PREALLOCATED_RESPONSE_BUFFER_SIZE_KB",
193+
help_heading = "authrpc",
194+
long("authrpc-preallocated-response-buffer-size-kb"),
195+
name("authrpc_preallocated_response_buffer_size_kb"),
196+
value_name = "kilobytes"
197+
)]
198+
pub(crate) prealloacated_response_buffer_size_kb: usize,
199+
156200
/// remove authrpc backend from mirroring peers
157201
#[arg(
158202
env = "RPROXY_AUTHRPC_REMOVE_BACKEND_FROM_MIRRORING_PEERS",
@@ -329,6 +373,16 @@ impl ConfigProxyHttp for ConfigAuthrpc {
329373
self.log_sanitise
330374
}
331375

376+
#[inline]
377+
fn max_request_size(&self) -> usize {
378+
1024 * 1024 * self.max_request_size_mb
379+
}
380+
381+
#[inline]
382+
fn max_response_size(&self) -> usize {
383+
1024 * 1024 * self.max_response_size_mb
384+
}
385+
332386
#[inline]
333387
fn mirroring_peer_urls(&self) -> Vec<Url> {
334388
self.mirroring_peer_urls
@@ -341,6 +395,16 @@ impl ConfigProxyHttp for ConfigAuthrpc {
341395
fn mirroring_strategy(&self) -> &ConfigProxyHttpMirroringStrategy {
342396
&self.mirroring_strategy
343397
}
398+
399+
#[inline]
400+
fn prealloacated_request_buffer_size(&self) -> usize {
401+
1024 * self.prealloacated_request_buffer_size_kb
402+
}
403+
404+
#[inline]
405+
fn prealloacated_response_buffer_size(&self) -> usize {
406+
1024 * self.prealloacated_response_buffer_size_kb
407+
}
344408
}
345409

346410
// ConfigAuthrpcError --------------------------------------------------

crates/rproxy/src/server/proxy/config/rpc.rs

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,28 @@ pub(crate) struct ConfigRpc {
130130
)]
131131
pub(crate) log_sanitise: bool,
132132

133+
/// max size of rpc requests
134+
#[arg(
135+
default_value = "16",
136+
env = "RPROXY_RPC_MAX_REQUEST_SIZE_MB",
137+
help_heading = "rpc",
138+
long("rpc-max-request-size-mb"),
139+
name("rpc_max_request_size_mb"),
140+
value_name = "megabytes"
141+
)]
142+
pub(crate) max_request_size_mb: usize,
143+
144+
/// max size of rpc responses
145+
#[arg(
146+
default_value = "256",
147+
env = "RPROXY_RPC_MAX_RESPONSE_SIZE_MB",
148+
help_heading = "rpc",
149+
long("rpc-max-response-size-mb"),
150+
name("rpc_max_response_size_mb"),
151+
value_name = "megabytes"
152+
)]
153+
pub(crate) max_response_size_mb: usize,
154+
133155
/// whether the requests that returned an error from rpc backend should
134156
/// be mirrored to peers
135157
#[arg(
@@ -162,6 +184,28 @@ pub(crate) struct ConfigRpc {
162184
#[clap(value_enum)]
163185
pub(crate) mirroring_strategy: ConfigProxyHttpMirroringStrategy,
164186

187+
/// size of preallocated rpc request buffers
188+
#[arg(
189+
default_value = "1",
190+
env = "RPROXY_RPC_PREALLOCATED_RESPONSE_BUFFER_SIZE_KB",
191+
help_heading = "rpc",
192+
long("rpc-preallocated-request-buffer-size-kb"),
193+
name("rpc_preallocated_request_buffer_size_kb"),
194+
value_name = "kilobytes"
195+
)]
196+
pub(crate) prealloacated_request_buffer_size_kb: usize,
197+
198+
/// size of preallocated rpc response buffers
199+
#[arg(
200+
default_value = "256",
201+
env = "RPROXY_RPC_PREALLOCATED_RESPONSE_BUFFER_SIZE_KB",
202+
help_heading = "rpc",
203+
long("rpc-preallocated-response-buffer-size-kb"),
204+
name("rpc_preallocated_response_buffer_size_kb"),
205+
value_name = "kilobytes"
206+
)]
207+
pub(crate) prealloacated_response_buffer_size_kb: usize,
208+
165209
/// remove rpc backend from peers
166210
#[arg(
167211
env = "RPROXY_RPC_REMOVE_BACKEND_FROM_MIRRORING_PEERS",
@@ -335,6 +379,16 @@ impl ConfigProxyHttp for ConfigRpc {
335379
self.log_sanitise
336380
}
337381

382+
#[inline]
383+
fn max_request_size(&self) -> usize {
384+
1024 * 1024 * self.max_request_size_mb
385+
}
386+
387+
#[inline]
388+
fn max_response_size(&self) -> usize {
389+
1024 * 1024 * self.max_response_size_mb
390+
}
391+
338392
#[inline]
339393
fn mirroring_peer_urls(&self) -> Vec<Url> {
340394
self.mirroring_peer_urls
@@ -347,6 +401,16 @@ impl ConfigProxyHttp for ConfigRpc {
347401
fn mirroring_strategy(&self) -> &ConfigProxyHttpMirroringStrategy {
348402
&self.mirroring_strategy
349403
}
404+
405+
#[inline]
406+
fn prealloacated_request_buffer_size(&self) -> usize {
407+
1024 * self.prealloacated_request_buffer_size_kb
408+
}
409+
410+
#[inline]
411+
fn prealloacated_response_buffer_size(&self) -> usize {
412+
1024 * self.prealloacated_response_buffer_size_kb
413+
}
350414
}
351415

352416
// ConfigRpcError ------------------------------------------------------

crates/rproxy/src/server/proxy/http/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,12 @@ pub(crate) trait ConfigProxyHttp: Clone + Send + Unpin + 'static {
1515
fn log_proxied_requests(&self) -> bool;
1616
fn log_proxied_responses(&self) -> bool;
1717
fn log_sanitise(&self) -> bool;
18+
fn max_request_size(&self) -> usize;
19+
fn max_response_size(&self) -> usize;
1820
fn mirroring_peer_urls(&self) -> Vec<Url>;
1921
fn mirroring_strategy(&self) -> &ConfigProxyHttpMirroringStrategy;
22+
fn prealloacated_request_buffer_size(&self) -> usize;
23+
fn prealloacated_response_buffer_size(&self) -> usize;
2024
}
2125

2226
// ConfigProxyHttpMirroringStrategy ------------------------------------

crates/rproxy/src/server/proxy/http/proxy.rs

Lines changed: 71 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use awc::{
3131
ClientResponse,
3232
Connector,
3333
body::MessageBody,
34-
error::HeaderValue,
34+
error::{HeaderValue, PayloadError},
3535
http::{Method, header::HeaderMap},
3636
};
3737
use bytes::Bytes;
@@ -318,7 +318,13 @@ where
318318
let connection_id = info.connection_id;
319319

320320
let bck_req = this.backend.new_backend_request(&info);
321-
let bck_req_body = ProxyHttpRequestBody::new(this.clone(), info, cli_req_body, timestamp);
321+
let bck_req_body = ProxyHttpRequestBody::new(
322+
this.clone(),
323+
info,
324+
cli_req_body,
325+
this.shared.config().prealloacated_request_buffer_size(),
326+
timestamp,
327+
);
322328

323329
let bck_res = match bck_req.send_stream(bck_req_body).await {
324330
Ok(res) => res,
@@ -345,12 +351,14 @@ where
345351
let status = bck_res.status();
346352
let mut cli_res = Self::to_client_response(&bck_res);
347353

354+
let preallocate = this.shared.config().prealloacated_response_buffer_size();
348355
let bck_body = ProxyHttpResponseBody::new(
349356
this,
350357
id,
351358
status,
352359
bck_res.headers().clone(),
353360
bck_res.into_stream(),
361+
preallocate,
354362
timestamp,
355363
);
356364

@@ -1205,6 +1213,7 @@ where
12051213
info: Option<ProxyHttpRequestInfo>,
12061214
start: UtcDateTime,
12071215
body: Vec<u8>,
1216+
max_size: usize,
12081217

12091218
#[pin]
12101219
stream: S,
@@ -1216,25 +1225,28 @@ where
12161225
P: ProxyHttpInner<C>,
12171226
{
12181227
fn new(
1219-
worker: web::Data<ProxyHttp<C, P>>,
1228+
proxy: web::Data<ProxyHttp<C, P>>,
12201229
info: ProxyHttpRequestInfo,
12211230
body: S,
1231+
preallocate: usize,
12221232
timestamp: UtcDateTime,
12231233
) -> Self {
1234+
let max_size = proxy.shared.config().max_request_size();
12241235
Self {
1225-
proxy: worker,
1236+
proxy,
12261237
info: Some(info),
12271238
stream: body,
12281239
start: timestamp,
1229-
body: Vec::new(), // TODO: preallocate reasonable size
1240+
body: Vec::with_capacity(preallocate),
1241+
max_size,
12301242
}
12311243
}
12321244
}
12331245

12341246
impl<S, E, C, P> Stream for ProxyHttpRequestBody<S, C, P>
12351247
where
12361248
S: Stream<Item = Result<Bytes, E>>,
1237-
E: Debug,
1249+
E: From<PayloadError> + Debug,
12381250
C: ConfigProxyHttp,
12391251
P: ProxyHttpInner<C>,
12401252
{
@@ -1247,6 +1259,30 @@ where
12471259
Poll::Pending => Poll::Pending,
12481260

12491261
Poll::Ready(Some(Ok(chunk))) => {
1262+
if this.body.len() + chunk.len() > *this.max_size {
1263+
let err = format!(
1264+
"request is too large: {}+ > {}",
1265+
this.body.len() + chunk.len(),
1266+
*this.max_size
1267+
);
1268+
if let Some(info) = mem::take(this.info) {
1269+
warn!(
1270+
proxy = P::name(),
1271+
request_id = %info.id(),
1272+
connection_id = %info.connection_id(),
1273+
error = err,
1274+
"Proxy http request stream error",
1275+
);
1276+
} else {
1277+
warn!(
1278+
proxy = P::name(),
1279+
error = err,
1280+
request_id = "unknown",
1281+
"Proxy http request stream error",
1282+
);
1283+
}
1284+
return Poll::Ready(Some(Err(E::from(PayloadError::Overflow))))
1285+
}
12501286
this.body.extend_from_slice(&chunk);
12511287
Poll::Ready(Some(Ok(chunk)))
12521288
}
@@ -1301,6 +1337,7 @@ where
13011337
info: Option<ProxyHttpResponseInfo>,
13021338
start: UtcDateTime,
13031339
body: Vec<u8>,
1340+
max_size: usize,
13041341

13051342
#[pin]
13061343
stream: S,
@@ -1317,13 +1354,16 @@ where
13171354
status: StatusCode,
13181355
headers: HeaderMap,
13191356
body: S,
1357+
preallocate: usize,
13201358
timestamp: UtcDateTime,
13211359
) -> Self {
1360+
let max_size = proxy.shared.config().max_response_size();
13221361
Self {
13231362
proxy,
13241363
stream: body,
13251364
start: timestamp,
1326-
body: Vec::new(), // TODO: preallocate reasonable size
1365+
body: Vec::with_capacity(preallocate),
1366+
max_size,
13271367
info: Some(ProxyHttpResponseInfo::new(id, status, headers)),
13281368
}
13291369
}
@@ -1332,7 +1372,7 @@ where
13321372
impl<S, E, C, P> Stream for ProxyHttpResponseBody<S, C, P>
13331373
where
13341374
S: Stream<Item = Result<Bytes, E>>,
1335-
E: Debug,
1375+
E: From<PayloadError> + Debug,
13361376
C: ConfigProxyHttp,
13371377
P: ProxyHttpInner<C>,
13381378
{
@@ -1345,6 +1385,29 @@ where
13451385
Poll::Pending => Poll::Pending,
13461386

13471387
Poll::Ready(Some(Ok(chunk))) => {
1388+
if this.body.len() + chunk.len() > *this.max_size {
1389+
let err = format!(
1390+
"response is too large: {}+ > {}",
1391+
this.body.len() + chunk.len(),
1392+
*this.max_size
1393+
);
1394+
if let Some(info) = mem::take(this.info) {
1395+
warn!(
1396+
proxy = P::name(),
1397+
request_id = %info.id(),
1398+
error = err,
1399+
"Proxy http response stream error",
1400+
);
1401+
} else {
1402+
warn!(
1403+
proxy = P::name(),
1404+
error = err,
1405+
request_id = "unknown",
1406+
"Proxy http response stream error",
1407+
);
1408+
}
1409+
return Poll::Ready(Some(Err(E::from(PayloadError::Overflow))))
1410+
}
13481411
this.body.extend_from_slice(&chunk);
13491412
Poll::Ready(Some(Ok(chunk)))
13501413
}

0 commit comments

Comments
 (0)