From 9c89cd95be48121ac290dd36be42c502b26d4de9 Mon Sep 17 00:00:00 2001 From: Ash Kunda <18058966+akundaz@users.noreply.github.com> Date: Tue, 21 Oct 2025 21:01:34 -0400 Subject: [PATCH 1/4] make fmt --- crates/rproxy/src/config.rs | 2 +- crates/rproxy/src/jrpc/jrpc_request.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/rproxy/src/config.rs b/crates/rproxy/src/config.rs index 32ee416..0a116aa 100644 --- a/crates/rproxy/src/config.rs +++ b/crates/rproxy/src/config.rs @@ -4,7 +4,7 @@ use clap::Parser; use thiserror::Error; use crate::server::{ - config::{ConfigLoggingError, ConfigLogging, ConfigMetrics, ConfigMetricsError}, + config::{ConfigLogging, ConfigLoggingError, ConfigMetrics, ConfigMetricsError}, proxy::config::{ ConfigAuthrpc, ConfigAuthrpcError, diff --git a/crates/rproxy/src/jrpc/jrpc_request.rs b/crates/rproxy/src/jrpc/jrpc_request.rs index 2523e96..fba69a7 100644 --- a/crates/rproxy/src/jrpc/jrpc_request.rs +++ b/crates/rproxy/src/jrpc/jrpc_request.rs @@ -1,6 +1,7 @@ -use serde::Deserialize; use std::borrow::Cow; +use serde::Deserialize; + // JrpcRequestMeta ----------------------------------------------------- const JRPC_METHOD_FCUV1_WITH_PAYLOAD: Cow<'static, str> = From 2d8f2720ca9e594d3228125bd1e73b4a4cfa22da Mon Sep 17 00:00:00 2001 From: Ash Kunda <18058966+akundaz@users.noreply.github.com> Date: Tue, 21 Oct 2025 21:44:34 -0400 Subject: [PATCH 2/4] remove the ProxyInner trait --- crates/rproxy/src/server.rs | 10 ++- crates/rproxy/src/server/metrics.rs | 8 +- crates/rproxy/src/server/proxy.rs | 24 ++---- .../rproxy/src/server/proxy/http/authrpc.rs | 10 --- crates/rproxy/src/server/proxy/http/inner.rs | 8 +- crates/rproxy/src/server/proxy/http/proxy.rs | 79 ++++++++++++++----- crates/rproxy/src/server/proxy/http/rpc.rs | 39 ++++----- .../rproxy/src/server/proxy/ws/flashblocks.rs | 9 +-- crates/rproxy/src/server/proxy/ws/inner.rs | 4 +- crates/rproxy/src/server/proxy/ws/proxy.rs | 53 +++++++++---- 10 files changed, 133 insertions(+), 111 deletions(-) diff --git a/crates/rproxy/src/server.rs b/crates/rproxy/src/server.rs index 5417f83..c6766a4 100644 --- a/crates/rproxy/src/server.rs +++ b/crates/rproxy/src/server.rs @@ -20,7 +20,6 @@ use crate::{ server::{ metrics::Metrics, proxy::{ - ProxyInner, circuit_breaker::CircuitBreaker, config::{ConfigAuthrpc, ConfigFlashblocks, ConfigRpc}, http::{ProxyHttp, ProxyHttpInnerAuthrpc, ProxyHttpInnerRpc}, @@ -102,13 +101,14 @@ impl Server { config, tls, metrics, + "rproxy-authrpc", canceller.clone(), resetter, ) .await .inspect_err(|err| { error!( - proxy = ProxyHttpInnerRpc::name(), + proxy = "rproxy-authrpc", error = ?err, "Failed to start http-proxy, terminating...", ); @@ -130,13 +130,14 @@ impl Server { config, tls, metrics, + "rproxy-rpc", canceller.clone(), resetter, ) .await .inspect_err(|err| { error!( - proxy = ProxyHttpInnerRpc::name(), + proxy = "rproxy-rpc", error = ?err, "Failed to start http-proxy, terminating...", ); @@ -158,13 +159,14 @@ impl Server { config, tls, metrics, + "rproxy-flashblocks", canceller.clone(), resetter, ) .await .inspect_err(|err| { error!( - proxy = ProxyHttpInnerRpc::name(), + proxy = "rproxy-flashblocks", error = ?err, "Failed to start websocket-proxy, terminating...", ); diff --git a/crates/rproxy/src/server/metrics.rs b/crates/rproxy/src/server/metrics.rs index c2fdca8..c7bf27e 100644 --- a/crates/rproxy/src/server/metrics.rs +++ b/crates/rproxy/src/server/metrics.rs @@ -368,14 +368,14 @@ impl Metrics { #[derive(Clone, Debug, Default, Hash, PartialEq, Eq, EncodeLabelSet)] pub(crate) struct LabelsProxy { - pub(crate) proxy: &'static str, + pub(crate) proxy: String, } // LabelsProxyClientInfo ----------------------------------------------- #[derive(Clone, Debug, Default, Hash, PartialEq, Eq, EncodeLabelSet)] pub(crate) struct LabelsProxyClientInfo { - pub(crate) proxy: &'static str, + pub(crate) proxy: String, pub(crate) user_agent: String, } @@ -383,7 +383,7 @@ pub(crate) struct LabelsProxyClientInfo { #[derive(Clone, Debug, Default, Hash, PartialEq, Eq, EncodeLabelSet)] pub(crate) struct LabelsProxyHttpJrpc { - pub(crate) proxy: &'static str, + pub(crate) proxy: String, pub(crate) jrpc_method: Cow<'static, str>, } @@ -391,6 +391,6 @@ pub(crate) struct LabelsProxyHttpJrpc { #[derive(Clone, Debug, Default, Hash, PartialEq, Eq, EncodeLabelSet)] pub(crate) struct LabelsProxyWs { - pub(crate) proxy: &'static str, + pub(crate) proxy: String, pub(crate) destination: &'static str, } diff --git a/crates/rproxy/src/server/proxy.rs b/crates/rproxy/src/server/proxy.rs index b1fb68b..6382817 100644 --- a/crates/rproxy/src/server/proxy.rs +++ b/crates/rproxy/src/server/proxy.rs @@ -21,18 +21,16 @@ use crate::server::metrics::{LabelsProxy, Metrics}; // Proxy --------------------------------------------------------------- -pub(crate) trait Proxy

-where - P: ProxyInner, -{ +pub(crate) trait Proxy { fn on_connect( metrics: Arc, client_connections_count: Arc, + proxy_name: String, ) -> impl Fn(&dyn Any, &mut Extensions) { move |connection, extensions| { { let val = client_connections_count.fetch_add(1, Ordering::Relaxed) + 1; - let metric_labels = LabelsProxy { proxy: P::name() }; + let metric_labels = LabelsProxy { proxy: proxy_name.clone() }; metrics.client_connections_active_count.get_or_create(&metric_labels).set(val); metrics.client_connections_established_count.get_or_create(&metric_labels).inc(); @@ -76,7 +74,7 @@ where extensions.insert(ProxyConnectionGuard::new( id, - P::name(), + &proxy_name, remote_addr, local_addr, &metrics, @@ -87,12 +85,6 @@ where } } -// ProxyInner ---------------------------------------------------------- - -pub(crate) trait ProxyInner: 'static { - fn name() -> &'static str; -} - // ProxyConnectionGuard ------------------------------------------------ pub struct ProxyConnectionGuard { @@ -100,7 +92,7 @@ pub struct ProxyConnectionGuard { pub remote_addr: Option, pub local_addr: Option, - proxy_name: &'static str, + proxy_name: String, metrics: Arc, client_connections_count: Arc, } @@ -108,7 +100,7 @@ pub struct ProxyConnectionGuard { impl ProxyConnectionGuard { fn new( id: Uuid, - proxy_name: &'static str, + proxy_name: &str, remote_addr: Option, local_addr: Option, metrics: &Arc, @@ -118,7 +110,7 @@ impl ProxyConnectionGuard { id, remote_addr, local_addr, - proxy_name, + proxy_name: proxy_name.to_string(), metrics: metrics.clone(), client_connections_count, } @@ -129,7 +121,7 @@ impl Drop for ProxyConnectionGuard { fn drop(&mut self) { let val = self.client_connections_count.fetch_sub(1, Ordering::Relaxed) - 1; - let metric_labels = LabelsProxy { proxy: self.proxy_name }; + let metric_labels = LabelsProxy { proxy: self.proxy_name.clone() }; self.metrics.client_connections_active_count.get_or_create(&metric_labels).set(val); self.metrics.client_connections_closed_count.get_or_create(&metric_labels).inc(); diff --git a/crates/rproxy/src/server/proxy/http/authrpc.rs b/crates/rproxy/src/server/proxy/http/authrpc.rs index 6d45ff8..87230b7 100644 --- a/crates/rproxy/src/server/proxy/http/authrpc.rs +++ b/crates/rproxy/src/server/proxy/http/authrpc.rs @@ -1,14 +1,11 @@ use crate::{ jrpc::{JrpcRequestMeta, JrpcRequestMetaMaybeBatch}, server::proxy::{ - ProxyInner, config::ConfigAuthrpc, http::{ProxiedHttpRequest, ProxiedHttpResponse, ProxyHttpInner}, }, }; -const PROXY_HTTP_INNER_AUTHRPC_NAME: &str = "rproxy-authrpc"; - // ProxyHttpInnerAuthrpc ----------------------------------------------- #[derive(Clone)] @@ -16,13 +13,6 @@ pub(crate) struct ProxyHttpInnerAuthrpc { config: ConfigAuthrpc, } -impl ProxyInner for ProxyHttpInnerAuthrpc { - #[inline] - fn name() -> &'static str { - PROXY_HTTP_INNER_AUTHRPC_NAME - } -} - impl ProxyHttpInner for ProxyHttpInnerAuthrpc { fn new(config: ConfigAuthrpc) -> Self { Self { config } diff --git a/crates/rproxy/src/server/proxy/http/inner.rs b/crates/rproxy/src/server/proxy/http/inner.rs index dda0bc8..1f66737 100644 --- a/crates/rproxy/src/server/proxy/http/inner.rs +++ b/crates/rproxy/src/server/proxy/http/inner.rs @@ -1,15 +1,11 @@ use crate::{ jrpc::JrpcRequestMetaMaybeBatch, - server::proxy::{ - ProxyInner, - http::{ProxiedHttpRequest, ProxiedHttpResponse, config::ConfigProxyHttp}, - }, + server::proxy::http::{ProxiedHttpRequest, ProxiedHttpResponse, config::ConfigProxyHttp}, }; // ProxyHttpInner ------------------------------------------------------ -pub(crate) trait ProxyHttpInner: - ProxyInner + Clone + Send + Sized + Sync + 'static +pub(crate) trait ProxyHttpInner: Clone + Send + Sized + Sync + 'static where C: ConfigProxyHttp, { diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index f491d40..86190cd 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -99,6 +99,7 @@ where inner.clone(), id, shared.metrics.clone(), + &shared.proxy_name, config.backend_url(), connections_limit, config.backend_timeout(), @@ -113,6 +114,7 @@ where shared.inner(), id, shared.metrics.clone(), + &shared.proxy_name, peer_url.to_owned(), config.backend_max_concurrent_requests(), config.backend_timeout(), @@ -126,6 +128,7 @@ where worker_id: id, inner: inner.clone(), metrics: shared.metrics.clone(), + proxy_name: shared.proxy_name.clone(), mirroring_peers: peers.clone(), mirroring_peer_round_robin_index: AtomicUsize::new(0), } @@ -138,6 +141,7 @@ where config: C, tls: ConfigTls, metrics: Arc, + proxy_name: &str, canceller: tokio_util::sync::CancellationToken, resetter: broadcast::Sender<()>, ) -> Result<(), Box> { @@ -171,7 +175,7 @@ where ); } - let shared = ProxyHttpSharedState::::new(config, &metrics); + let shared = ProxyHttpSharedState::::new(config, &metrics, proxy_name); let client_connections_count = shared.client_connections_count.clone(); info!( @@ -191,7 +195,11 @@ where .wrap(NormalizePath::new(TrailingSlash::Trim)) .default_service(web::route().to(Self::receive)) }) - .on_connect(Self::on_connect(metrics, client_connections_count)) + .on_connect(Self::on_connect( + metrics.clone(), + client_connections_count, + proxy_name.to_string(), + )) .shutdown_signal(canceller.cancelled_owned()) .workers(workers_count); @@ -307,7 +315,7 @@ where .metrics .client_info .get_or_create(&LabelsProxyClientInfo { - proxy: P::name(), + proxy: this.shared.proxy_name.clone(), user_agent: user_agent.to_string(), }) .inc(); @@ -336,7 +344,7 @@ where this.shared .metrics .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) + .get_or_create(&LabelsProxy { proxy: this.shared.proxy_name.clone() }) .inc(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {:?}", err))); } @@ -392,12 +400,14 @@ where self.postprocessor.do_send(ProxiedHttpCombo { req: cli_req, res: bck_res }); } + #[expect(clippy::too_many_arguments)] fn finalise_proxying( mut cli_req: ProxiedHttpRequest, mut bck_res: ProxiedHttpResponse, inner: Arc

, worker_id: Uuid, metrics: Arc, + proxy_name: &str, mirroring_peers: Arc>>>, mut mirroring_peer_round_robin_index: usize, ) { @@ -441,7 +451,13 @@ where worker_id, ); - Self::emit_metrics_on_proxy_success(&jrpc, &cli_req, &bck_res, metrics.clone()); + Self::emit_metrics_on_proxy_success( + &jrpc, + &cli_req, + &bck_res, + metrics.clone(), + proxy_name, + ); } Err(err) => { @@ -462,6 +478,7 @@ where mut mrr_res: ProxiedHttpResponse, inner: Arc

, metrics: Arc, + proxy_name: &str, worker_id: Uuid, ) { if cli_req.decompressed_size < cli_req.size { @@ -479,7 +496,7 @@ where metrics .http_mirror_success_count .get_or_create(&LabelsProxyHttpJrpc { - proxy: P::name(), + proxy: proxy_name.to_string(), jrpc_method: cli_req.info.jrpc_method_enriched, }) .inc(); @@ -723,15 +740,18 @@ where req: &ProxiedHttpRequest, res: &ProxiedHttpResponse, metrics: Arc, + proxy_name: &str, ) { let metric_labels_jrpc = match jrpc { - JrpcRequestMetaMaybeBatch::Single(jrpc) => { - LabelsProxyHttpJrpc { jrpc_method: jrpc.method_enriched(), proxy: P::name() } - } + JrpcRequestMetaMaybeBatch::Single(jrpc) => LabelsProxyHttpJrpc { + jrpc_method: jrpc.method_enriched(), + proxy: proxy_name.to_string(), + }, - JrpcRequestMetaMaybeBatch::Batch(_) => { - LabelsProxyHttpJrpc { jrpc_method: Cow::Borrowed("batch"), proxy: P::name() } - } + JrpcRequestMetaMaybeBatch::Batch(_) => LabelsProxyHttpJrpc { + jrpc_method: Cow::Borrowed("batch"), + proxy: proxy_name.to_string(), + }, }; let latency_backend = 1000000.0 * (res.start() - req.end()).as_seconds_f64(); @@ -765,7 +785,7 @@ where for jrpc in batch.iter() { let metric_labels_jrpc = LabelsProxyHttpJrpc { jrpc_method: jrpc.method_enriched(), - proxy: P::name(), + proxy: proxy_name.to_string(), }; metrics.http_proxy_success_count.get_or_create(&metric_labels_jrpc).inc(); } @@ -792,7 +812,7 @@ where } } -impl Proxy

for ProxyHttp +impl Proxy for ProxyHttp where C: ConfigProxyHttp, P: ProxyHttpInner, @@ -823,6 +843,7 @@ where { inner: Arc

, metrics: Arc, + proxy_name: String, client_connections_count: Arc, @@ -834,10 +855,11 @@ where C: ConfigProxyHttp, P: ProxyHttpInner, { - fn new(config: C, metrics: &Arc) -> Self { + fn new(config: C, metrics: &Arc, proxy_name: &str) -> Self { Self { inner: Arc::new(P::new(config)), metrics: metrics.clone(), + proxy_name: proxy_name.to_string(), client_connections_count: Arc::new(AtomicI64::new(0)), _config: PhantomData, } @@ -864,6 +886,7 @@ where inner: Arc

, worker_id: Uuid, metrics: Arc, + proxy_name: String, /// mirroring_peers is the vector of endpoints for mirroring peers. mirroring_peers: Arc>>>, @@ -896,6 +919,7 @@ where fn handle(&mut self, msg: ProxiedHttpCombo, ctx: &mut Self::Context) -> Self::Result { let inner = self.inner.clone(); let metrics = self.metrics.clone(); + let proxy_name = self.proxy_name.clone(); let worker_id = self.worker_id; let mirroring_peers = self.mirroring_peers.clone(); let mut mirroring_peer_round_robin_index = @@ -909,6 +933,7 @@ where inner, worker_id, metrics, + &proxy_name, mirroring_peers, mirroring_peer_round_robin_index, ); @@ -935,6 +960,7 @@ where inner: Arc

, worker_id: Uuid, metrics: Arc, + proxy_name: String, client: Client, url: Url, @@ -951,6 +977,7 @@ where inner: Arc

, worker_id: Uuid, metrics: Arc, + proxy_name: &str, url: Url, connections_limit: usize, timeout: std::time::Duration, @@ -966,7 +993,15 @@ where .timeout(timeout) .finish(); - Self { inner, worker_id, metrics, client, url, _config: PhantomData } + Self { + inner, + worker_id, + metrics, + proxy_name: proxy_name.to_string(), + client, + url, + _config: PhantomData, + } } fn new_backend_request(&self, info: &ProxyHttpRequestInfo) -> ClientRequest { @@ -1008,6 +1043,7 @@ where let inner = self.inner.clone(); let worker_id = self.worker_id; let metrics = self.metrics.clone(); + let proxy_name = self.proxy_name.clone(); let mrr_req = self.new_backend_request(&cli_req.info); let mrr_req_body = cli_req.body.clone(); @@ -1040,7 +1076,12 @@ where end, }; ProxyHttp::::postprocess_mirrored_response( - cli_req, mrr_res, inner, metrics, worker_id, + cli_req, + mrr_res, + inner, + metrics, + &proxy_name, + worker_id, ); } Err(err) => { @@ -1053,7 +1094,7 @@ where ); metrics .http_mirror_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) + .get_or_create(&LabelsProxy { proxy: proxy_name.clone() }) .inc(); } }; @@ -1069,7 +1110,7 @@ where ); metrics .http_mirror_failure_count - .get_or_create(&LabelsProxy { proxy: P::name() }) + .get_or_create(&LabelsProxy { proxy: proxy_name.clone() }) .inc(); } } diff --git a/crates/rproxy/src/server/proxy/http/rpc.rs b/crates/rproxy/src/server/proxy/http/rpc.rs index fb80257..59eb2e5 100644 --- a/crates/rproxy/src/server/proxy/http/rpc.rs +++ b/crates/rproxy/src/server/proxy/http/rpc.rs @@ -3,14 +3,11 @@ use tracing::warn; use crate::{ jrpc::{JrpcError, JrpcRequestMeta, JrpcRequestMetaMaybeBatch, JrpcResponseMeta}, server::proxy::{ - ProxyInner, config::ConfigRpc, http::{ProxiedHttpRequest, ProxiedHttpResponse, ProxyHttpInner}, }, }; -const PROXY_HTTP_INNER_RPC_NAME: &str = "rproxy-rpc"; - // ProxyHttpInnerRpc --------------------------------------------------- #[derive(Clone)] @@ -18,13 +15,6 @@ pub(crate) struct ProxyHttpInnerRpc { config: ConfigRpc, } -impl ProxyInner for ProxyHttpInnerRpc { - #[inline] - fn name() -> &'static str { - PROXY_HTTP_INNER_RPC_NAME - } -} - impl ProxyHttpInner for ProxyHttpInnerRpc { fn new(config: ConfigRpc) -> Self { Self { config } @@ -54,20 +44,20 @@ impl ProxyHttpInner for ProxyHttpInnerRpc { match jrpc_req { JrpcRequestMetaMaybeBatch::Single(jrpc_req_single) => { - let jrpc_res_single = match serde_json::from_slice::( - &http_res.decompressed_body(), - ) { - Ok(jrpc_response) => jrpc_response, - Err(err) => { - warn!(proxy = Self::name(), error = ?err, "Failed to parse json-rpc response"); + let jrpc_res_single = + match serde_json::from_slice::(&http_res.decompressed_body()) + { + Ok(jrpc_response) => jrpc_response, + Err(err) => { + warn!(error = ?err, "Failed to parse json-rpc response"); - return should_mirror( - jrpc_req_single, - &JrpcResponseMeta { error: Some(JrpcError {}) }, - self.config.mirror_errored_requests, - ); - } - }; + return should_mirror( + jrpc_req_single, + &JrpcResponseMeta { error: Some(JrpcError {}) }, + self.config.mirror_errored_requests, + ); + } + }; should_mirror( jrpc_req_single, @@ -82,14 +72,13 @@ impl ProxyHttpInner for ProxyHttpInnerRpc { ) { Ok(jrpc_response) => jrpc_response, Err(err) => { - warn!(proxy = Self::name(), error = ?err, "Failed to parse json-rpc response"); + warn!(error = ?err, "Failed to parse json-rpc response"); vec![JrpcResponseMeta { error: Some(JrpcError {}) }; jrpc_req_batch.len()] } }; if jrpc_res_batch.len() != jrpc_req_batch.len() { warn!( - proxy = Self::name(), "A response to jrpc-batch has mismatching count of objects (want: {}, got: {})", jrpc_req_batch.len(), jrpc_res_batch.len(), diff --git a/crates/rproxy/src/server/proxy/ws/flashblocks.rs b/crates/rproxy/src/server/proxy/ws/flashblocks.rs index 6af6476..bf9c51a 100644 --- a/crates/rproxy/src/server/proxy/ws/flashblocks.rs +++ b/crates/rproxy/src/server/proxy/ws/flashblocks.rs @@ -1,5 +1,4 @@ -use crate::server::proxy::{ProxyInner, config::ConfigFlashblocks, ws::ProxyWsInner}; -const PROXY_WS_FLASHBLOCKS_RPC_NAME: &str = "rproxy-flashblocks"; +use crate::server::proxy::{config::ConfigFlashblocks, ws::ProxyWsInner}; // ProxyWsInnerFlashblocks --------------------------------------------- @@ -8,12 +7,6 @@ pub(crate) struct ProxyWsInnerFlashblocks { config: ConfigFlashblocks, } -impl ProxyInner for ProxyWsInnerFlashblocks { - fn name() -> &'static str { - PROXY_WS_FLASHBLOCKS_RPC_NAME - } -} - impl ProxyWsInner for ProxyWsInnerFlashblocks { fn new(config: ConfigFlashblocks) -> Self { Self { config } diff --git a/crates/rproxy/src/server/proxy/ws/inner.rs b/crates/rproxy/src/server/proxy/ws/inner.rs index c655eea..69726a5 100644 --- a/crates/rproxy/src/server/proxy/ws/inner.rs +++ b/crates/rproxy/src/server/proxy/ws/inner.rs @@ -1,8 +1,8 @@ -use crate::server::proxy::{ProxyInner, ws::config::ConfigProxyWs}; +use crate::server::proxy::ws::config::ConfigProxyWs; // ProxyWsInner -------------------------------------------------------- -pub(crate) trait ProxyWsInner: ProxyInner + Clone + Send + Sync +pub(crate) trait ProxyWsInner: Clone + Send + Sync + 'static where C: ConfigProxyWs, { diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index cea8955..8221501 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -103,6 +103,7 @@ where let postprocessor = ProxyWsPostprocessor:: { inner: shared.inner.clone(), metrics: shared.metrics.clone(), + proxy_name: shared.proxy_name.clone(), worker_id: id, _config: PhantomData, } @@ -131,6 +132,7 @@ where config: C, tls: ConfigTls, metrics: Arc, + proxy_name: &str, canceller: tokio_util::sync::CancellationToken, resetter: broadcast::Sender<()>, ) -> Result<(), Box> { @@ -151,7 +153,7 @@ where let workers_count = PARALLELISM.to_static(); - let shared = ProxyWsSharedState::::new(config, &metrics); + let shared = ProxyWsSharedState::::new(config, &metrics, proxy_name); let client_connections_count = shared.client_connections_count.clone(); let worker_canceller = canceller.clone(); let worker_resetter = resetter.clone(); @@ -175,7 +177,11 @@ where .wrap(NormalizePath::new(TrailingSlash::Trim)) .default_service(web::route().to(Self::receive)) }) - .on_connect(Self::on_connect(metrics, client_connections_count)) + .on_connect(Self::on_connect( + metrics.clone(), + client_connections_count, + proxy_name.to_string(), + )) .shutdown_signal(canceller.cancelled_owned()) .workers(workers_count); @@ -618,7 +624,7 @@ where .metrics .ws_proxy_failure_count .get_or_create(&LabelsProxyWs { - proxy: P::name(), + proxy: this.shared.proxy_name.clone(), destination: WS_LABEL_BACKEND, }) .inc(); @@ -655,7 +661,7 @@ where .metrics .ws_proxy_failure_count .get_or_create(&LabelsProxyWs { - proxy: P::name(), + proxy: this.shared.proxy_name.clone(), destination: WS_LABEL_BACKEND, }) .inc(); @@ -696,7 +702,7 @@ where .metrics .ws_latency_client .get_or_create(&LabelsProxyWs { - proxy: P::name(), + proxy: this.shared.proxy_name.clone(), destination: WS_LABEL_BACKEND, }) .record( @@ -791,7 +797,7 @@ where .metrics .ws_proxy_failure_count .get_or_create(&LabelsProxyWs { - proxy: P::name(), + proxy: this.shared.proxy_name.clone(), destination: WS_LABEL_CLIENT, }) .inc(); @@ -820,7 +826,7 @@ where .metrics .ws_proxy_failure_count .get_or_create(&LabelsProxyWs { - proxy: P::name(), + proxy: this.shared.proxy_name.clone(), destination: WS_LABEL_CLIENT, }) .inc(); @@ -861,7 +867,7 @@ where .metrics .ws_latency_backend .get_or_create(&LabelsProxyWs { - proxy: P::name(), + proxy: this.shared.proxy_name.clone(), destination: WS_LABEL_BACKEND, }) .record( @@ -932,11 +938,12 @@ where msg: ProxyWsMessage, inner: Arc

, metrics: Arc, + proxy_name: &str, worker_id: Uuid, ) { Self::maybe_log_proxied_message(&msg, inner.clone(), worker_id); - Self::emit_metrics_on_proxy_success(&msg, metrics.clone()); + Self::emit_metrics_on_proxy_success(&msg, metrics.clone(), proxy_name); } fn maybe_log_proxied_message(msg: &ProxyWsMessage, inner: Arc

, worker_id: Uuid) { @@ -1051,10 +1058,15 @@ where message } - fn emit_metrics_on_proxy_success(msg: &ProxyWsMessage, metrics: Arc) { + fn emit_metrics_on_proxy_success( + msg: &ProxyWsMessage, + metrics: Arc, + proxy_name: &str, + ) { match msg { ProxyWsMessage::BackendToClientBinary { msg, info: _, start, end } => { - let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_CLIENT }; + let labels = + LabelsProxyWs { proxy: proxy_name.to_string(), destination: WS_LABEL_CLIENT }; metrics .ws_latency_proxy .get_or_create(&labels) @@ -1064,7 +1076,8 @@ where } ProxyWsMessage::BackendToClientText { msg, info: _, start, end } => { - let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_CLIENT }; + let labels = + LabelsProxyWs { proxy: proxy_name.to_string(), destination: WS_LABEL_CLIENT }; metrics .ws_latency_proxy .get_or_create(&labels) @@ -1074,7 +1087,8 @@ where } ProxyWsMessage::ClientToBackendBinary { msg, info: _, start, end } => { - let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_BACKEND }; + let labels = + LabelsProxyWs { proxy: proxy_name.to_string(), destination: WS_LABEL_BACKEND }; metrics .ws_latency_proxy .get_or_create(&labels) @@ -1084,7 +1098,8 @@ where } ProxyWsMessage::ClientToBackendText { msg, info: _, start, end } => { - let labels = LabelsProxyWs { proxy: P::name(), destination: WS_LABEL_BACKEND }; + let labels = + LabelsProxyWs { proxy: proxy_name.to_string(), destination: WS_LABEL_BACKEND }; metrics .ws_latency_proxy .get_or_create(&labels) @@ -1096,7 +1111,7 @@ where } } -impl Proxy

for ProxyWs +impl Proxy for ProxyWs where C: ConfigProxyWs, P: ProxyWsInner, @@ -1113,6 +1128,7 @@ where { inner: Arc

, metrics: Arc, + proxy_name: String, client_connections_count: Arc, @@ -1124,10 +1140,11 @@ where C: ConfigProxyWs, P: ProxyWsInner, { - fn new(config: C, metrics: &Arc) -> Self { + fn new(config: C, metrics: &Arc, proxy_name: &str) -> Self { Self { inner: Arc::new(P::new(config)), metrics: metrics.clone(), + proxy_name: proxy_name.to_string(), client_connections_count: Arc::new(AtomicI64::new(0)), _config: PhantomData, } @@ -1203,6 +1220,7 @@ where inner: Arc

, worker_id: Uuid, metrics: Arc, + proxy_name: String, _config: PhantomData, } @@ -1229,11 +1247,12 @@ where fn handle(&mut self, msg: ProxyWsMessage, ctx: &mut Self::Context) -> Self::Result { let inner = self.inner.clone(); let metrics = self.metrics.clone(); + let proxy_name = self.proxy_name.clone(); let worker_id = self.worker_id; ctx.spawn( async move { - ProxyWs::::finalise_proxying(msg, inner, metrics, worker_id); + ProxyWs::::finalise_proxying(msg, inner, metrics, &proxy_name, worker_id); } .into_actor(self), ); From d2360783c0a86cd46ea11cc2f6a3ee709048811b Mon Sep 17 00:00:00 2001 From: Ash Kunda <18058966+akundaz@users.noreply.github.com> Date: Wed, 22 Oct 2025 10:02:02 -0400 Subject: [PATCH 3/4] use &'static str for metric labels --- crates/rproxy/src/server/metrics.rs | 8 +- crates/rproxy/src/server/proxy.rs | 14 ++-- crates/rproxy/src/server/proxy/http/proxy.rs | 81 ++++++++------------ crates/rproxy/src/server/proxy/ws/proxy.rs | 50 +++++------- 4 files changed, 63 insertions(+), 90 deletions(-) diff --git a/crates/rproxy/src/server/metrics.rs b/crates/rproxy/src/server/metrics.rs index c7bf27e..c2fdca8 100644 --- a/crates/rproxy/src/server/metrics.rs +++ b/crates/rproxy/src/server/metrics.rs @@ -368,14 +368,14 @@ impl Metrics { #[derive(Clone, Debug, Default, Hash, PartialEq, Eq, EncodeLabelSet)] pub(crate) struct LabelsProxy { - pub(crate) proxy: String, + pub(crate) proxy: &'static str, } // LabelsProxyClientInfo ----------------------------------------------- #[derive(Clone, Debug, Default, Hash, PartialEq, Eq, EncodeLabelSet)] pub(crate) struct LabelsProxyClientInfo { - pub(crate) proxy: String, + pub(crate) proxy: &'static str, pub(crate) user_agent: String, } @@ -383,7 +383,7 @@ pub(crate) struct LabelsProxyClientInfo { #[derive(Clone, Debug, Default, Hash, PartialEq, Eq, EncodeLabelSet)] pub(crate) struct LabelsProxyHttpJrpc { - pub(crate) proxy: String, + pub(crate) proxy: &'static str, pub(crate) jrpc_method: Cow<'static, str>, } @@ -391,6 +391,6 @@ pub(crate) struct LabelsProxyHttpJrpc { #[derive(Clone, Debug, Default, Hash, PartialEq, Eq, EncodeLabelSet)] pub(crate) struct LabelsProxyWs { - pub(crate) proxy: String, + pub(crate) proxy: &'static str, pub(crate) destination: &'static str, } diff --git a/crates/rproxy/src/server/proxy.rs b/crates/rproxy/src/server/proxy.rs index 6382817..305c27b 100644 --- a/crates/rproxy/src/server/proxy.rs +++ b/crates/rproxy/src/server/proxy.rs @@ -25,12 +25,12 @@ pub(crate) trait Proxy { fn on_connect( metrics: Arc, client_connections_count: Arc, - proxy_name: String, + proxy_name: &'static str, ) -> impl Fn(&dyn Any, &mut Extensions) { move |connection, extensions| { { let val = client_connections_count.fetch_add(1, Ordering::Relaxed) + 1; - let metric_labels = LabelsProxy { proxy: proxy_name.clone() }; + let metric_labels = LabelsProxy { proxy: proxy_name }; metrics.client_connections_active_count.get_or_create(&metric_labels).set(val); metrics.client_connections_established_count.get_or_create(&metric_labels).inc(); @@ -74,7 +74,7 @@ pub(crate) trait Proxy { extensions.insert(ProxyConnectionGuard::new( id, - &proxy_name, + proxy_name, remote_addr, local_addr, &metrics, @@ -92,7 +92,7 @@ pub struct ProxyConnectionGuard { pub remote_addr: Option, pub local_addr: Option, - proxy_name: String, + proxy_name: &'static str, metrics: Arc, client_connections_count: Arc, } @@ -100,7 +100,7 @@ pub struct ProxyConnectionGuard { impl ProxyConnectionGuard { fn new( id: Uuid, - proxy_name: &str, + proxy_name: &'static str, remote_addr: Option, local_addr: Option, metrics: &Arc, @@ -110,7 +110,7 @@ impl ProxyConnectionGuard { id, remote_addr, local_addr, - proxy_name: proxy_name.to_string(), + proxy_name, metrics: metrics.clone(), client_connections_count, } @@ -121,7 +121,7 @@ impl Drop for ProxyConnectionGuard { fn drop(&mut self) { let val = self.client_connections_count.fetch_sub(1, Ordering::Relaxed) - 1; - let metric_labels = LabelsProxy { proxy: self.proxy_name.clone() }; + let metric_labels = LabelsProxy { proxy: self.proxy_name }; self.metrics.client_connections_active_count.get_or_create(&metric_labels).set(val); self.metrics.client_connections_closed_count.get_or_create(&metric_labels).inc(); diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 86190cd..0404a25 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -99,7 +99,7 @@ where inner.clone(), id, shared.metrics.clone(), - &shared.proxy_name, + shared.proxy_name, config.backend_url(), connections_limit, config.backend_timeout(), @@ -114,7 +114,7 @@ where shared.inner(), id, shared.metrics.clone(), - &shared.proxy_name, + shared.proxy_name, peer_url.to_owned(), config.backend_max_concurrent_requests(), config.backend_timeout(), @@ -128,7 +128,7 @@ where worker_id: id, inner: inner.clone(), metrics: shared.metrics.clone(), - proxy_name: shared.proxy_name.clone(), + proxy_name: shared.proxy_name, mirroring_peers: peers.clone(), mirroring_peer_round_robin_index: AtomicUsize::new(0), } @@ -141,7 +141,7 @@ where config: C, tls: ConfigTls, metrics: Arc, - proxy_name: &str, + proxy_name: &'static str, canceller: tokio_util::sync::CancellationToken, resetter: broadcast::Sender<()>, ) -> Result<(), Box> { @@ -195,11 +195,7 @@ where .wrap(NormalizePath::new(TrailingSlash::Trim)) .default_service(web::route().to(Self::receive)) }) - .on_connect(Self::on_connect( - metrics.clone(), - client_connections_count, - proxy_name.to_string(), - )) + .on_connect(Self::on_connect(metrics.clone(), client_connections_count, proxy_name)) .shutdown_signal(canceller.cancelled_owned()) .workers(workers_count); @@ -315,7 +311,7 @@ where .metrics .client_info .get_or_create(&LabelsProxyClientInfo { - proxy: this.shared.proxy_name.clone(), + proxy: this.shared.proxy_name, user_agent: user_agent.to_string(), }) .inc(); @@ -344,7 +340,7 @@ where this.shared .metrics .http_proxy_failure_count - .get_or_create(&LabelsProxy { proxy: this.shared.proxy_name.clone() }) + .get_or_create(&LabelsProxy { proxy: this.shared.proxy_name }) .inc(); return Ok(HttpResponse::BadGateway().body(format!("Backend error: {:?}", err))); } @@ -407,7 +403,7 @@ where inner: Arc

, worker_id: Uuid, metrics: Arc, - proxy_name: &str, + proxy_name: &'static str, mirroring_peers: Arc>>>, mut mirroring_peer_round_robin_index: usize, ) { @@ -478,7 +474,7 @@ where mut mrr_res: ProxiedHttpResponse, inner: Arc

, metrics: Arc, - proxy_name: &str, + proxy_name: &'static str, worker_id: Uuid, ) { if cli_req.decompressed_size < cli_req.size { @@ -496,7 +492,7 @@ where metrics .http_mirror_success_count .get_or_create(&LabelsProxyHttpJrpc { - proxy: proxy_name.to_string(), + proxy: proxy_name, jrpc_method: cli_req.info.jrpc_method_enriched, }) .inc(); @@ -740,18 +736,16 @@ where req: &ProxiedHttpRequest, res: &ProxiedHttpResponse, metrics: Arc, - proxy_name: &str, + proxy_name: &'static str, ) { let metric_labels_jrpc = match jrpc { - JrpcRequestMetaMaybeBatch::Single(jrpc) => LabelsProxyHttpJrpc { - jrpc_method: jrpc.method_enriched(), - proxy: proxy_name.to_string(), - }, + JrpcRequestMetaMaybeBatch::Single(jrpc) => { + LabelsProxyHttpJrpc { jrpc_method: jrpc.method_enriched(), proxy: proxy_name } + } - JrpcRequestMetaMaybeBatch::Batch(_) => LabelsProxyHttpJrpc { - jrpc_method: Cow::Borrowed("batch"), - proxy: proxy_name.to_string(), - }, + JrpcRequestMetaMaybeBatch::Batch(_) => { + LabelsProxyHttpJrpc { jrpc_method: Cow::Borrowed("batch"), proxy: proxy_name } + } }; let latency_backend = 1000000.0 * (res.start() - req.end()).as_seconds_f64(); @@ -785,7 +779,7 @@ where for jrpc in batch.iter() { let metric_labels_jrpc = LabelsProxyHttpJrpc { jrpc_method: jrpc.method_enriched(), - proxy: proxy_name.to_string(), + proxy: proxy_name, }; metrics.http_proxy_success_count.get_or_create(&metric_labels_jrpc).inc(); } @@ -843,7 +837,7 @@ where { inner: Arc

, metrics: Arc, - proxy_name: String, + proxy_name: &'static str, client_connections_count: Arc, @@ -855,11 +849,11 @@ where C: ConfigProxyHttp, P: ProxyHttpInner, { - fn new(config: C, metrics: &Arc, proxy_name: &str) -> Self { + fn new(config: C, metrics: &Arc, proxy_name: &'static str) -> Self { Self { inner: Arc::new(P::new(config)), metrics: metrics.clone(), - proxy_name: proxy_name.to_string(), + proxy_name, client_connections_count: Arc::new(AtomicI64::new(0)), _config: PhantomData, } @@ -886,7 +880,7 @@ where inner: Arc

, worker_id: Uuid, metrics: Arc, - proxy_name: String, + proxy_name: &'static str, /// mirroring_peers is the vector of endpoints for mirroring peers. mirroring_peers: Arc>>>, @@ -919,7 +913,7 @@ where fn handle(&mut self, msg: ProxiedHttpCombo, ctx: &mut Self::Context) -> Self::Result { let inner = self.inner.clone(); let metrics = self.metrics.clone(); - let proxy_name = self.proxy_name.clone(); + let proxy_name = self.proxy_name; let worker_id = self.worker_id; let mirroring_peers = self.mirroring_peers.clone(); let mut mirroring_peer_round_robin_index = @@ -933,7 +927,7 @@ where inner, worker_id, metrics, - &proxy_name, + proxy_name, mirroring_peers, mirroring_peer_round_robin_index, ); @@ -960,7 +954,7 @@ where inner: Arc

, worker_id: Uuid, metrics: Arc, - proxy_name: String, + proxy_name: &'static str, client: Client, url: Url, @@ -977,7 +971,7 @@ where inner: Arc

, worker_id: Uuid, metrics: Arc, - proxy_name: &str, + proxy_name: &'static str, url: Url, connections_limit: usize, timeout: std::time::Duration, @@ -993,15 +987,7 @@ where .timeout(timeout) .finish(); - Self { - inner, - worker_id, - metrics, - proxy_name: proxy_name.to_string(), - client, - url, - _config: PhantomData, - } + Self { inner, worker_id, metrics, proxy_name, client, url, _config: PhantomData } } fn new_backend_request(&self, info: &ProxyHttpRequestInfo) -> ClientRequest { @@ -1043,7 +1029,7 @@ where let inner = self.inner.clone(); let worker_id = self.worker_id; let metrics = self.metrics.clone(); - let proxy_name = self.proxy_name.clone(); + let proxy_name = self.proxy_name; let mrr_req = self.new_backend_request(&cli_req.info); let mrr_req_body = cli_req.body.clone(); @@ -1076,12 +1062,7 @@ where end, }; ProxyHttp::::postprocess_mirrored_response( - cli_req, - mrr_res, - inner, - metrics, - &proxy_name, - worker_id, + cli_req, mrr_res, inner, metrics, proxy_name, worker_id, ); } Err(err) => { @@ -1094,7 +1075,7 @@ where ); metrics .http_mirror_failure_count - .get_or_create(&LabelsProxy { proxy: proxy_name.clone() }) + .get_or_create(&LabelsProxy { proxy: proxy_name }) .inc(); } }; @@ -1110,7 +1091,7 @@ where ); metrics .http_mirror_failure_count - .get_or_create(&LabelsProxy { proxy: proxy_name.clone() }) + .get_or_create(&LabelsProxy { proxy: proxy_name }) .inc(); } } diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index 8221501..39d16ea 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -103,7 +103,7 @@ where let postprocessor = ProxyWsPostprocessor:: { inner: shared.inner.clone(), metrics: shared.metrics.clone(), - proxy_name: shared.proxy_name.clone(), + proxy_name: shared.proxy_name, worker_id: id, _config: PhantomData, } @@ -132,7 +132,7 @@ where config: C, tls: ConfigTls, metrics: Arc, - proxy_name: &str, + proxy_name: &'static str, canceller: tokio_util::sync::CancellationToken, resetter: broadcast::Sender<()>, ) -> Result<(), Box> { @@ -177,11 +177,7 @@ where .wrap(NormalizePath::new(TrailingSlash::Trim)) .default_service(web::route().to(Self::receive)) }) - .on_connect(Self::on_connect( - metrics.clone(), - client_connections_count, - proxy_name.to_string(), - )) + .on_connect(Self::on_connect(metrics.clone(), client_connections_count, proxy_name)) .shutdown_signal(canceller.cancelled_owned()) .workers(workers_count); @@ -624,7 +620,7 @@ where .metrics .ws_proxy_failure_count .get_or_create(&LabelsProxyWs { - proxy: this.shared.proxy_name.clone(), + proxy: this.shared.proxy_name, destination: WS_LABEL_BACKEND, }) .inc(); @@ -661,7 +657,7 @@ where .metrics .ws_proxy_failure_count .get_or_create(&LabelsProxyWs { - proxy: this.shared.proxy_name.clone(), + proxy: this.shared.proxy_name, destination: WS_LABEL_BACKEND, }) .inc(); @@ -702,7 +698,7 @@ where .metrics .ws_latency_client .get_or_create(&LabelsProxyWs { - proxy: this.shared.proxy_name.clone(), + proxy: this.shared.proxy_name, destination: WS_LABEL_BACKEND, }) .record( @@ -797,7 +793,7 @@ where .metrics .ws_proxy_failure_count .get_or_create(&LabelsProxyWs { - proxy: this.shared.proxy_name.clone(), + proxy: this.shared.proxy_name, destination: WS_LABEL_CLIENT, }) .inc(); @@ -826,7 +822,7 @@ where .metrics .ws_proxy_failure_count .get_or_create(&LabelsProxyWs { - proxy: this.shared.proxy_name.clone(), + proxy: this.shared.proxy_name, destination: WS_LABEL_CLIENT, }) .inc(); @@ -867,7 +863,7 @@ where .metrics .ws_latency_backend .get_or_create(&LabelsProxyWs { - proxy: this.shared.proxy_name.clone(), + proxy: this.shared.proxy_name, destination: WS_LABEL_BACKEND, }) .record( @@ -938,7 +934,7 @@ where msg: ProxyWsMessage, inner: Arc

, metrics: Arc, - proxy_name: &str, + proxy_name: &'static str, worker_id: Uuid, ) { Self::maybe_log_proxied_message(&msg, inner.clone(), worker_id); @@ -1061,12 +1057,11 @@ where fn emit_metrics_on_proxy_success( msg: &ProxyWsMessage, metrics: Arc, - proxy_name: &str, + proxy_name: &'static str, ) { match msg { ProxyWsMessage::BackendToClientBinary { msg, info: _, start, end } => { - let labels = - LabelsProxyWs { proxy: proxy_name.to_string(), destination: WS_LABEL_CLIENT }; + let labels = LabelsProxyWs { proxy: proxy_name, destination: WS_LABEL_CLIENT }; metrics .ws_latency_proxy .get_or_create(&labels) @@ -1076,8 +1071,7 @@ where } ProxyWsMessage::BackendToClientText { msg, info: _, start, end } => { - let labels = - LabelsProxyWs { proxy: proxy_name.to_string(), destination: WS_LABEL_CLIENT }; + let labels = LabelsProxyWs { proxy: proxy_name, destination: WS_LABEL_CLIENT }; metrics .ws_latency_proxy .get_or_create(&labels) @@ -1087,8 +1081,7 @@ where } ProxyWsMessage::ClientToBackendBinary { msg, info: _, start, end } => { - let labels = - LabelsProxyWs { proxy: proxy_name.to_string(), destination: WS_LABEL_BACKEND }; + let labels = LabelsProxyWs { proxy: proxy_name, destination: WS_LABEL_BACKEND }; metrics .ws_latency_proxy .get_or_create(&labels) @@ -1098,8 +1091,7 @@ where } ProxyWsMessage::ClientToBackendText { msg, info: _, start, end } => { - let labels = - LabelsProxyWs { proxy: proxy_name.to_string(), destination: WS_LABEL_BACKEND }; + let labels = LabelsProxyWs { proxy: proxy_name, destination: WS_LABEL_BACKEND }; metrics .ws_latency_proxy .get_or_create(&labels) @@ -1128,7 +1120,7 @@ where { inner: Arc

, metrics: Arc, - proxy_name: String, + proxy_name: &'static str, client_connections_count: Arc, @@ -1140,11 +1132,11 @@ where C: ConfigProxyWs, P: ProxyWsInner, { - fn new(config: C, metrics: &Arc, proxy_name: &str) -> Self { + fn new(config: C, metrics: &Arc, proxy_name: &'static str) -> Self { Self { inner: Arc::new(P::new(config)), metrics: metrics.clone(), - proxy_name: proxy_name.to_string(), + proxy_name, client_connections_count: Arc::new(AtomicI64::new(0)), _config: PhantomData, } @@ -1220,7 +1212,7 @@ where inner: Arc

, worker_id: Uuid, metrics: Arc, - proxy_name: String, + proxy_name: &'static str, _config: PhantomData, } @@ -1247,12 +1239,12 @@ where fn handle(&mut self, msg: ProxyWsMessage, ctx: &mut Self::Context) -> Self::Result { let inner = self.inner.clone(); let metrics = self.metrics.clone(); - let proxy_name = self.proxy_name.clone(); + let proxy_name = self.proxy_name; let worker_id = self.worker_id; ctx.spawn( async move { - ProxyWs::::finalise_proxying(msg, inner, metrics, &proxy_name, worker_id); + ProxyWs::::finalise_proxying(msg, inner, metrics, proxy_name, worker_id); } .into_actor(self), ); From eda15bc895c30fbf66153544d2d92a592a0b2353 Mon Sep 17 00:00:00 2001 From: Ash Kunda <18058966+akundaz@users.noreply.github.com> Date: Wed, 22 Oct 2025 16:31:07 -0400 Subject: [PATCH 4/4] instead of using spans, manually specify the proxy name --- crates/rproxy/src/server/proxy.rs | 6 +- crates/rproxy/src/server/proxy/http/proxy.rs | 45 ++++---- crates/rproxy/src/server/proxy/ws/proxy.rs | 109 ++++++++++--------- 3 files changed, 84 insertions(+), 76 deletions(-) diff --git a/crates/rproxy/src/server/proxy.rs b/crates/rproxy/src/server/proxy.rs index 305c27b..436c57a 100644 --- a/crates/rproxy/src/server/proxy.rs +++ b/crates/rproxy/src/server/proxy.rs @@ -52,20 +52,20 @@ pub(crate) trait Proxy { let remote_addr = match stream.peer_addr() { Ok(local_addr) => Some(local_addr.to_string()), Err(err) => { - warn!(proxy = P::name(), error = ?err, "Failed to get remote address"); + warn!(proxy = proxy_name, error = ?err, "Failed to get remote address"); None } }; let local_addr = match stream.local_addr() { Ok(local_addr) => Some(local_addr.to_string()), Err(err) => { - warn!(proxy = P::name(), error = ?err, "Failed to get remote address"); + warn!(proxy = proxy_name, error = ?err, "Failed to get remote address"); None } }; debug!( - proxy = P::name(), + proxy = proxy_name, connection_id = %id, remote_addr = remote_addr.as_ref().map_or("unknown", |v| v.as_str()), local_addr = local_addr.as_ref().map_or("unknown", |v| v.as_str()), diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 0404a25..807d22f 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -90,7 +90,7 @@ where fn new(shared: ProxyHttpSharedState, connections_limit: usize) -> Self { let id = Uuid::now_v7(); - debug!(proxy = P::name(), worker_id = %id, "Creating http-proxy worker..."); + debug!(proxy = shared.proxy_name, worker_id = %id, "Creating http-proxy worker..."); let config = shared.config(); let inner = shared.inner(); @@ -151,7 +151,7 @@ where Ok(listener) => listener, Err(err) => { error!( - proxy = P::name(), + proxy = proxy_name, addr = %config.listen_address(), error = ?err, "Failed to initialise a socket" @@ -179,7 +179,7 @@ where let client_connections_count = shared.client_connections_count.clone(); info!( - proxy = P::name(), + proxy = proxy_name, listen_address = %listen_address, workers_count = workers_count, max_concurrent_requests_per_worker = max_concurrent_requests_per_worker, @@ -216,7 +216,7 @@ where Ok(server) => server, Err(err) => { error!( - proxy = P::name(), + proxy = proxy_name, error = ?err, "Failed to initialise http-proxy", ); @@ -229,16 +229,16 @@ where let mut resetter = resetter.subscribe(); tokio::spawn(async move { if resetter.recv().await.is_ok() { - info!(proxy = P::name(), "Reset signal received, stopping http-proxy..."); + info!(proxy = proxy_name, "Reset signal received, stopping http-proxy..."); handler.stop(true).await; } }); if let Err(err) = server.await { - error!(proxy = P::name(), error = ?err, "Failure while running http-proxy") + error!(proxy = proxy_name, error = ?err, "Failure while running http-proxy") } - info!(proxy = P::name(), "Stopped http-proxy"); + info!(proxy = proxy_name, "Stopped http-proxy"); Ok(()) } @@ -329,7 +329,7 @@ where Ok(res) => res, Err(err) => { warn!( - proxy = P::name(), + proxy = this.shared.proxy_name, request_id = %id, connection_id = %connection_id, worker_id = %this.id, @@ -368,7 +368,7 @@ where if self.requests.insert_sync(id, req).is_err() { error!( - proxy = P::name(), + proxy = self.shared.proxy_name, request_id = %id, connection_id = %connection_id, worker_id = %self.id, @@ -382,7 +382,7 @@ where Some((_, req)) => req, None => { error!( - proxy = P::name(), + proxy = self.shared.proxy_name, request_id = %bck_res.info.id, worker_id = %self.id, "Proxied http response for unmatching request", @@ -444,6 +444,7 @@ where &cli_req, &bck_res, inner.clone(), + proxy_name, worker_id, ); @@ -458,7 +459,7 @@ where Err(err) => { warn!( - proxy = P::name(), + proxy = proxy_name, request_id = %cli_req.info.id, connection_id = %cli_req.info.connection_id, worker_id = %worker_id, @@ -487,7 +488,7 @@ where decompress(mrr_res.body.clone(), mrr_res.size, mrr_res.info.content_encoding()); } - Self::maybe_log_mirrored_request(&cli_req, &mrr_res, worker_id, inner.config()); + Self::maybe_log_mirrored_request(&cli_req, &mrr_res, proxy_name, worker_id, inner.config()); metrics .http_mirror_success_count @@ -503,6 +504,7 @@ where req: &ProxiedHttpRequest, res: &ProxiedHttpResponse, inner: Arc

, + proxy_name: &'static str, worker_id: Uuid, ) { let config = inner.config(); @@ -526,7 +528,7 @@ where }; info!( - proxy = P::name(), + proxy = proxy_name, request_id = %req.info.id, connection_id = %req.info.connection_id, worker_id = %worker_id, @@ -545,6 +547,7 @@ where fn maybe_log_mirrored_request( req: &ProxiedHttpRequest, res: &ProxiedHttpResponse, + proxy_name: &'static str, worker_id: Uuid, config: &C, ) { @@ -567,7 +570,7 @@ where }; info!( - proxy = P::name(), + proxy = proxy_name, request_id = %req.info.id, connection_id = %req.info.connection_id, worker_id = %worker_id, @@ -820,7 +823,7 @@ where { fn drop(&mut self) { debug!( - proxy = P::name(), + proxy = self.shared.proxy_name, worker_id = %self.id, "Destroying http-proxy worker...", ); @@ -1067,7 +1070,7 @@ where } Err(err) => { warn!( - proxy = P::name(), + proxy = proxy_name, request_id = %cli_req.info.id, connection_id = %cli_req.info.connection_id, error = ?err, @@ -1083,7 +1086,7 @@ where Err(err) => { warn!( - proxy = P::name(), + proxy = proxy_name, request_id = %cli_req.info.id, connection_id = %cli_req.info.connection_id, error = ?err, @@ -1309,7 +1312,7 @@ where Poll::Ready(Some(Err(err))) => { if let Some(info) = mem::take(this.info) { warn!( - proxy = P::name(), + proxy = this.proxy.shared.proxy_name, request_id = %info.id(), connection_id = %info.connection_id(), error = ?err, @@ -1317,7 +1320,7 @@ where ); } else { warn!( - proxy = P::name(), + proxy = this.proxy.shared.proxy_name, error = ?err, request_id = "unknown", "Proxy http request stream error", @@ -1407,14 +1410,14 @@ where Poll::Ready(Some(Err(err))) => { if let Some(info) = mem::take(this.info) { warn!( - proxy = P::name(), + proxy = this.proxy.shared.proxy_name, request_id = %info.id(), error = ?err, "Proxy http response stream error", ); } else { warn!( - proxy = P::name(), + proxy = this.proxy.shared.proxy_name, error = ?err, request_id = "unknown", "Proxy http response stream error", diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index 39d16ea..2abc1f5 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -98,7 +98,7 @@ where let config = shared.config(); - let backend = ProxyWsBackendEndpoint::new(id, config.backend_url()); + let backend = ProxyWsBackendEndpoint::new(id, shared.proxy_name, config.backend_url()); let postprocessor = ProxyWsPostprocessor:: { inner: shared.inner.clone(), @@ -142,7 +142,7 @@ where Ok(listener) => listener, Err(err) => { error!( - proxy = P::name(), + proxy = proxy_name, addr = %config.listen_address(), error = ?err, "Failed to initialise a socket" @@ -159,7 +159,7 @@ where let worker_resetter = resetter.clone(); info!( - proxy = P::name(), + proxy = proxy_name, listen_address = %listen_address, workers_count = workers_count, "Starting websocket-proxy...", @@ -197,7 +197,7 @@ where } { Ok(server) => server, Err(err) => { - error!(proxy = P::name(), error = ?err, "Failed to initialise websocket-proxy"); + error!(proxy = proxy_name, error = ?err, "Failed to initialise websocket-proxy"); return Err(Box::new(err)); } } @@ -207,16 +207,16 @@ where let mut resetter = resetter.subscribe(); tokio::spawn(async move { if resetter.recv().await.is_ok() { - info!(proxy = P::name(), "Reset signal received, stopping websocket-proxy..."); + info!(proxy = proxy_name, "Reset signal received, stopping websocket-proxy..."); handler.stop(true).await; } }); if let Err(err) = proxy.await { - error!(proxy = P::name(), error = ?err, "Failure while running websocket-proxy") + error!(proxy = proxy_name, error = ?err, "Failure while running websocket-proxy") } - info!(proxy = P::name(), "Stopped websocket-proxy"); + info!(proxy = proxy_name, "Stopped websocket-proxy"); Ok(()) } @@ -255,7 +255,7 @@ where Ok(res) => res, Err(err) => { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, request_id = %info.id(), connection_id = %info.connection_id(), worker_id = %this.id, @@ -279,7 +279,7 @@ where ) { let bck_uri = this.backend.new_backend_uri(&info); trace!( - proxy = P::name(), + proxy = this.shared.proxy_name, request_id = %info.id(), connection_id = %info.connection_id(), worker_id = %this.id, @@ -297,7 +297,7 @@ where Ok(Err(err)) => { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, request_id = %info.id(), connection_id = %info.connection_id(), worker_id = %this.id, @@ -313,7 +313,7 @@ where .await { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, request_id = %info.id(), connection_id = %info.connection_id(), worker_id = %this.id, @@ -326,7 +326,7 @@ where Err(_) => { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, request_id = %info.id(), connection_id = %info.connection_id(), worker_id = %this.id, @@ -341,7 +341,7 @@ where .await { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, request_id = %info.id(), connection_id = %info.connection_id(), worker_id = %this.id, @@ -367,7 +367,7 @@ where mut bck_rx: SplitStream>>, ) { info!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, "Starting websocket pump..." @@ -426,7 +426,7 @@ where msg != WS_CLOSE_OK { debug!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, msg = %msg, @@ -440,7 +440,7 @@ where .await { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, msg = %msg, @@ -450,7 +450,7 @@ where } debug!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, msg = %msg, @@ -464,7 +464,7 @@ where .await { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, msg = %msg, @@ -474,7 +474,7 @@ where } } else { debug!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, "Closing client websocket session..." @@ -487,7 +487,7 @@ where .await { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, error = ?err, @@ -496,7 +496,7 @@ where } debug!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, "Closing backend websocket session..." @@ -509,7 +509,7 @@ where .await { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, error = ?err, @@ -519,7 +519,7 @@ where } info!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, "Stopped websocket pump" @@ -540,7 +540,7 @@ where if this.ping_balance_cli.load(Ordering::Relaxed) > ping_threshold { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, "More than {} websocket pings sent to client didn't return, terminating the pump...", ping_threshold, @@ -551,7 +551,7 @@ where let cli_ping = ProxyWsPing::new(info.connection_id()); if let Err(err) = cli_tx.ping(&cli_ping.to_slice()).await { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, error = ?err, @@ -568,7 +568,7 @@ where if this.ping_balance_bck.load(Ordering::Relaxed) > ping_threshold { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, "More than {} websocket pings sent to backend didn't return, terminating the pump...", ping_threshold, @@ -579,7 +579,7 @@ where let bck_ping = ProxyWsPing::new(info.connection_id()); if let Err(err) = bck_tx.send(tungstenite::Message::Ping(bck_ping.to_bytes())).await { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, error = ?err, @@ -610,7 +610,7 @@ where bck_tx.send(tungstenite::Message::Binary(bytes.clone())).await { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, error = ?err, @@ -647,7 +647,7 @@ where .await { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, error = ?err, @@ -676,7 +676,7 @@ where actix_ws::Message::Ping(bytes) => { if let Err(err) = cli_tx.pong(&bytes).await { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, error = ?err, @@ -708,7 +708,7 @@ where return Ok(()); } warn!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, "Unexpected websocket pong received from client", @@ -730,7 +730,7 @@ where .await { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, error = ?err, @@ -747,7 +747,7 @@ where Some(Err(err)) => { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, error = ?err, @@ -758,7 +758,7 @@ where None => { info!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, "Client had closed websocket stream" @@ -783,7 +783,7 @@ where tungstenite::Message::Binary(bytes) => { if let Err(err) = cli_tx.binary(bytes.clone()).await { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, error = ?err, @@ -812,7 +812,7 @@ where tungstenite::Message::Text(text) => { if let Err(err) = cli_tx.text(text.clone().as_str()).await { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, error = ?err, @@ -841,7 +841,7 @@ where tungstenite::Message::Ping(bytes) => { if let Err(err) = bck_tx.send(tungstenite::Message::Pong(bytes)).await { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, error = ?err, @@ -873,7 +873,7 @@ where return Ok(()); } warn!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, "Unexpected websocket pong received from backend", @@ -892,7 +892,7 @@ where .await { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, error = ?err, @@ -909,7 +909,7 @@ where Some(Err(err)) => { error!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, error = ?err, @@ -920,7 +920,7 @@ where None => { info!( - proxy = P::name(), + proxy = this.shared.proxy_name, connection_id = %info.connection_id(), worker_id = %this.id, "Backend had closed websocket stream" @@ -937,12 +937,17 @@ where proxy_name: &'static str, worker_id: Uuid, ) { - Self::maybe_log_proxied_message(&msg, inner.clone(), worker_id); + Self::maybe_log_proxied_message(&msg, inner.clone(), proxy_name, worker_id); Self::emit_metrics_on_proxy_success(&msg, metrics.clone(), proxy_name); } - fn maybe_log_proxied_message(msg: &ProxyWsMessage, inner: Arc

, worker_id: Uuid) { + fn maybe_log_proxied_message( + msg: &ProxyWsMessage, + inner: Arc

, + proxy_name: &'static str, + worker_id: Uuid, + ) { let config = inner.config(); match msg { @@ -957,7 +962,7 @@ where }; info!( - proxy = P::name(), + proxy = proxy_name, connection_id = %info.connection_id(), worker_id = %worker_id, remote_addr = info.remote_addr(), @@ -979,7 +984,7 @@ where }; info!( - proxy = P::name(), + proxy = proxy_name, connection_id = %info.connection_id(), worker_id = %worker_id, remote_addr = info.remote_addr(), @@ -1001,7 +1006,7 @@ where }; info!( - proxy = P::name(), + proxy = proxy_name, connection_id = %info.connection_id(), worker_id = %worker_id, remote_addr = info.remote_addr(), @@ -1023,7 +1028,7 @@ where }; info!( - proxy = P::name(), + proxy = proxy_name, connection_id = %info.connection_id(), worker_id = %worker_id, remote_addr = info.remote_addr(), @@ -1156,7 +1161,7 @@ where P: ProxyWsInner, { worker_id: Uuid, - + proxy_name: &'static str, url: tungstenite::http::Uri, _config: PhantomData, @@ -1168,8 +1173,8 @@ where C: ConfigProxyWs, P: ProxyWsInner, { - fn new(worker_id: Uuid, url: tungstenite::http::Uri) -> Self { - Self { worker_id, url, _config: PhantomData, _inner: PhantomData } + fn new(worker_id: Uuid, proxy_name: &'static str, url: tungstenite::http::Uri) -> Self { + Self { worker_id, proxy_name, url, _config: PhantomData, _inner: PhantomData } } fn new_backend_uri(&self, info: &ProxyHttpRequestInfo) -> tungstenite::http::Uri { @@ -1177,7 +1182,7 @@ where let pq = tungstenite::http::uri::PathAndQuery::from_str(info.path_and_query()) .inspect_err(|err| { error!( - proxy = P::name(), + proxy = self.proxy_name, request_id = %info.id(), connection_id = %info.connection_id(), worker_id = %self.worker_id, @@ -1191,7 +1196,7 @@ where tungstenite::http::Uri::from_parts(parts) .inspect_err(|err| { error!( - proxy = P::name(), + proxy = self.proxy_name, request_id = %info.id(), connection_id = %info.connection_id(), worker_id = %self.worker_id,