From d7564f448ced57ee6a7dfe7df81fdfc179c22e18 Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Thu, 23 Oct 2025 12:38:31 +0200 Subject: [PATCH 1/3] feat: implement max/min req/res sizes we need to prepare for implementation of FCUs deduplication (this avalanche problem still persists). that will require assembly of incoming requests before the decision whether to proxy them or not is made => and for that we will need buffers + reasonable limits. --- .../rproxy/src/server/proxy/config/authrpc.rs | 64 +++++++++++++++ crates/rproxy/src/server/proxy/config/rpc.rs | 64 +++++++++++++++ crates/rproxy/src/server/proxy/http/config.rs | 4 + crates/rproxy/src/server/proxy/http/proxy.rs | 79 +++++++++++++++++-- readme.md | 48 +++++++++++ 5 files changed, 251 insertions(+), 8 deletions(-) diff --git a/crates/rproxy/src/server/proxy/config/authrpc.rs b/crates/rproxy/src/server/proxy/config/authrpc.rs index f136798..90accb3 100644 --- a/crates/rproxy/src/server/proxy/config/authrpc.rs +++ b/crates/rproxy/src/server/proxy/config/authrpc.rs @@ -131,6 +131,28 @@ pub(crate) struct ConfigAuthrpc { )] pub(crate) log_sanitise: bool, + /// max size of authrpc requests + #[arg( + default_value = "16", + env = "RPROXY_AUTHRPC_MAX_REQUEST_SIZE_MB", + help_heading = "authrpc", + long("authrpc-max-request-size-mb"), + name("authrpc_max_request_size_mb"), + value_name = "megabytes" + )] + pub(crate) max_request_size_mb: usize, + + /// max size of authrpc responses + #[arg( + default_value = "256", + env = "RPROXY_AUTHRPC_MAX_RESPONSE_SIZE_MB", + help_heading = "authrpc", + long("authrpc-max-response-size-mb"), + name("authrpc_max_response_size_mb"), + value_name = "megabytes" + )] + pub(crate) max_response_size_mb: usize, + /// list of authrpc peers urls to mirror the requests to #[arg( env="RPROXY_AUTHRPC_MIRRORING_PEERS", @@ -153,6 +175,28 @@ pub(crate) struct ConfigAuthrpc { #[clap(value_enum)] pub(crate) mirroring_strategy: ConfigProxyHttpMirroringStrategy, + /// size of preallocated authrpc request buffers + #[arg( + default_value = "1", + env = "RPROXY_AUTHRPC_PREALLOCATED_RESPONSE_BUFFER_SIZE_KB", + help_heading = "authrpc", + long("authrpc-preallocated-request-buffer-size-kb"), + name("authrpc_preallocated_request_buffer_size_kb"), + value_name = "kilobytes" + )] + pub(crate) prealloacated_request_buffer_size_kb: usize, + + /// size of preallocated authrpc response buffers + #[arg( + default_value = "1", + env = "RPROXY_AUTHRPC_PREALLOCATED_RESPONSE_BUFFER_SIZE_KB", + help_heading = "authrpc", + long("authrpc-preallocated-response-buffer-size-kb"), + name("authrpc_preallocated_response_buffer_size_kb"), + value_name = "kilobytes" + )] + pub(crate) prealloacated_response_buffer_size_kb: usize, + /// remove authrpc backend from mirroring peers #[arg( env = "RPROXY_AUTHRPC_REMOVE_BACKEND_FROM_MIRRORING_PEERS", @@ -329,6 +373,16 @@ impl ConfigProxyHttp for ConfigAuthrpc { self.log_sanitise } + #[inline] + fn max_request_size(&self) -> usize { + 1024 * 1024 * self.max_request_size_mb + } + + #[inline] + fn max_response_size(&self) -> usize { + 1024 * 1024 * self.max_response_size_mb + } + #[inline] fn mirroring_peer_urls(&self) -> Vec { self.mirroring_peer_urls @@ -341,6 +395,16 @@ impl ConfigProxyHttp for ConfigAuthrpc { fn mirroring_strategy(&self) -> &ConfigProxyHttpMirroringStrategy { &self.mirroring_strategy } + + #[inline] + fn prealloacated_request_buffer_size(&self) -> usize { + 1024 * self.prealloacated_request_buffer_size_kb + } + + #[inline] + fn prealloacated_response_buffer_size(&self) -> usize { + 1024 * self.prealloacated_response_buffer_size_kb + } } // ConfigAuthrpcError -------------------------------------------------- diff --git a/crates/rproxy/src/server/proxy/config/rpc.rs b/crates/rproxy/src/server/proxy/config/rpc.rs index 618490d..2a9c2ad 100644 --- a/crates/rproxy/src/server/proxy/config/rpc.rs +++ b/crates/rproxy/src/server/proxy/config/rpc.rs @@ -130,6 +130,28 @@ pub(crate) struct ConfigRpc { )] pub(crate) log_sanitise: bool, + /// max size of rpc requests + #[arg( + default_value = "16", + env = "RPROXY_RPC_MAX_REQUEST_SIZE_MB", + help_heading = "rpc", + long("rpc-max-request-size-mb"), + name("rpc_max_request_size_mb"), + value_name = "megabytes" + )] + pub(crate) max_request_size_mb: usize, + + /// max size of rpc responses + #[arg( + default_value = "256", + env = "RPROXY_RPC_MAX_RESPONSE_SIZE_MB", + help_heading = "rpc", + long("rpc-max-response-size-mb"), + name("rpc_max_response_size_mb"), + value_name = "megabytes" + )] + pub(crate) max_response_size_mb: usize, + /// whether the requests that returned an error from rpc backend should /// be mirrored to peers #[arg( @@ -162,6 +184,28 @@ pub(crate) struct ConfigRpc { #[clap(value_enum)] pub(crate) mirroring_strategy: ConfigProxyHttpMirroringStrategy, + /// size of preallocated rpc request buffers + #[arg( + default_value = "1", + env = "RPROXY_RPC_PREALLOCATED_RESPONSE_BUFFER_SIZE_KB", + help_heading = "rpc", + long("rpc-preallocated-request-buffer-size-kb"), + name("rpc_preallocated_request_buffer_size_kb"), + value_name = "kilobytes" + )] + pub(crate) prealloacated_request_buffer_size_kb: usize, + + /// size of preallocated rpc response buffers + #[arg( + default_value = "256", + env = "RPROXY_RPC_PREALLOCATED_RESPONSE_BUFFER_SIZE_KB", + help_heading = "rpc", + long("rpc-preallocated-response-buffer-size-kb"), + name("rpc_preallocated_response_buffer_size_kb"), + value_name = "kilobytes" + )] + pub(crate) prealloacated_response_buffer_size_kb: usize, + /// remove rpc backend from peers #[arg( env = "RPROXY_RPC_REMOVE_BACKEND_FROM_MIRRORING_PEERS", @@ -335,6 +379,16 @@ impl ConfigProxyHttp for ConfigRpc { self.log_sanitise } + #[inline] + fn max_request_size(&self) -> usize { + 1024 * 1024 * self.max_request_size_mb + } + + #[inline] + fn max_response_size(&self) -> usize { + 1024 * 1024 * self.max_response_size_mb + } + #[inline] fn mirroring_peer_urls(&self) -> Vec { self.mirroring_peer_urls @@ -347,6 +401,16 @@ impl ConfigProxyHttp for ConfigRpc { fn mirroring_strategy(&self) -> &ConfigProxyHttpMirroringStrategy { &self.mirroring_strategy } + + #[inline] + fn prealloacated_request_buffer_size(&self) -> usize { + 1024 * self.prealloacated_request_buffer_size_kb + } + + #[inline] + fn prealloacated_response_buffer_size(&self) -> usize { + 1024 * self.prealloacated_response_buffer_size_kb + } } // ConfigRpcError ------------------------------------------------------ diff --git a/crates/rproxy/src/server/proxy/http/config.rs b/crates/rproxy/src/server/proxy/http/config.rs index 7a9c55f..f75201b 100644 --- a/crates/rproxy/src/server/proxy/http/config.rs +++ b/crates/rproxy/src/server/proxy/http/config.rs @@ -15,8 +15,12 @@ pub(crate) trait ConfigProxyHttp: Clone + Send + Unpin + 'static { fn log_proxied_requests(&self) -> bool; fn log_proxied_responses(&self) -> bool; fn log_sanitise(&self) -> bool; + fn max_request_size(&self) -> usize; + fn max_response_size(&self) -> usize; fn mirroring_peer_urls(&self) -> Vec; fn mirroring_strategy(&self) -> &ConfigProxyHttpMirroringStrategy; + fn prealloacated_request_buffer_size(&self) -> usize; + fn prealloacated_response_buffer_size(&self) -> usize; } // ConfigProxyHttpMirroringStrategy ------------------------------------ diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 2e3b6f0..4e059a6 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -31,7 +31,7 @@ use awc::{ ClientResponse, Connector, body::MessageBody, - error::HeaderValue, + error::{HeaderValue, PayloadError}, http::{Method, header::HeaderMap}, }; use bytes::Bytes; @@ -318,7 +318,13 @@ where let connection_id = info.connection_id; let bck_req = this.backend.new_backend_request(&info); - let bck_req_body = ProxyHttpRequestBody::new(this.clone(), info, cli_req_body, timestamp); + let bck_req_body = ProxyHttpRequestBody::new( + this.clone(), + info, + cli_req_body, + this.shared.config().prealloacated_request_buffer_size(), + timestamp, + ); let bck_res = match bck_req.send_stream(bck_req_body).await { Ok(res) => res, @@ -345,12 +351,14 @@ where let status = bck_res.status(); let mut cli_res = Self::to_client_response(&bck_res); + let preallocate = this.shared.config().prealloacated_response_buffer_size(); let bck_body = ProxyHttpResponseBody::new( this, id, status, bck_res.headers().clone(), bck_res.into_stream(), + preallocate, timestamp, ); @@ -1205,6 +1213,7 @@ where info: Option, start: UtcDateTime, body: Vec, + max_size: usize, #[pin] stream: S, @@ -1216,17 +1225,20 @@ where P: ProxyHttpInner, { fn new( - worker: web::Data>, + proxy: web::Data>, info: ProxyHttpRequestInfo, body: S, + preallocate: usize, timestamp: UtcDateTime, ) -> Self { + let max_size = proxy.shared.config().max_request_size(); Self { - proxy: worker, + proxy, info: Some(info), stream: body, start: timestamp, - body: Vec::new(), // TODO: preallocate reasonable size + body: Vec::with_capacity(preallocate), + max_size, } } } @@ -1234,7 +1246,7 @@ where impl Stream for ProxyHttpRequestBody where S: Stream>, - E: Debug, + E: From + Debug, C: ConfigProxyHttp, P: ProxyHttpInner, { @@ -1247,6 +1259,30 @@ where Poll::Pending => Poll::Pending, Poll::Ready(Some(Ok(chunk))) => { + if this.body.len() + chunk.len() > *this.max_size { + let err = format!( + "request is too large: {}+ > {}", + this.body.len() + chunk.len(), + *this.max_size + ); + if let Some(info) = mem::take(this.info) { + warn!( + proxy = P::name(), + request_id = %info.id(), + connection_id = %info.connection_id(), + error = err, + "Proxy http request stream error", + ); + } else { + warn!( + proxy = P::name(), + error = err, + request_id = "unknown", + "Proxy http request stream error", + ); + } + return Poll::Ready(Some(Err(E::from(PayloadError::Overflow)))) + } this.body.extend_from_slice(&chunk); Poll::Ready(Some(Ok(chunk))) } @@ -1301,6 +1337,7 @@ where info: Option, start: UtcDateTime, body: Vec, + max_size: usize, #[pin] stream: S, @@ -1317,13 +1354,16 @@ where status: StatusCode, headers: HeaderMap, body: S, + preallocate: usize, timestamp: UtcDateTime, ) -> Self { + let max_size = proxy.shared.config().max_response_size(); Self { proxy, stream: body, start: timestamp, - body: Vec::new(), // TODO: preallocate reasonable size + body: Vec::with_capacity(preallocate), + max_size, info: Some(ProxyHttpResponseInfo::new(id, status, headers)), } } @@ -1332,7 +1372,7 @@ where impl Stream for ProxyHttpResponseBody where S: Stream>, - E: Debug, + E: From + Debug, C: ConfigProxyHttp, P: ProxyHttpInner, { @@ -1345,6 +1385,29 @@ where Poll::Pending => Poll::Pending, Poll::Ready(Some(Ok(chunk))) => { + if this.body.len() + chunk.len() > *this.max_size { + let err = format!( + "response is too large: {}+ > {}", + this.body.len() + chunk.len(), + *this.max_size + ); + if let Some(info) = mem::take(this.info) { + warn!( + proxy = P::name(), + request_id = %info.id(), + error = err, + "Proxy http response stream error", + ); + } else { + warn!( + proxy = P::name(), + error = err, + request_id = "unknown", + "Proxy http response stream error", + ); + } + return Poll::Ready(Some(Err(E::from(PayloadError::Overflow)))) + } this.body.extend_from_slice(&chunk); Poll::Ready(Some(Ok(chunk))) } diff --git a/readme.md b/readme.md index 7e2067d..35eb004 100644 --- a/readme.md +++ b/readme.md @@ -110,6 +110,18 @@ authrpc: [env: RPROXY_AUTHRPC_LOG_SANITISE=] + --authrpc-max-request-size-mb + max size of authrpc requests + + [env: RPROXY_AUTHRPC_MAX_REQUEST_SIZE_MB=] + [default: 16] + + --authrpc-max-response-size-mb + max size of authrpc responses + + [env: RPROXY_AUTHRPC_MAX_RESPONSE_SIZE_MB=] + [default: 256] + --authrpc-mirroring-peer ... list of authrpc peers urls to mirror the requests to @@ -124,6 +136,18 @@ authrpc: [env: RPROXY_AUTHRPC_MIRRORING_STRATEGY=] [default: fan-out] + --authrpc-preallocated-request-buffer-size-kb + size of preallocated authrpc request buffers + + [env: RPROXY_AUTHRPC_PREALLOCATED_RESPONSE_BUFFER_SIZE_KB=] + [default: 1] + + --authrpc-preallocated-response-buffer-size-kb + size of preallocated authrpc response buffers + + [env: RPROXY_AUTHRPC_PREALLOCATED_RESPONSE_BUFFER_SIZE_KB=] + [default: 1] + --authrpc-remove-backend-from-mirroring-peers remove authrpc backend from mirroring peers @@ -283,6 +307,18 @@ rpc: [env: RPROXY_RPC_LOG_SANITISE=] + --rpc-max-request-size-mb + max size of rpc requests + + [env: RPROXY_RPC_MAX_REQUEST_SIZE_MB=] + [default: 16] + + --rpc-max-response-size-mb + max size of rpc responses + + [env: RPROXY_RPC_MAX_RESPONSE_SIZE_MB=] + [default: 256] + --rpc-mirror-errored-requests whether the requests that returned an error from rpc backend should be mirrored to peers @@ -303,6 +339,18 @@ rpc: [env: RPROXY_RPC_MIRRORING_STRATEGY=] [default: fan-out] + --rpc-preallocated-request-buffer-size-kb + size of preallocated rpc request buffers + + [env: RPROXY_RPC_PREALLOCATED_RESPONSE_BUFFER_SIZE_KB=] + [default: 1] + + --rpc-preallocated-response-buffer-size-kb + size of preallocated rpc response buffers + + [env: RPROXY_RPC_PREALLOCATED_RESPONSE_BUFFER_SIZE_KB=] + [default: 256] + --rpc-remove-backend-from-mirroring-peers remove rpc backend from peers From 74a2149218cd65d2dde5204f7012d79f77f06b5e Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Thu, 23 Oct 2025 22:44:06 +0200 Subject: [PATCH 2/3] review: rename `bck` => `bknd`, `cli` => `clnt` --- .../src/server/proxy/config/flashblocks.rs | 8 - crates/rproxy/src/server/proxy/http/proxy.rs | 138 ++++++------ crates/rproxy/src/server/proxy/ws/proxy.rs | 202 +++++++++--------- 3 files changed, 170 insertions(+), 178 deletions(-) diff --git a/crates/rproxy/src/server/proxy/config/flashblocks.rs b/crates/rproxy/src/server/proxy/config/flashblocks.rs index ea6a1e3..dece62f 100644 --- a/crates/rproxy/src/server/proxy/config/flashblocks.rs +++ b/crates/rproxy/src/server/proxy/config/flashblocks.rs @@ -63,14 +63,6 @@ pub(crate) struct ConfigFlashblocks { )] pub(crate) log_backend_messages: bool, - /// whether to log flashblocks backend messages - #[arg( - env = "RPROXY_FLASHBLOCKS_LOG_BACKEND_MESSAGES", - help_heading = "flashblocks", - long("flashblocks-log-backend-messages"), - name("flashblocks_log_backend_messages") - )] - /// whether to log flashblocks client messages #[arg( env = "RPROXY_FLASHBLOCKS_LOG_CLIENT_MESSAGES", diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 4e059a6..8ac17d2 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -274,31 +274,31 @@ where Ok(socket.into()) } - fn to_client_response(bck_res: &ClientResponse) -> HttpResponseBuilder { - let mut cli_res = HttpResponse::build(bck_res.status()); + fn to_client_response(bknd_res: &ClientResponse) -> HttpResponseBuilder { + let mut clnt_res = HttpResponse::build(bknd_res.status()); - for (name, header) in bck_res.headers().iter() { - if is_hop_by_hop_header(name) { + for (hkey, hval) in bknd_res.headers().iter() { + if is_hop_by_hop_header(hkey) { continue; } - if let Ok(hname) = header::HeaderName::from_str(name.as_str()) { - cli_res.append_header((hname, header.clone())); + if let Ok(hkey) = header::HeaderName::from_str(hkey.as_str()) { + clnt_res.append_header((hkey, hval.clone())); } } - cli_res + clnt_res } /// receive accepts client's (frontend) request and proxies it to /// backend async fn receive( - cli_req: HttpRequest, - cli_req_body: web::Payload, + clnt_req: HttpRequest, + clnt_req_body: web::Payload, this: web::Data, ) -> Result { let timestamp = UtcDateTime::now(); - if let Some(user_agent) = cli_req.headers().get(header::USER_AGENT) && + if let Some(user_agent) = clnt_req.headers().get(header::USER_AGENT) && !user_agent.is_empty() && let Ok(user_agent) = user_agent.to_str() { @@ -312,21 +312,21 @@ where .inc(); } - let info = ProxyHttpRequestInfo::new(&cli_req, cli_req.conn_data::()); + let info = ProxyHttpRequestInfo::new(&clnt_req, clnt_req.conn_data::()); let id = info.id; let connection_id = info.connection_id; - let bck_req = this.backend.new_backend_request(&info); - let bck_req_body = ProxyHttpRequestBody::new( + let bknd_req = this.backend.new_backend_request(&info); + let bknd_req_body = ProxyHttpRequestBody::new( this.clone(), info, - cli_req_body, + clnt_req_body, this.shared.config().prealloacated_request_buffer_size(), timestamp, ); - let bck_res = match bck_req.send_stream(bck_req_body).await { + let bknd_res = match bknd_req.send_stream(bknd_req_body).await { Ok(res) => res, Err(err) => { warn!( @@ -348,21 +348,21 @@ where }; let timestamp = UtcDateTime::now(); - let status = bck_res.status(); - let mut cli_res = Self::to_client_response(&bck_res); + let status = bknd_res.status(); + let mut clnt_res = Self::to_client_response(&bknd_res); let preallocate = this.shared.config().prealloacated_response_buffer_size(); - let bck_body = ProxyHttpResponseBody::new( + let bknd_res_body = ProxyHttpResponseBody::new( this, id, status, - bck_res.headers().clone(), - bck_res.into_stream(), + bknd_res.headers().clone(), + bknd_res.into_stream(), preallocate, timestamp, ); - Ok(cli_res.streaming(bck_body)) + Ok(clnt_res.streaming(bknd_res_body)) } fn postprocess_client_request(&self, req: ProxiedHttpRequest) { @@ -380,11 +380,11 @@ where }; } - fn postprocess_backend_response(&self, bck_res: ProxiedHttpResponse) { - let Some((_, cli_req)) = self.requests.remove_sync(&bck_res.info.id) else { + fn postprocess_backend_response(&self, bknd_res: ProxiedHttpResponse) { + let Some((_, clnt_req)) = self.requests.remove_sync(&bknd_res.info.id) else { error!( proxy = P::name(), - request_id = %bck_res.info.id, + request_id = %bknd_res.info.id, worker_id = %self.id, "Proxied http response for unmatching request", ); @@ -393,31 +393,31 @@ where // hand over to postprocessor asynchronously so that we can return the // response to the client as early as possible - self.postprocessor.do_send(ProxiedHttpCombo { req: cli_req, res: bck_res }); + self.postprocessor.do_send(ProxiedHttpCombo { req: clnt_req, res: bknd_res }); } fn finalise_proxying( - mut cli_req: ProxiedHttpRequest, - mut bck_res: ProxiedHttpResponse, + mut clnt_req: ProxiedHttpRequest, + mut bknd_res: ProxiedHttpResponse, inner: Arc

, worker_id: Uuid, metrics: Arc, mirroring_peers: Arc>>>, mut mirroring_peer_round_robin_index: usize, ) { - if cli_req.decompressed_size < cli_req.size { - (cli_req.decompressed_body, cli_req.decompressed_size) = - decompress(cli_req.body.clone(), cli_req.size, cli_req.info.content_encoding()); + if clnt_req.decompressed_size < clnt_req.size { + (clnt_req.decompressed_body, clnt_req.decompressed_size) = + decompress(clnt_req.body.clone(), clnt_req.size, clnt_req.info.content_encoding()); } - if bck_res.decompressed_size < bck_res.size { - (bck_res.decompressed_body, bck_res.decompressed_size) = - decompress(bck_res.body.clone(), bck_res.size, bck_res.info.content_encoding()); + if bknd_res.decompressed_size < bknd_res.size { + (bknd_res.decompressed_body, bknd_res.decompressed_size) = + decompress(bknd_res.body.clone(), bknd_res.size, bknd_res.info.content_encoding()); } - match serde_json::from_slice::(&cli_req.decompressed_body) { + match serde_json::from_slice::(&clnt_req.decompressed_body) { Ok(jrpc) => { - if inner.should_mirror(&jrpc, &cli_req, &bck_res) { + if inner.should_mirror(&jrpc, &clnt_req, &bknd_res) { let mirrors_count = match inner.config().mirroring_strategy() { ConfigProxyHttpMirroringStrategy::FanOut => mirroring_peers.len(), ConfigProxyHttpMirroringStrategy::RoundRobin => 1, @@ -431,7 +431,7 @@ where mirroring_peer_round_robin_index = 0; } - let mut req = cli_req.clone(); + let mut req = clnt_req.clone(); req.info.jrpc_method_enriched = jrpc.method_enriched(); mirroring_peer.do_send(req.clone()); } @@ -439,20 +439,20 @@ where Self::maybe_log_proxied_request_and_response( &jrpc, - &cli_req, - &bck_res, + &clnt_req, + &bknd_res, inner.clone(), worker_id, ); - Self::emit_metrics_on_proxy_success(&jrpc, &cli_req, &bck_res, metrics.clone()); + Self::emit_metrics_on_proxy_success(&jrpc, &clnt_req, &bknd_res, metrics.clone()); } Err(err) => { warn!( proxy = P::name(), - request_id = %cli_req.info.id, - connection_id = %cli_req.info.connection_id, + request_id = %clnt_req.info.id, + connection_id = %clnt_req.info.connection_id, worker_id = %worker_id, error = ?err, "Failed to parse json-rpc request", @@ -462,29 +462,29 @@ where } fn postprocess_mirrored_response( - mut cli_req: ProxiedHttpRequest, - mut mrr_res: ProxiedHttpResponse, + mut clnt_req: ProxiedHttpRequest, + mut mirr_res: ProxiedHttpResponse, inner: Arc

, metrics: Arc, worker_id: Uuid, ) { - if cli_req.decompressed_size < cli_req.size { - (cli_req.decompressed_body, cli_req.decompressed_size) = - decompress(cli_req.body.clone(), cli_req.size, cli_req.info.content_encoding()); + if clnt_req.decompressed_size < clnt_req.size { + (clnt_req.decompressed_body, clnt_req.decompressed_size) = + decompress(clnt_req.body.clone(), clnt_req.size, clnt_req.info.content_encoding()); } - if mrr_res.decompressed_size < mrr_res.size { - (mrr_res.decompressed_body, mrr_res.decompressed_size) = - decompress(mrr_res.body.clone(), mrr_res.size, mrr_res.info.content_encoding()); + if mirr_res.decompressed_size < mirr_res.size { + (mirr_res.decompressed_body, mirr_res.decompressed_size) = + decompress(mirr_res.body.clone(), mirr_res.size, mirr_res.info.content_encoding()); } - Self::maybe_log_mirrored_request(&cli_req, &mrr_res, worker_id, inner.config()); + Self::maybe_log_mirrored_request(&clnt_req, &mirr_res, worker_id, inner.config()); metrics .http_mirror_success_count .get_or_create(&LabelsProxyHttpJrpc { proxy: P::name(), - jrpc_method: cli_req.info.jrpc_method_enriched, + jrpc_method: clnt_req.info.jrpc_method_enriched, }) .inc(); } @@ -978,36 +978,36 @@ where { type Result = (); - fn handle(&mut self, cli_req: ProxiedHttpRequest, ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, clnt_req: ProxiedHttpRequest, ctx: &mut Self::Context) -> Self::Result { let start = UtcDateTime::now(); let inner = self.inner.clone(); let worker_id = self.worker_id; let metrics = self.metrics.clone(); - let mrr_req = self.new_backend_request(&cli_req.info); - let mrr_req_body = cli_req.body.clone(); + let mirr_req = self.new_backend_request(&clnt_req.info); + let mirr_req_body = clnt_req.body.clone(); ctx.spawn( async move { - match mrr_req.send_body(mrr_req_body).await { - Ok(mut bck_res) => { + match mirr_req.send_body(mirr_req_body).await { + Ok(mut bknd_res) => { let end = UtcDateTime::now(); - match bck_res.body().await { - Ok(mrr_res_body) => { - let size = match mrr_res_body.size() { + match bknd_res.body().await { + Ok(mirr_res_body) => { + let size = match mirr_res_body.size() { BodySize::Sized(size) => size, // Body is always sized BodySize::None | BodySize::Stream => 0, }; let info = ProxyHttpResponseInfo::new( - cli_req.info.id, - bck_res.status(), - bck_res.headers().clone(), + clnt_req.info.id, + bknd_res.status(), + bknd_res.headers().clone(), ); - let mrr_res = ProxiedHttpResponse { + let mirr_res = ProxiedHttpResponse { info, - body: mrr_res_body, + body: mirr_res_body, size: size as usize, decompressed_body: Bytes::new(), decompressed_size: 0, @@ -1015,14 +1015,14 @@ where end, }; ProxyHttp::::postprocess_mirrored_response( - cli_req, mrr_res, inner, metrics, worker_id, + clnt_req, mirr_res, inner, metrics, worker_id, ); } Err(err) => { warn!( proxy = P::name(), - request_id = %cli_req.info.id, - connection_id = %cli_req.info.connection_id, + request_id = %clnt_req.info.id, + connection_id = %clnt_req.info.connection_id, error = ?err, "Failed to mirror a request", ); @@ -1037,8 +1037,8 @@ where Err(err) => { warn!( proxy = P::name(), - request_id = %cli_req.info.id, - connection_id = %cli_req.info.connection_id, + request_id = %clnt_req.info.id, + connection_id = %clnt_req.info.connection_id, error = ?err, "Failed to mirror a request", ); diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index 2dd10c4..87cd51d 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -51,13 +51,13 @@ use crate::{ const WS_PING_INTERVAL_SECONDS: u64 = 1; -const WS_CLI_ERROR: &str = "client error"; -const WS_BCK_ERROR: &str = "backend error"; -const WS_BCK_TIMEOUT: &str = "backend error"; +const WS_CLNT_ERROR: &str = "client error"; +const WS_BKND_ERROR: &str = "backend error"; +const WS_BKND_TIMEOUT: &str = "backend error"; const WS_CLOSE_OK: &str = ""; -const WS_LABEL_BACKEND: &str = "backend"; -const WS_LABEL_CLIENT: &str = "client"; +const WS_LABEL_BKND: &str = "backend"; +const WS_LABEL_CLNT: &str = "client"; // ProxyWs ------------------------------------------------------------- @@ -76,8 +76,8 @@ where backend: ProxyWsBackendEndpoint, pings: HashMap, - ping_balance_cli: AtomicI64, - ping_balance_bck: AtomicI64, + ping_balance_clnt: AtomicI64, + ping_balance_bknd: AtomicI64, _config: PhantomData, _proxy: PhantomData

, @@ -115,8 +115,8 @@ where resetter, backend, pings: HashMap::new(), - ping_balance_bck: AtomicI64::new(0), - ping_balance_cli: AtomicI64::new(0), + ping_balance_bknd: AtomicI64::new(0), + ping_balance_clnt: AtomicI64::new(0), _config: PhantomData, _proxy: PhantomData, } @@ -243,13 +243,13 @@ where #[expect(clippy::unused_async, reason = "required by the actix framework")] async fn receive( - cli_req: HttpRequest, - cli_req_body: web::Payload, + clnt_req: HttpRequest, + clnt_req_body: web::Payload, this: web::Data, ) -> Result { - let info = ProxyHttpRequestInfo::new(&cli_req, cli_req.conn_data::()); + let info = ProxyHttpRequestInfo::new(&clnt_req, clnt_req.conn_data::()); - let (res, cli_tx, cli_rx) = match actix_ws::handle(&cli_req, cli_req_body) { + let (res, clnt_tx, clnt_rx) = match actix_ws::handle(&clnt_req, clnt_req_body) { Ok(res) => res, Err(err) => { error!( @@ -264,30 +264,30 @@ where } }; - actix_web::rt::spawn(Self::handshake(this, cli_tx, cli_rx, info)); + actix_web::rt::spawn(Self::handshake(this, clnt_tx, clnt_rx, info)); Ok(res) } async fn handshake( this: web::Data, - cli_tx: Session, - cli_rx: MessageStream, + clnt_tx: Session, + clnt_rx: MessageStream, info: ProxyHttpRequestInfo, ) { - let bck_uri = this.backend.new_backend_uri(&info); + let bknd_uri = this.backend.new_backend_uri(&info); trace!( proxy = P::name(), request_id = %info.id(), connection_id = %info.connection_id(), worker_id = %this.id, - backend_uri = %bck_uri, + backend_uri = %bknd_uri, "Starting websocket handshake...", ); - let (bck_stream, _) = match tokio::time::timeout( + let (bknd_stream, _) = match tokio::time::timeout( this.config().backend_timeout(), - tokio_tungstenite::connect_async(bck_uri), + tokio_tungstenite::connect_async(bknd_uri), ) .await { @@ -303,10 +303,10 @@ where "Failed to establish backend websocket session" ); - if let Err(err) = cli_tx + if let Err(err) = clnt_tx .close(Some(actix_ws::CloseReason { code: awc::ws::CloseCode::Error, - description: Some(String::from(WS_BCK_ERROR)), + description: Some(String::from(WS_BKND_ERROR)), })) .await { @@ -331,10 +331,10 @@ where "Timed out to establish backend websocket session" ); - if let Err(err) = cli_tx + if let Err(err) = clnt_tx .close(Some(actix_ws::CloseReason { code: awc::ws::CloseCode::Again, - description: Some(String::from(WS_BCK_TIMEOUT)), + description: Some(String::from(WS_BKND_TIMEOUT)), })) .await { @@ -351,18 +351,18 @@ where } }; - let (bck_tx, bck_rx) = bck_stream.split(); + let (bknd_tx, bknd_rx) = bknd_stream.split(); - Self::pump(this, info, cli_tx, cli_rx, bck_tx, bck_rx).await; + Self::pump(this, info, clnt_tx, clnt_rx, bknd_tx, bknd_rx).await; } async fn pump( this: web::Data, info: ProxyHttpRequestInfo, - mut cli_tx: Session, - mut cli_rx: MessageStream, - mut bck_tx: SplitSink>, tungstenite::Message>, - mut bck_rx: SplitStream>>, + mut clnt_tx: Session, + mut clnt_rx: MessageStream, + mut bknd_tx: SplitSink>, tungstenite::Message>, + mut bknd_rx: SplitStream>>, ) { info!( proxy = P::name(), @@ -391,30 +391,30 @@ where // ping both sides _ = heartbeat.tick() => { - pumping = Self::heartbeat(&this, info.clone(), &mut cli_tx, &mut bck_tx).await; + pumping = Self::heartbeat(&this, info.clone(), &mut clnt_tx, &mut bknd_tx).await; } // client => backend - cli_msg = cli_rx.next() => { - pumping = Self::pump_cli_to_bck( + clnt_msg = clnt_rx.next() => { + pumping = Self::pump_clnt_to_bknd( &this, info.clone(), UtcDateTime::now(), - cli_msg, - &mut bck_tx, - &mut cli_tx + clnt_msg, + &mut bknd_tx, + &mut clnt_tx ).await; } // backend => client - bck_msg = bck_rx.next() => { - pumping = Self::pump_bck_to_cli( + bknd_msg = bknd_rx.next() => { + pumping = Self::pump_bknd_to_cli( &this, info.clone(), UtcDateTime::now(), - bck_msg, - &mut cli_tx, - &mut bck_tx + bknd_msg, + &mut clnt_tx, + &mut bknd_tx ).await; } } @@ -430,7 +430,7 @@ where msg = %msg, "Closing client websocket session..." ); - if let Err(err) = cli_tx + if let Err(err) = clnt_tx .close(Some(actix_ws::CloseReason { code: awc::ws::CloseCode::Error, description: Some(String::from(msg)), @@ -454,7 +454,7 @@ where msg = %msg, "Closing backend websocket session..." ); - if let Err(err) = bck_tx + if let Err(err) = bknd_tx .send(tungstenite::Message::Close(Some(tungstenite::protocol::CloseFrame { code: tungstenite::protocol::frame::coding::CloseCode::Error, reason: msg.into(), @@ -477,7 +477,7 @@ where worker_id = %this.id, "Closing client websocket session..." ); - if let Err(err) = cli_tx + if let Err(err) = clnt_tx .close(Some(actix_ws::CloseReason { code: awc::ws::CloseCode::Normal, description: None, @@ -499,7 +499,7 @@ where worker_id = %this.id, "Closing backend websocket session..." ); - if let Err(err) = bck_tx + if let Err(err) = bknd_tx .send(tungstenite::Message::Close(Some(tungstenite::protocol::CloseFrame { code: tungstenite::protocol::frame::coding::CloseCode::Normal, reason: Utf8Bytes::default(), @@ -527,8 +527,8 @@ where async fn heartbeat( this: &web::Data, info: Arc, - cli_tx: &mut Session, - bck_tx: &mut SplitSink>, tungstenite::Message>, + clnt_tx: &mut Session, + bknd_tx: &mut SplitSink>, tungstenite::Message>, ) -> Result<(), &'static str> { let ping_threshold = (1 + this.config().backend_timeout().as_secs() / WS_PING_INTERVAL_SECONDS) as i64; @@ -536,18 +536,18 @@ where { // ping -> client - if this.ping_balance_cli.load(Ordering::Relaxed) > ping_threshold { + if this.ping_balance_clnt.load(Ordering::Relaxed) > ping_threshold { error!( proxy = P::name(), connection_id = %info.connection_id(), worker_id = %this.id, "More than {} websocket pings sent to client didn't return, terminating the pump...", ping_threshold, ); - return Err(WS_CLI_ERROR); + return Err(WS_CLNT_ERROR); } - let cli_ping = ProxyWsPing::new(info.connection_id()); - if let Err(err) = cli_tx.ping(&cli_ping.to_slice()).await { + let clnt_ping = ProxyWsPing::new(info.connection_id()); + if let Err(err) = clnt_tx.ping(&clnt_ping.to_slice()).await { error!( proxy = P::name(), connection_id = %info.connection_id(), @@ -555,27 +555,27 @@ where error = ?err, "Failed to send ping websocket message to client" ); - return Err(WS_CLI_ERROR); + return Err(WS_CLNT_ERROR); } - let _ = this.pings.insert_sync(cli_ping.id, cli_ping); - this.ping_balance_cli.inc(); + let _ = this.pings.insert_sync(clnt_ping.id, clnt_ping); + this.ping_balance_clnt.inc(); } { // ping -> backend - if this.ping_balance_bck.load(Ordering::Relaxed) > ping_threshold { + if this.ping_balance_bknd.load(Ordering::Relaxed) > ping_threshold { error!( proxy = P::name(), connection_id = %info.connection_id(), worker_id = %this.id, "More than {} websocket pings sent to backend didn't return, terminating the pump...", ping_threshold, ); - return Err(WS_BCK_ERROR); + return Err(WS_BKND_ERROR); } - let bck_ping = ProxyWsPing::new(info.connection_id()); - if let Err(err) = bck_tx.send(tungstenite::Message::Ping(bck_ping.to_bytes())).await { + let bknd_ping = ProxyWsPing::new(info.connection_id()); + if let Err(err) = bknd_tx.send(tungstenite::Message::Ping(bknd_ping.to_bytes())).await { error!( proxy = P::name(), connection_id = %info.connection_id(), @@ -583,29 +583,29 @@ where error = ?err, "Failed to send ping websocket message to backend" ); - return Err(WS_BCK_ERROR); + return Err(WS_BKND_ERROR); } - let _ = this.pings.insert_sync(bck_ping.id, bck_ping); - this.ping_balance_bck.inc(); + let _ = this.pings.insert_sync(bknd_ping.id, bknd_ping); + this.ping_balance_bknd.inc(); } Ok(()) } - async fn pump_cli_to_bck( + async fn pump_clnt_to_bknd( this: &web::Data, info: Arc, timestamp: UtcDateTime, - cli_msg: Option>, - bck_tx: &mut SplitSink>, tungstenite::Message>, - cli_tx: &mut Session, + clnt_msg: Option>, + bknd_tx: &mut SplitSink>, tungstenite::Message>, + clnt_tx: &mut Session, ) -> Result<(), &'static str> { - match cli_msg { + match clnt_msg { Some(Ok(msg)) => { match msg { // binary actix_ws::Message::Binary(bytes) => { if let Err(err) = - bck_tx.send(tungstenite::Message::Binary(bytes.clone())).await + bknd_tx.send(tungstenite::Message::Binary(bytes.clone())).await { error!( proxy = P::name(), @@ -619,10 +619,10 @@ where .ws_proxy_failure_count .get_or_create(&LabelsProxyWs { proxy: P::name(), - destination: WS_LABEL_BACKEND, + destination: WS_LABEL_BKND, }) .inc(); - return Err(WS_BCK_ERROR); + return Err(WS_BKND_ERROR); } this.postprocessor.do_send(ProxyWsMessage::ClientToBackendBinary { msg: bytes, @@ -635,7 +635,7 @@ where // text actix_ws::Message::Text(text) => { - if let Err(err) = bck_tx + if let Err(err) = bknd_tx .send(tungstenite::Message::Text(unsafe { // safety: it's from client's ws message => must be valid utf-8 tungstenite::protocol::frame::Utf8Bytes::from_bytes_unchecked( @@ -656,10 +656,10 @@ where .ws_proxy_failure_count .get_or_create(&LabelsProxyWs { proxy: P::name(), - destination: WS_LABEL_BACKEND, + destination: WS_LABEL_BKND, }) .inc(); - return Err(WS_BCK_ERROR); + return Err(WS_BKND_ERROR); } this.postprocessor.do_send(ProxyWsMessage::ClientToBackendText { msg: text, @@ -672,7 +672,7 @@ where // ping actix_ws::Message::Ping(bytes) => { - if let Err(err) = cli_tx.pong(&bytes).await { + if let Err(err) = clnt_tx.pong(&bytes).await { error!( proxy = P::name(), connection_id = %info.connection_id(), @@ -680,7 +680,7 @@ where error = ?err, "Failed to return pong message to client" ); - return Err(WS_CLI_ERROR); + return Err(WS_CLNT_ERROR); } Ok(()) } @@ -691,13 +691,13 @@ where let Some((_, ping)) = this.pings.remove_sync(&pong.id) && pong == ping { - this.ping_balance_cli.dec(); + this.ping_balance_clnt.dec(); this.shared .metrics .ws_latency_client .get_or_create(&LabelsProxyWs { proxy: P::name(), - destination: WS_LABEL_BACKEND, + destination: WS_LABEL_BKND, }) .record( (1000000.0 * (timestamp - pong.timestamp).as_seconds_f64() / @@ -716,7 +716,7 @@ where // close actix_ws::Message::Close(reason) => { - if let Err(err) = bck_tx + if let Err(err) = bknd_tx .send(tungstenite::Message::Close(reason.map(|r| { tungstenite::protocol::CloseFrame { code: tungstenite::protocol::frame::coding::CloseCode::from( @@ -734,7 +734,7 @@ where error = ?err, "Failed to proxy close websocket message to backend" ); - return Err(WS_BCK_ERROR); + return Err(WS_BKND_ERROR); } Err(WS_CLOSE_OK) } @@ -751,7 +751,7 @@ where error = ?err, "Client websocket stream error" ); - Err(WS_CLI_ERROR) + Err(WS_CLNT_ERROR) } None => { @@ -766,20 +766,20 @@ where } } - async fn pump_bck_to_cli( + async fn pump_bknd_to_cli( this: &web::Data, info: Arc, timestamp: UtcDateTime, - bck_msg: Option>, - cli_tx: &mut Session, - bck_tx: &mut SplitSink>, tungstenite::Message>, + bknd_msg: Option>, + clnt_tx: &mut Session, + bknd_tx: &mut SplitSink>, tungstenite::Message>, ) -> Result<(), &'static str> { - match bck_msg { + match bknd_msg { Some(Ok(msg)) => { match msg { // binary tungstenite::Message::Binary(bytes) => { - if let Err(err) = cli_tx.binary(bytes.clone()).await { + if let Err(err) = clnt_tx.binary(bytes.clone()).await { error!( proxy = P::name(), connection_id = %info.connection_id(), @@ -792,10 +792,10 @@ where .ws_proxy_failure_count .get_or_create(&LabelsProxyWs { proxy: P::name(), - destination: WS_LABEL_CLIENT, + destination: WS_LABEL_CLNT, }) .inc(); - return Err(WS_CLI_ERROR); + return Err(WS_CLNT_ERROR); } this.postprocessor.do_send(ProxyWsMessage::BackendToClientBinary { msg: bytes, @@ -808,7 +808,7 @@ where // text tungstenite::Message::Text(text) => { - if let Err(err) = cli_tx.text(text.clone().as_str()).await { + if let Err(err) = clnt_tx.text(text.clone().as_str()).await { error!( proxy = P::name(), connection_id = %info.connection_id(), @@ -821,10 +821,10 @@ where .ws_proxy_failure_count .get_or_create(&LabelsProxyWs { proxy: P::name(), - destination: WS_LABEL_CLIENT, + destination: WS_LABEL_CLNT, }) .inc(); - return Err(WS_CLI_ERROR); + return Err(WS_CLNT_ERROR); } this.postprocessor.do_send(ProxyWsMessage::BackendToClientText { msg: text, @@ -837,7 +837,7 @@ where // ping tungstenite::Message::Ping(bytes) => { - if let Err(err) = bck_tx.send(tungstenite::Message::Pong(bytes)).await { + if let Err(err) = bknd_tx.send(tungstenite::Message::Pong(bytes)).await { error!( proxy = P::name(), connection_id = %info.connection_id(), @@ -845,7 +845,7 @@ where error = ?err, "Failed to return pong message to backend" ); - return Err(WS_BCK_ERROR); + return Err(WS_BKND_ERROR); } Ok(()) } @@ -856,13 +856,13 @@ where let Some((_, ping)) = this.pings.remove_sync(&pong.id) && pong == ping { - this.ping_balance_bck.dec(); + this.ping_balance_bknd.dec(); this.shared .metrics .ws_latency_backend .get_or_create(&LabelsProxyWs { proxy: P::name(), - destination: WS_LABEL_BACKEND, + destination: WS_LABEL_BKND, }) .record( (1000000.0 * (timestamp - pong.timestamp).as_seconds_f64() / @@ -881,7 +881,7 @@ where // close tungstenite::Message::Close(reason) => { - if let Err(err) = cli_tx + if let Err(err) = clnt_tx .clone() // .close() consumes it .close(reason.map(|reason| actix_ws::CloseReason { code: u16::from(reason.code).into(), @@ -896,7 +896,7 @@ where error = ?err, "Failed to proxy close websocket message to client" ); - return Err(WS_CLI_ERROR); + return Err(WS_CLNT_ERROR); } Err(WS_CLOSE_OK) } @@ -913,7 +913,7 @@ where error = ?err, "Backend websocket stream error" ); - Err(WS_BCK_ERROR) + Err(WS_BKND_ERROR) } None => { @@ -1054,7 +1054,7 @@ where fn emit_metrics_on_proxy_success(msg: &ProxyWsMessage, metrics: Arc) { match msg { ProxyWsMessage::BackendToClientBinary { msg, info: _, start, end } => { - let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_CLIENT }; + let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_CLNT }; metrics .ws_latency_proxy .get_or_create(&labels) @@ -1064,7 +1064,7 @@ where } ProxyWsMessage::BackendToClientText { msg, info: _, start, end } => { - let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_CLIENT }; + let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_CLNT }; metrics .ws_latency_proxy .get_or_create(&labels) @@ -1074,7 +1074,7 @@ where } ProxyWsMessage::ClientToBackendBinary { msg, info: _, start, end } => { - let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_BACKEND }; + let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_BKND }; metrics .ws_latency_proxy .get_or_create(&labels) @@ -1084,7 +1084,7 @@ where } ProxyWsMessage::ClientToBackendText { msg, info: _, start, end } => { - let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_BACKEND }; + let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_BKND }; metrics .ws_latency_proxy .get_or_create(&labels) From 4d02469d4c1ebe1b33530e2196906724a610a52e Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Thu, 23 Oct 2025 23:11:48 +0200 Subject: [PATCH 3/3] review: put connection id into logs where it's missing --- crates/rproxy/src/server/proxy/http/proxy.rs | 95 +++++++++++-------- crates/rproxy/src/server/proxy/ws/proxy.rs | 98 ++++++++++---------- 2 files changed, 105 insertions(+), 88 deletions(-) diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 8ac17d2..b7daa14 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -314,8 +314,8 @@ where let info = ProxyHttpRequestInfo::new(&clnt_req, clnt_req.conn_data::()); - let id = info.id; - let connection_id = info.connection_id; + let req_id = info.req_id; + let conn_id = info.conn_id; let bknd_req = this.backend.new_backend_request(&info); let bknd_req_body = ProxyHttpRequestBody::new( @@ -331,8 +331,8 @@ where Err(err) => { warn!( proxy = P::name(), - request_id = %id, - connection_id = %connection_id, + request_id = %req_id, + connection_id = %conn_id, worker_id = %this.id, backend_url = %this.backend.url, error = ?err, @@ -354,7 +354,8 @@ where let preallocate = this.shared.config().prealloacated_response_buffer_size(); let bknd_res_body = ProxyHttpResponseBody::new( this, - id, + req_id, + conn_id, status, bknd_res.headers().clone(), bknd_res.into_stream(), @@ -366,14 +367,14 @@ where } fn postprocess_client_request(&self, req: ProxiedHttpRequest) { - let id = req.info.id; - let connection_id = req.info.connection_id; + let id = req.info.req_id; + let conn_id = req.info.conn_id; if self.requests.insert_sync(id, req).is_err() { error!( proxy = P::name(), request_id = %id, - connection_id = %connection_id, + connection_id = %conn_id, worker_id = %self.id, "Duplicate request id", ); @@ -381,10 +382,11 @@ where } fn postprocess_backend_response(&self, bknd_res: ProxiedHttpResponse) { - let Some((_, clnt_req)) = self.requests.remove_sync(&bknd_res.info.id) else { + let Some((_, clnt_req)) = self.requests.remove_sync(&bknd_res.info.req_id) else { error!( proxy = P::name(), - request_id = %bknd_res.info.id, + request_id = %bknd_res.info.req_id, + connection_id = %bknd_res.info.conn_id, worker_id = %self.id, "Proxied http response for unmatching request", ); @@ -451,8 +453,8 @@ where Err(err) => { warn!( proxy = P::name(), - request_id = %clnt_req.info.id, - connection_id = %clnt_req.info.connection_id, + request_id = %clnt_req.info.req_id, + connection_id = %clnt_req.info.conn_id, worker_id = %worker_id, error = ?err, "Failed to parse json-rpc request", @@ -518,8 +520,8 @@ where info!( proxy = P::name(), - request_id = %req.info.id, - connection_id = %req.info.connection_id, + request_id = %req.info.req_id, + connection_id = %req.info.conn_id, worker_id = %worker_id, jrpc_method = %jrpc.method_enriched(), http_status = res.status(), @@ -559,8 +561,8 @@ where info!( proxy = P::name(), - request_id = %req.info.id, - connection_id = %req.info.connection_id, + request_id = %req.info.req_id, + connection_id = %req.info.conn_id, worker_id = %worker_id, jrpc_method = %req.info.jrpc_method_enriched, http_status = res.status(), @@ -1001,7 +1003,8 @@ where BodySize::None | BodySize::Stream => 0, }; let info = ProxyHttpResponseInfo::new( - clnt_req.info.id, + clnt_req.info.req_id, + clnt_req.info.conn_id, bknd_res.status(), bknd_res.headers().clone(), ); @@ -1021,8 +1024,8 @@ where Err(err) => { warn!( proxy = P::name(), - request_id = %clnt_req.info.id, - connection_id = %clnt_req.info.connection_id, + request_id = %clnt_req.info.req_id, + connection_id = %clnt_req.info.conn_id, error = ?err, "Failed to mirror a request", ); @@ -1037,8 +1040,8 @@ where Err(err) => { warn!( proxy = P::name(), - request_id = %clnt_req.info.id, - connection_id = %clnt_req.info.connection_id, + request_id = %clnt_req.info.req_id, + connection_id = %clnt_req.info.conn_id, error = ?err, "Failed to mirror a request", ); @@ -1058,8 +1061,8 @@ where #[derive(Clone)] pub(crate) struct ProxyHttpRequestInfo { - id: Uuid, - connection_id: Uuid, + req_id: Uuid, + conn_id: Uuid, remote_addr: Option, method: Method, path: String, @@ -1131,8 +1134,8 @@ impl ProxyHttpRequestInfo { }; Self { - id: Uuid::now_v7(), - connection_id: Uuid::now_v7(), + req_id: Uuid::now_v7(), + conn_id: Uuid::now_v7(), remote_addr, method: req.method().clone(), path, @@ -1144,12 +1147,12 @@ impl ProxyHttpRequestInfo { #[inline] pub(crate) fn id(&self) -> Uuid { - self.id + self.req_id } #[inline] - pub(crate) fn connection_id(&self) -> Uuid { - self.connection_id + pub(crate) fn conn_id(&self) -> Uuid { + self.conn_id } #[inline] @@ -1176,19 +1179,25 @@ impl ProxyHttpRequestInfo { #[derive(Clone)] pub(crate) struct ProxyHttpResponseInfo { - id: Uuid, + req_id: Uuid, + conn_id: Uuid, status: StatusCode, headers: HeaderMap, // TODO: perhaps we don't need all headers, just select ones } impl ProxyHttpResponseInfo { - pub(crate) fn new(id: Uuid, status: StatusCode, headers: HeaderMap) -> Self { - Self { id, status, headers } + pub(crate) fn new(req_id: Uuid, conn_id: Uuid, status: StatusCode, headers: HeaderMap) -> Self { + Self { req_id, conn_id, status, headers } } #[inline] - pub(crate) fn id(&self) -> Uuid { - self.id + pub(crate) fn req_id(&self) -> Uuid { + self.req_id + } + + #[inline] + pub(crate) fn conn_id(&self) -> Uuid { + self.conn_id } fn content_encoding(&self) -> String { @@ -1269,7 +1278,7 @@ where warn!( proxy = P::name(), request_id = %info.id(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), error = err, "Proxy http request stream error", ); @@ -1278,6 +1287,7 @@ where proxy = P::name(), error = err, request_id = "unknown", + connection_id = "unknown", "Proxy http request stream error", ); } @@ -1292,7 +1302,7 @@ where warn!( proxy = P::name(), request_id = %info.id(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), error = ?err, "Proxy http request stream error", ); @@ -1301,6 +1311,7 @@ where proxy = P::name(), error = ?err, request_id = "unknown", + connection_id = "unknown", "Proxy http request stream error", ); } @@ -1348,9 +1359,11 @@ where C: ConfigProxyHttp, P: ProxyHttpInner, { + #[allow(clippy::too_many_arguments)] fn new( proxy: web::Data>, - id: Uuid, + req_id: Uuid, + conn_id: Uuid, status: StatusCode, headers: HeaderMap, body: S, @@ -1364,7 +1377,7 @@ where start: timestamp, body: Vec::with_capacity(preallocate), max_size, - info: Some(ProxyHttpResponseInfo::new(id, status, headers)), + info: Some(ProxyHttpResponseInfo::new(req_id, conn_id, status, headers)), } } } @@ -1394,7 +1407,8 @@ where if let Some(info) = mem::take(this.info) { warn!( proxy = P::name(), - request_id = %info.id(), + request_id = %info.req_id(), + connection_id = %info.conn_id(), error = err, "Proxy http response stream error", ); @@ -1403,6 +1417,7 @@ where proxy = P::name(), error = err, request_id = "unknown", + connection_id = "unknown", "Proxy http response stream error", ); } @@ -1416,7 +1431,8 @@ where if let Some(info) = mem::take(this.info) { warn!( proxy = P::name(), - request_id = %info.id(), + request_id = %info.req_id(), + connection_id = %info.conn_id(), error = ?err, "Proxy http response stream error", ); @@ -1425,6 +1441,7 @@ where proxy = P::name(), error = ?err, request_id = "unknown", + connection_id = "unknown", "Proxy http response stream error", ); } diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index 87cd51d..1bc8085 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -255,7 +255,7 @@ where error!( proxy = P::name(), request_id = %info.id(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Failed to upgrade to websocket", @@ -279,7 +279,7 @@ where trace!( proxy = P::name(), request_id = %info.id(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, backend_uri = %bknd_uri, "Starting websocket handshake...", @@ -297,7 +297,7 @@ where error!( proxy = P::name(), request_id = %info.id(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Failed to establish backend websocket session" @@ -313,7 +313,7 @@ where error!( proxy = P::name(), request_id = %info.id(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Failed to close client websocket session" @@ -326,7 +326,7 @@ where error!( proxy = P::name(), request_id = %info.id(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, "Timed out to establish backend websocket session" ); @@ -341,7 +341,7 @@ where error!( proxy = P::name(), request_id = %info.id(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Failed to close client websocket session" @@ -366,7 +366,7 @@ where ) { info!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, "Starting websocket pump..." ); @@ -425,7 +425,7 @@ where { debug!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, msg = %msg, "Closing client websocket session..." @@ -439,7 +439,7 @@ where { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, msg = %msg, error = ?err, @@ -449,7 +449,7 @@ where debug!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, msg = %msg, "Closing backend websocket session..." @@ -463,7 +463,7 @@ where { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, msg = %msg, error = ?err, @@ -473,7 +473,7 @@ where } else { debug!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, "Closing client websocket session..." ); @@ -486,7 +486,7 @@ where { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Failed to close client websocket session" @@ -495,7 +495,7 @@ where debug!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, "Closing backend websocket session..." ); @@ -508,7 +508,7 @@ where { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Failed to close backend websocket session" @@ -518,7 +518,7 @@ where info!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, "Stopped websocket pump" ); @@ -539,18 +539,18 @@ where if this.ping_balance_clnt.load(Ordering::Relaxed) > ping_threshold { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, "More than {} websocket pings sent to client didn't return, terminating the pump...", ping_threshold, ); return Err(WS_CLNT_ERROR); } - let clnt_ping = ProxyWsPing::new(info.connection_id()); + let clnt_ping = ProxyWsPing::new(info.conn_id()); if let Err(err) = clnt_tx.ping(&clnt_ping.to_slice()).await { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Failed to send ping websocket message to client" @@ -567,18 +567,18 @@ where if this.ping_balance_bknd.load(Ordering::Relaxed) > ping_threshold { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, "More than {} websocket pings sent to backend didn't return, terminating the pump...", ping_threshold, ); return Err(WS_BKND_ERROR); } - let bknd_ping = ProxyWsPing::new(info.connection_id()); + let bknd_ping = ProxyWsPing::new(info.conn_id()); if let Err(err) = bknd_tx.send(tungstenite::Message::Ping(bknd_ping.to_bytes())).await { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Failed to send ping websocket message to backend" @@ -609,7 +609,7 @@ where { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Failed to proxy binary websocket message to backend" @@ -646,7 +646,7 @@ where { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Failed to proxy text websocket message to backend" @@ -675,7 +675,7 @@ where if let Err(err) = clnt_tx.pong(&bytes).await { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Failed to return pong message to client" @@ -707,7 +707,7 @@ where } warn!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, "Unexpected websocket pong received from client", ); @@ -729,7 +729,7 @@ where { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Failed to proxy close websocket message to backend" @@ -746,7 +746,7 @@ where Some(Err(err)) => { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Client websocket stream error" @@ -757,7 +757,7 @@ where None => { info!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, "Client had closed websocket stream" ); @@ -782,7 +782,7 @@ where if let Err(err) = clnt_tx.binary(bytes.clone()).await { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Failed to proxy binary websocket message to client" @@ -811,7 +811,7 @@ where if let Err(err) = clnt_tx.text(text.clone().as_str()).await { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Failed to proxy text websocket message to client" @@ -840,7 +840,7 @@ where if let Err(err) = bknd_tx.send(tungstenite::Message::Pong(bytes)).await { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Failed to return pong message to backend" @@ -872,7 +872,7 @@ where } warn!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, "Unexpected websocket pong received from backend", ); @@ -891,7 +891,7 @@ where { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Failed to proxy close websocket message to client" @@ -908,7 +908,7 @@ where Some(Err(err)) => { error!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, error = ?err, "Backend websocket stream error" @@ -919,7 +919,7 @@ where None => { info!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %this.id, "Backend had closed websocket stream" ); @@ -955,7 +955,7 @@ where info!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %worker_id, remote_addr = info.remote_addr(), ts_message_received = start.format(&Iso8601::DEFAULT).unwrap_or_default(), @@ -977,7 +977,7 @@ where info!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %worker_id, remote_addr = info.remote_addr(), ts_message_received = start.format(&Iso8601::DEFAULT).unwrap_or_default(), @@ -999,7 +999,7 @@ where info!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %worker_id, remote_addr = info.remote_addr(), ts_message_received = start.format(&Iso8601::DEFAULT).unwrap_or_default(), @@ -1021,7 +1021,7 @@ where info!( proxy = P::name(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %worker_id, remote_addr = info.remote_addr(), ts_message_received = start.format(&Iso8601::DEFAULT).unwrap_or_default(), @@ -1163,7 +1163,7 @@ where error!( proxy = P::name(), request_id = %info.id(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %self.worker_id, error = ?err, "Failed to re-parse client request's path and query", @@ -1177,7 +1177,7 @@ where error!( proxy = P::name(), request_id = %info.id(), - connection_id = %info.connection_id(), + connection_id = %info.conn_id(), worker_id = %self.worker_id, error = ?err, "Failed to construct backend URI, defaulting to the base one", ); @@ -1272,19 +1272,19 @@ enum ProxyWsMessage { #[derive(PartialEq, Eq)] struct ProxyWsPing { id: Uuid, - connection_id: Uuid, + conn_id: Uuid, timestamp: UtcDateTime, } impl ProxyWsPing { - fn new(connection_id: Uuid) -> Self { - Self { id: Uuid::now_v7(), connection_id, timestamp: UtcDateTime::now() } + fn new(conn_id: Uuid) -> Self { + Self { id: Uuid::now_v7(), conn_id, timestamp: UtcDateTime::now() } } fn to_bytes(&self) -> Bytes { let mut bytes = BytesMut::with_capacity(48); bytes.put_u128(self.id.as_u128()); - bytes.put_u128(self.connection_id.as_u128()); + bytes.put_u128(self.conn_id.as_u128()); bytes.put_i128(self.timestamp.unix_timestamp_nanos()); bytes.freeze() } @@ -1295,12 +1295,12 @@ impl ProxyWsPing { } let id = Uuid::from_u128(bytes.get_u128()); - let connection_id = Uuid::from_u128(bytes.get_u128()); + let conn_id = Uuid::from_u128(bytes.get_u128()); let Ok(timestamp) = UtcDateTime::from_unix_timestamp_nanos(bytes.get_i128()) else { return None }; - Some(Self { id, connection_id, timestamp }) + Some(Self { id, conn_id, timestamp }) } fn to_slice(&self) -> [u8; 48] { @@ -1308,7 +1308,7 @@ impl ProxyWsPing { let mut cur = std::io::Cursor::new(res); let _ = cur.write(self.id.as_bytes()); - let _ = cur.write(self.connection_id.as_bytes()); + let _ = cur.write(self.conn_id.as_bytes()); let _ = cur.write(&self.timestamp.unix_timestamp_nanos().to_be_bytes()); cur.into_inner()