From 1bc206b7db3463d93ac1ece1b2055ebe593fea37 Mon Sep 17 00:00:00 2001 From: ripark Date: Thu, 13 Feb 2025 02:55:22 +0000 Subject: [PATCH] Making it possible to set the custom endpoint from the outside of the client (producer only) --- sdk/core/azure_core_amqp/src/connection.rs | 2 ++ sdk/core/azure_core_amqp/src/fe2o3/connection.rs | 14 +++++++++++++- .../azure_messaging_eventhubs/src/producer/mod.rs | 8 ++++++++ 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/sdk/core/azure_core_amqp/src/connection.rs b/sdk/core/azure_core_amqp/src/connection.rs index a106b11690..74dff4dced 100644 --- a/sdk/core/azure_core_amqp/src/connection.rs +++ b/sdk/core/azure_core_amqp/src/connection.rs @@ -24,6 +24,7 @@ pub struct AmqpConnectionOptions { pub desired_capabilities: Option>, pub properties: Option>, pub buffer_size: Option, + pub custom_endpoint: Option, } impl AmqpConnectionOptions {} @@ -206,6 +207,7 @@ mod tests { incoming_locales: Some(vec!["en-US".to_string()]), offered_capabilities: Some(vec!["capability".into()]), desired_capabilities: Some(vec!["capability".into()]), + custom_endpoint: None, properties: Some( vec![("key", "value")] .into_iter() diff --git a/sdk/core/azure_core_amqp/src/fe2o3/connection.rs b/sdk/core/azure_core_amqp/src/fe2o3/connection.rs index d0655f6959..1667c48364 100644 --- a/sdk/core/azure_core_amqp/src/fe2o3/connection.rs +++ b/sdk/core/azure_core_amqp/src/fe2o3/connection.rs @@ -51,7 +51,14 @@ impl AmqpConnectionApis for Fe2o3AmqpConnection { .container_id(id) .max_frame_size(65536); + let mut endpoint = url.clone(); + if let Some(options) = options { + if let Some(custom_endpoint) = options.custom_endpoint { + endpoint = custom_endpoint.clone(); + builder = builder.hostname(url.host_str().unwrap()); + } + if let Some(frame_size) = options.max_frame_size { builder = builder.max_frame_size(frame_size); } @@ -101,7 +108,12 @@ impl AmqpConnectionApis for Fe2o3AmqpConnection { } } self.connection - .set(Mutex::new(builder.open(url).await.map_err(AmqpOpen::from)?)) + .set(Mutex::new( + builder + .open(endpoint.clone()) + .await + .map_err(AmqpOpen::from)?, + )) .map_err(|_| { azure_core::Error::new( azure_core::error::ErrorKind::Other, diff --git a/sdk/eventhubs/azure_messaging_eventhubs/src/producer/mod.rs b/sdk/eventhubs/azure_messaging_eventhubs/src/producer/mod.rs index 22c3600f87..f2854ccbfc 100644 --- a/sdk/eventhubs/azure_messaging_eventhubs/src/producer/mod.rs +++ b/sdk/eventhubs/azure_messaging_eventhubs/src/producer/mod.rs @@ -47,6 +47,9 @@ pub struct ProducerClientOptions { /// The maximum size of a message that can be sent to the Event Hub. pub max_message_size: Option, + + /// A custom endpoint to connect to. Use this to connect through a TCP proxy. + pub custom_endpoint: Option, } impl ProducerClientOptions {} @@ -367,6 +370,7 @@ impl ProducerClient { async fn ensure_connection(&self, url: &str) -> Result<()> { if self.connection.get().is_none() { let connection = AmqpConnection::new(); + connection .open( self.options @@ -386,6 +390,10 @@ impl ProducerClient { .map(|(k, v)| (AmqpSymbol::from(k), AmqpValue::from(v))) .collect(), ), + custom_endpoint: Some(Url::parse( + // TODO: yeah...probably not right. + self.options.custom_endpoint.as_ref().unwrap().as_str(), + )?), ..Default::default() }), )