Skip to content

Commit 842df41

Browse files
Initial support for operations on Shovels
1 parent 20d7fb0 commit 842df41

12 files changed

+461
-24
lines changed

CHANGELOG.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,16 @@
22

33
## v0.23.0 (in development)
44

5-
No (documented) changes yet.
5+
### Breaking Changes
6+
7+
* `RuntimeParameterDefinition#name`, `RuntimeParameterDefinition#vhost`, and `RuntimeParameterDefinition#component` types changed from `String` to `&str`
8+
9+
### Enhancements
10+
11+
* `Client#declare_amqp091_shovel` is a new function that declares a dynamic shovel
12+
where both source and destination use AMQP 0-9-1
13+
14+
* `Client#delete_shovel` is a new function for deleting shovels
615

716

817
## v0.22.0 (Feb 8, 2025)

src/api.rs

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::fmt;
2424

2525
use crate::error::Error;
2626
use crate::error::Error::{ClientErrorResponse, NotFound, ServerErrorResponse};
27-
use crate::requests::{EmptyPayload, StreamParams};
27+
use crate::requests::{Amqp091ShovelParams, EmptyPayload, StreamParams, SHOVEL_COMPONENT};
2828
use crate::responses::{
2929
DeprecatedFeatureList, FeatureFlag, FeatureFlagList, FeatureFlagStability, FeatureFlagState,
3030
GetMessage, OAuthConfiguration, SchemaDefinitionSyncStatus, VirtualHostDefinitionSet,
@@ -993,7 +993,10 @@ where
993993
Ok(response)
994994
}
995995

996-
pub async fn upsert_runtime_parameter(&self, param: &RuntimeParameterDefinition) -> Result<()> {
996+
pub async fn upsert_runtime_parameter<'a>(
997+
&self,
998+
param: &'a RuntimeParameterDefinition<'a>,
999+
) -> Result<()> {
9971000
let _response = self
9981001
.http_put(
9991002
path!("parameters", param.component, param.vhost, param.name),
@@ -1364,12 +1367,43 @@ where
13641367
// Shovels
13651368
//
13661369

1367-
pub async fn list_shovels(&self) -> crate::blocking_api::Result<Vec<responses::Shovel>> {
1370+
pub async fn list_shovels(&self) -> Result<Vec<responses::Shovel>> {
13681371
let response = self.http_get("shovels", None, None).await?;
13691372
let response = response.json().await?;
13701373
Ok(response)
13711374
}
13721375

1376+
pub async fn declare_amqp091_shovel(&self, params: Amqp091ShovelParams<'_>) -> Result<()> {
1377+
let runtime_param = RuntimeParameterDefinition::from(params);
1378+
1379+
let _response = self
1380+
.http_put(
1381+
path!(
1382+
"parameters",
1383+
SHOVEL_COMPONENT,
1384+
runtime_param.vhost,
1385+
runtime_param.name
1386+
),
1387+
&runtime_param,
1388+
None,
1389+
None,
1390+
)
1391+
.await?;
1392+
Ok(())
1393+
}
1394+
1395+
pub async fn delete_shovel(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> {
1396+
let excludes = if idempotently {
1397+
Some(StatusCode::NOT_FOUND)
1398+
} else {
1399+
None
1400+
};
1401+
let _response = self
1402+
.http_delete(path!("shovels", "vhost", vhost, name), excludes, None)
1403+
.await?;
1404+
Ok(())
1405+
}
1406+
13731407
//
13741408
// Publish and consume messages
13751409
//

src/blocking_api.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
use crate::error::Error;
1717
use crate::error::Error::{ClientErrorResponse, NotFound, ServerErrorResponse};
18-
use crate::requests::{EmptyPayload, StreamParams};
18+
use crate::requests::{Amqp091ShovelParams, EmptyPayload, StreamParams, SHOVEL_COMPONENT};
1919
use crate::responses::{
2020
DeprecatedFeatureList, FeatureFlag, FeatureFlagList, FeatureFlagStability, FeatureFlagState,
2121
GetMessage, OAuthConfiguration, VirtualHostDefinitionSet, WarmStandbyReplicationStatus,
@@ -879,7 +879,10 @@ where
879879
Ok(response)
880880
}
881881

882-
pub fn upsert_runtime_parameter(&self, param: &RuntimeParameterDefinition) -> Result<()> {
882+
pub fn upsert_runtime_parameter<'a>(
883+
&self,
884+
param: &'a RuntimeParameterDefinition<'a>,
885+
) -> Result<()> {
883886
let _response = self.http_put(
884887
path!("parameters", param.component, param.vhost, param.name),
885888
&param,
@@ -1203,6 +1206,33 @@ where
12031206
Ok(response)
12041207
}
12051208

1209+
pub fn declare_amqp091_shovel(&self, params: Amqp091ShovelParams<'_>) -> Result<()> {
1210+
let runtime_param = RuntimeParameterDefinition::from(params);
1211+
1212+
let _response = self.http_put(
1213+
path!(
1214+
"parameters",
1215+
SHOVEL_COMPONENT,
1216+
runtime_param.vhost,
1217+
runtime_param.name
1218+
),
1219+
&runtime_param,
1220+
None,
1221+
None,
1222+
)?;
1223+
Ok(())
1224+
}
1225+
1226+
pub fn delete_shovel(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> {
1227+
let excludes = if idempotently {
1228+
Some(StatusCode::NOT_FOUND)
1229+
} else {
1230+
None
1231+
};
1232+
let _response = self.http_delete(path!("shovels", "vhost", vhost, name), excludes, None)?;
1233+
Ok(())
1234+
}
1235+
12061236
//
12071237
// Publish and consume messages
12081238
//

src/commons.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,3 +577,36 @@ impl From<UserLimitTarget> for String {
577577
value.as_ref().to_string()
578578
}
579579
}
580+
581+
#[derive(Default, Debug, Deserialize, Serialize, Clone, PartialEq, Eq)]
582+
pub enum ShovelAcknowledgementMode {
583+
#[serde(rename = "no-ack")]
584+
Immediate,
585+
#[serde(rename = "on-publish")]
586+
WhenPublished,
587+
#[default]
588+
#[serde(rename = "on-confirm")]
589+
WhenConfirmed,
590+
}
591+
592+
impl From<&str> for ShovelAcknowledgementMode {
593+
fn from(value: &str) -> Self {
594+
match value {
595+
"no-ack" => ShovelAcknowledgementMode::Immediate,
596+
"on-publish" => ShovelAcknowledgementMode::WhenPublished,
597+
"on-confirm" => ShovelAcknowledgementMode::WhenConfirmed,
598+
_ => ShovelAcknowledgementMode::default(),
599+
}
600+
}
601+
}
602+
603+
impl From<String> for ShovelAcknowledgementMode {
604+
fn from(value: String) -> Self {
605+
match value.as_str() {
606+
"no-ack" => ShovelAcknowledgementMode::Immediate,
607+
"on-publish" => ShovelAcknowledgementMode::WhenPublished,
608+
"on-confirm" => ShovelAcknowledgementMode::WhenConfirmed,
609+
_ => ShovelAcknowledgementMode::default(),
610+
}
611+
}
612+
}

src/requests.rs

Lines changed: 135 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
14-
use crate::commons::{ExchangeType, PolicyTarget, QueueType};
14+
use crate::commons::{ExchangeType, PolicyTarget, QueueType, ShovelAcknowledgementMode};
1515
use serde::{Deserialize, Serialize};
1616
use serde_json::{json, Map, Value};
1717

@@ -373,10 +373,10 @@ pub type RuntimeParameterValue = Map<String, Value>;
373373

374374
/// Represents a [runtime parameter](https://rabbitmq.com/docs/parameters/).
375375
#[derive(Serialize, Deserialize)]
376-
pub struct RuntimeParameterDefinition {
377-
pub name: String,
378-
pub vhost: String,
379-
pub component: String,
376+
pub struct RuntimeParameterDefinition<'a> {
377+
pub name: &'a str,
378+
pub vhost: &'a str,
379+
pub component: &'a str,
380380
pub value: RuntimeParameterValue,
381381
}
382382

@@ -404,6 +404,136 @@ pub struct Permissions<'a> {
404404
pub write: &'a str,
405405
}
406406

407+
pub(crate) const SHOVEL_COMPONENT: &str = "shovel";
408+
409+
/// Represents a dynamic shovel definition.
410+
#[derive(Serialize)]
411+
pub struct Amqp091ShovelParams<'a> {
412+
pub name: &'a str,
413+
pub vhost: &'a str,
414+
415+
pub acknowledgement_mode: ShovelAcknowledgementMode,
416+
pub reconnect_delay: Option<u16>,
417+
418+
pub source: Amqp091ShovelSourceParams<'a>,
419+
pub destination: Amqp091ShovelDestinationParams<'a>,
420+
}
421+
422+
impl<'a> From<Amqp091ShovelParams<'a>> for RuntimeParameterDefinition<'a> {
423+
fn from(params: Amqp091ShovelParams<'a>) -> Self {
424+
let mut value = Map::new();
425+
426+
value.insert("src-uri".to_owned(), json!(params.source.source_uri));
427+
if let Some(sq) = params.source.source_queue {
428+
value.insert("src-queue".to_owned(), json!(sq));
429+
}
430+
if let Some(sx) = params.source.source_exchange {
431+
value.insert("src-exchange".to_owned(), json!(sx));
432+
}
433+
if let Some(sxrk) = params.source.source_exchange_routing_key {
434+
value.insert("src-exchange-key".to_owned(), json!(sxrk));
435+
}
436+
437+
value.insert(
438+
"dest-uri".to_owned(),
439+
json!(params.destination.destination_uri),
440+
);
441+
value.insert("ack-mode".to_owned(), json!(params.acknowledgement_mode));
442+
443+
if let Some(dq) = params.destination.destination_queue {
444+
value.insert("dest-queue".to_owned(), json!(dq));
445+
}
446+
if let Some(dx) = params.destination.destination_exchange {
447+
value.insert("dest-exchange".to_owned(), json!(dx));
448+
}
449+
if let Some(dxrk) = params.destination.destination_exchange_routing_key {
450+
value.insert("dest-exchange-key".to_owned(), json!(dxrk));
451+
}
452+
453+
if let Some(val) = params.reconnect_delay {
454+
value.insert("reconnect-delay".to_owned(), json!(val));
455+
}
456+
457+
Self {
458+
name: params.name,
459+
vhost: params.vhost,
460+
component: SHOVEL_COMPONENT,
461+
value,
462+
}
463+
}
464+
}
465+
466+
#[derive(Serialize)]
467+
pub struct Amqp091ShovelSourceParams<'a> {
468+
pub source_uri: &'a str,
469+
470+
pub source_queue: Option<&'a str>,
471+
472+
pub source_exchange: Option<&'a str>,
473+
pub source_exchange_routing_key: Option<&'a str>,
474+
}
475+
476+
impl<'a> Amqp091ShovelSourceParams<'a> {
477+
pub fn queue_source(source_uri: &'a str, source_queue: &'a str) -> Self {
478+
Self {
479+
source_uri,
480+
source_queue: Some(source_queue),
481+
482+
source_exchange: None,
483+
source_exchange_routing_key: None,
484+
}
485+
}
486+
487+
pub fn exchange_source(
488+
source_uri: &'a str,
489+
source_exchange: &'a str,
490+
source_exchange_routing_key: Option<&'a str>,
491+
) -> Self {
492+
Self {
493+
source_uri,
494+
source_exchange: Some(source_exchange),
495+
source_exchange_routing_key,
496+
497+
source_queue: None,
498+
}
499+
}
500+
}
501+
502+
#[derive(Serialize)]
503+
pub struct Amqp091ShovelDestinationParams<'a> {
504+
pub destination_uri: &'a str,
505+
506+
pub destination_queue: Option<&'a str>,
507+
pub destination_exchange: Option<&'a str>,
508+
pub destination_exchange_routing_key: Option<&'a str>,
509+
}
510+
511+
impl<'a> Amqp091ShovelDestinationParams<'a> {
512+
pub fn queue_destination(destination_uri: &'a str, destination_queue: &'a str) -> Self {
513+
Self {
514+
destination_uri,
515+
destination_queue: Some(destination_queue),
516+
517+
destination_exchange: None,
518+
destination_exchange_routing_key: None,
519+
}
520+
}
521+
522+
pub fn exchange_destination(
523+
destination_uri: &'a str,
524+
destination_exchange: &'a str,
525+
destination_exchange_routing_key: Option<&'a str>,
526+
) -> Self {
527+
Self {
528+
destination_uri,
529+
destination_exchange: Some(destination_exchange),
530+
destination_exchange_routing_key,
531+
532+
destination_queue: None,
533+
}
534+
}
535+
}
536+
407537
pub type MessageProperties = Map<String, Value>;
408538

409539
#[derive(Serialize, Default)]

tests/async_consumer_tests.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,6 @@ async fn test_async_list_vhost_consumers() {
3636

3737
let result2 = rc.list_consumers_in(vh_params.name).await;
3838
assert!(result2.is_ok(), "list_consumers_in returned {:?}", result2);
39+
40+
rc.delete_vhost(&vh_params.name, true).await.unwrap();
3941
}

0 commit comments

Comments
 (0)