Skip to content

Commit 8592212

Browse files
Introduce a function for declaring an AMQP 1.0 shovel
1 parent b77234d commit 8592212

File tree

7 files changed

+225
-5
lines changed

7 files changed

+225
-5
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@
44

55
### Enhancements
66

7+
* `Client#declare_amqp10_shovel` is a new function that declares a dynamic shovel
8+
where both source and destination use AMQP 1.0
9+
10+
11+
712
* Both `Amqp091ShovelSourceParams` and `Amqp091ShovelDestinationParams` now support a new boolean option, `predeclared`,
813
that enables [either or both sides to rely on a pre-declared topology](https://www.rabbitmq.com/docs/shovel-dynamic#predeclared-topology)
914

src/api.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ use std::fmt;
2424

2525
use crate::error::Error;
2626
use crate::error::Error::{ClientErrorResponse, NotFound, ServerErrorResponse};
27-
use crate::requests::{Amqp091ShovelParams, EmptyPayload, StreamParams, SHOVEL_COMPONENT};
27+
use crate::requests::{
28+
Amqp091ShovelParams, Amqp10ShovelParams, EmptyPayload, StreamParams, SHOVEL_COMPONENT,
29+
};
2830
use crate::responses::{
2931
DeprecatedFeatureList, FeatureFlag, FeatureFlagList, FeatureFlagStability, FeatureFlagState,
3032
GetMessage, OAuthConfiguration, SchemaDefinitionSyncStatus, VirtualHostDefinitionSet,
@@ -1392,6 +1394,25 @@ where
13921394
Ok(())
13931395
}
13941396

1397+
pub async fn declare_amqp10_shovel(&self, params: Amqp10ShovelParams<'_>) -> Result<()> {
1398+
let runtime_param = RuntimeParameterDefinition::from(params);
1399+
1400+
let _response = self
1401+
.http_put(
1402+
path!(
1403+
"parameters",
1404+
SHOVEL_COMPONENT,
1405+
runtime_param.vhost,
1406+
runtime_param.name
1407+
),
1408+
&runtime_param,
1409+
None,
1410+
None,
1411+
)
1412+
.await?;
1413+
Ok(())
1414+
}
1415+
13951416
pub async fn delete_shovel(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> {
13961417
let excludes = if idempotently {
13971418
Some(StatusCode::NOT_FOUND)

src/blocking_api.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515

1616
use crate::error::Error;
1717
use crate::error::Error::{ClientErrorResponse, NotFound, ServerErrorResponse};
18-
use crate::requests::{Amqp091ShovelParams, EmptyPayload, StreamParams, SHOVEL_COMPONENT};
18+
use crate::requests::{
19+
Amqp091ShovelParams, Amqp10ShovelParams, EmptyPayload, StreamParams, SHOVEL_COMPONENT,
20+
};
1921
use crate::responses::{
2022
DeprecatedFeatureList, FeatureFlag, FeatureFlagList, FeatureFlagStability, FeatureFlagState,
2123
GetMessage, OAuthConfiguration, VirtualHostDefinitionSet, WarmStandbyReplicationStatus,
@@ -1223,6 +1225,23 @@ where
12231225
Ok(())
12241226
}
12251227

1228+
pub fn declare_amqp10_shovel(&self, params: Amqp10ShovelParams<'_>) -> Result<()> {
1229+
let runtime_param = RuntimeParameterDefinition::from(params);
1230+
1231+
let _response = self.http_put(
1232+
path!(
1233+
"parameters",
1234+
SHOVEL_COMPONENT,
1235+
runtime_param.vhost,
1236+
runtime_param.name
1237+
),
1238+
&runtime_param,
1239+
None,
1240+
None,
1241+
)?;
1242+
Ok(())
1243+
}
1244+
12261245
pub fn delete_shovel(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> {
12271246
let excludes = if idempotently {
12281247
Some(StatusCode::NOT_FOUND)

src/requests.rs

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ pub struct Permissions<'a> {
406406

407407
pub(crate) const SHOVEL_COMPONENT: &str = "shovel";
408408

409-
/// Represents a dynamic shovel definition.
409+
/// Represents a dynamic AMQP 0-9-1 shovel definition.
410410
#[derive(Serialize)]
411411
pub struct Amqp091ShovelParams<'a> {
412412
pub name: &'a str,
@@ -423,6 +423,9 @@ impl<'a> From<Amqp091ShovelParams<'a>> for RuntimeParameterDefinition<'a> {
423423
fn from(params: Amqp091ShovelParams<'a>) -> Self {
424424
let mut value = Map::new();
425425

426+
value.insert("src-protocol".to_owned(), json!("amqp091"));
427+
value.insert("dest-protocol".to_owned(), json!("amqp091"));
428+
426429
value.insert("src-uri".to_owned(), json!(params.source.source_uri));
427430
if let Some(sq) = params.source.source_queue {
428431
value.insert("src-queue".to_owned(), json!(sq));
@@ -614,6 +617,85 @@ impl<'a> Amqp091ShovelDestinationParams<'a> {
614617
}
615618
}
616619

620+
/// Represents a dynamic shovel definition.
621+
#[derive(Serialize)]
622+
pub struct Amqp10ShovelParams<'a> {
623+
pub name: &'a str,
624+
pub vhost: &'a str,
625+
626+
pub acknowledgement_mode: ShovelAcknowledgementMode,
627+
pub reconnect_delay: Option<u16>,
628+
629+
pub source: Amqp10ShovelSourceParams<'a>,
630+
pub destination: Amqp10ShovelDestinationParams<'a>,
631+
}
632+
633+
#[derive(Serialize)]
634+
pub struct Amqp10ShovelSourceParams<'a> {
635+
pub source_uri: &'a str,
636+
pub source_address: &'a str,
637+
}
638+
639+
impl<'a> Amqp10ShovelSourceParams<'a> {
640+
pub fn new(uri: &'a str, address: &'a str) -> Self {
641+
Self {
642+
source_uri: uri,
643+
source_address: address,
644+
}
645+
}
646+
}
647+
648+
impl<'a> From<Amqp10ShovelParams<'a>> for RuntimeParameterDefinition<'a> {
649+
fn from(params: Amqp10ShovelParams<'a>) -> Self {
650+
let mut value = Map::new();
651+
652+
value.insert("src-protocol".to_owned(), json!("amqp10"));
653+
value.insert("dest-protocol".to_owned(), json!("amqp10"));
654+
655+
value.insert("src-uri".to_owned(), json!(params.source.source_uri));
656+
value.insert(
657+
"src-address".to_owned(),
658+
json!(params.source.source_address),
659+
);
660+
661+
value.insert(
662+
"dest-uri".to_owned(),
663+
json!(params.destination.destination_uri),
664+
);
665+
value.insert(
666+
"dest-address".to_owned(),
667+
json!(params.destination.destination_address),
668+
);
669+
670+
value.insert("ack-mode".to_owned(), json!(params.acknowledgement_mode));
671+
if let Some(val) = params.reconnect_delay {
672+
value.insert("reconnect-delay".to_owned(), json!(val));
673+
}
674+
675+
Self {
676+
name: params.name,
677+
vhost: params.vhost,
678+
component: SHOVEL_COMPONENT,
679+
value,
680+
}
681+
}
682+
}
683+
684+
#[derive(Serialize)]
685+
pub struct Amqp10ShovelDestinationParams<'a> {
686+
pub destination_uri: &'a str,
687+
pub destination_address: &'a str,
688+
}
689+
690+
impl<'a> Amqp10ShovelDestinationParams<'a> {
691+
pub fn new(uri: &'a str, address: &'a str) -> Self {
692+
Self {
693+
destination_uri: uri,
694+
destination_address: address,
695+
}
696+
}
697+
}
698+
617699
pub type MessageProperties = Map<String, Value>;
618700

619701
#[derive(Serialize, Default)]

src/responses.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1612,6 +1612,13 @@ pub struct Shovel {
16121612
#[cfg_attr(feature = "tabled", tabled(display = "display_option"))]
16131613
pub destination: Option<String>,
16141614

1615+
#[serde(rename = "src_address")]
1616+
#[cfg_attr(feature = "tabled", tabled(display = "display_option"))]
1617+
pub source_address: Option<String>,
1618+
#[serde(rename = "dest_address")]
1619+
#[cfg_attr(feature = "tabled", tabled(display = "display_option"))]
1620+
pub destination_address: Option<String>,
1621+
16151622
#[serde(rename = "src_protocol")]
16161623
#[cfg_attr(feature = "tabled", tabled(display = "display_option"))]
16171624
pub source_protocol: Option<MessagingProtocol>,

tests/async_dynamic_shovel_tests.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
// limitations under the License.
1414
use rabbitmq_http_client::commons::ShovelAcknowledgementMode;
1515
use rabbitmq_http_client::requests::{
16-
Amqp091ShovelDestinationParams, Amqp091ShovelParams, Amqp091ShovelSourceParams, QueueParams,
16+
Amqp091ShovelDestinationParams, Amqp091ShovelParams, Amqp091ShovelSourceParams,
17+
Amqp10ShovelDestinationParams, Amqp10ShovelParams, Amqp10ShovelSourceParams, QueueParams,
1718
};
1819
use rabbitmq_http_client::{api::Client, requests::VirtualHostParams};
1920

@@ -56,6 +57,48 @@ async fn test_async_declare_a_dynamic_amqp091_shovel() {
5657
let _ = rc.delete_vhost(vh_params.name, false).await;
5758
}
5859

60+
#[tokio::test]
61+
async fn test_async_declare_a_dynamic_amqp10_shovel() {
62+
let endpoint = endpoint();
63+
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
64+
65+
let vh = "rust.http.api.async.test_async_declare_a_dynamic_amqp10_shovel";
66+
let sh = "test_async_declare_a_dynamic_amqp10_shovel";
67+
68+
let vh_params = VirtualHostParams::named(vh);
69+
let result1 = rc.create_vhost(&vh_params).await;
70+
assert!(result1.is_ok());
71+
72+
// note: 4.1.0 will use a different addressing scheme,
73+
// see https://www.rabbitmq.com/docs/next/amqp#address-v2
74+
let src_queue = format!("{}.src.q", sh);
75+
let src_address = format!("/queue/{}.src.q", sh);
76+
let dest_queue = format!("{}.dest.q", sh);
77+
let dest_address = format!("/queue/{}.dest.q", sh);
78+
79+
let src_params = QueueParams::new_durable_classic_queue(&src_queue, None);
80+
let result2 = rc.declare_queue(vh, &src_params).await;
81+
assert!(result2.is_ok());
82+
83+
let dest_params = QueueParams::new_durable_classic_queue(&dest_queue, None);
84+
let result3 = rc.declare_queue(vh, &dest_params).await;
85+
assert!(result3.is_ok());
86+
87+
let amqp_endpoint = amqp_endpoint_with_vhost(&vh);
88+
let shovel_params = Amqp10ShovelParams {
89+
vhost: &vh,
90+
name: sh,
91+
acknowledgement_mode: ShovelAcknowledgementMode::WhenConfirmed,
92+
reconnect_delay: Some(5),
93+
source: Amqp10ShovelSourceParams::new(&amqp_endpoint, &src_address),
94+
destination: Amqp10ShovelDestinationParams::new(&amqp_endpoint, &dest_address),
95+
};
96+
let result4 = rc.declare_amqp10_shovel(shovel_params).await;
97+
assert!(result4.is_ok());
98+
99+
let _ = rc.delete_vhost(vh_params.name, false).await;
100+
}
101+
59102
#[tokio::test]
60103
async fn test_async_declare_a_dynamic_amqp091_shovel_with_predeclared_source_topology() {
61104
let endpoint = endpoint();

tests/blocking_dynamic_shovel_tests.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
// limitations under the License.
1414
use rabbitmq_http_client::commons::ShovelAcknowledgementMode;
1515
use rabbitmq_http_client::requests::{
16-
Amqp091ShovelDestinationParams, Amqp091ShovelParams, Amqp091ShovelSourceParams, QueueParams,
16+
Amqp091ShovelDestinationParams, Amqp091ShovelParams, Amqp091ShovelSourceParams,
17+
Amqp10ShovelDestinationParams, Amqp10ShovelParams, Amqp10ShovelSourceParams, QueueParams,
1718
};
1819
use rabbitmq_http_client::{blocking_api::Client, requests::VirtualHostParams};
1920

@@ -56,6 +57,48 @@ fn test_blocking_declare_a_dynamic_amqp091_shovel() {
5657
let _ = rc.delete_vhost(vh_params.name, false);
5758
}
5859

60+
#[test]
61+
fn test_blocking_declare_a_dynamic_amqp10_shovel() {
62+
let endpoint = endpoint();
63+
let rc = Client::new(&endpoint, USERNAME, PASSWORD);
64+
65+
let vh = "rust.http.api.blocking.test_blocking_declare_a_dynamic_amqp10_shovel";
66+
let sh = "test_async_declare_a_dynamic_amqp10_shovel";
67+
68+
let vh_params = VirtualHostParams::named(vh);
69+
let result1 = rc.create_vhost(&vh_params);
70+
assert!(result1.is_ok());
71+
72+
// note: 4.1.0 will use a different addressing scheme,
73+
// see https://www.rabbitmq.com/docs/next/amqp#address-v2
74+
let src_queue = format!("{}.src.q", sh);
75+
let src_address = format!("/queue/{}.src.q", sh);
76+
let dest_queue = format!("{}.dest.q", sh);
77+
let dest_address = format!("/queue/{}.dest.q", sh);
78+
79+
let src_params = QueueParams::new_durable_classic_queue(&src_queue, None);
80+
let result2 = rc.declare_queue(vh, &src_params);
81+
assert!(result2.is_ok());
82+
83+
let dest_params = QueueParams::new_durable_classic_queue(&dest_queue, None);
84+
let result3 = rc.declare_queue(vh, &dest_params);
85+
assert!(result3.is_ok());
86+
87+
let amqp_endpoint = amqp_endpoint_with_vhost(&vh);
88+
let shovel_params = Amqp10ShovelParams {
89+
vhost: &vh,
90+
name: sh,
91+
acknowledgement_mode: ShovelAcknowledgementMode::WhenConfirmed,
92+
reconnect_delay: Some(5),
93+
source: Amqp10ShovelSourceParams::new(&amqp_endpoint, &src_address),
94+
destination: Amqp10ShovelDestinationParams::new(&amqp_endpoint, &dest_address),
95+
};
96+
let result4 = rc.declare_amqp10_shovel(shovel_params);
97+
assert!(result4.is_ok());
98+
99+
let _ = rc.delete_vhost(vh_params.name, false);
100+
}
101+
59102
#[test]
60103
fn test_blocking_declare_a_dynamic_amqp091_shovel_with_predeclared_source_topology() {
61104
let endpoint = endpoint();

0 commit comments

Comments
 (0)