Skip to content

Commit b9fd820

Browse files
Improve responses::Shovel and requests::RuntimeParameterDefinition convertability
1 parent 492eab7 commit b9fd820

File tree

3 files changed

+1123
-0
lines changed

3 files changed

+1123
-0
lines changed

src/requests/shovels.rs

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
// limitations under the License.
1414

1515
use crate::commons::MessageTransferAcknowledgementMode;
16+
use crate::error::ConversionError;
1617
use crate::requests::parameters::RuntimeParameterDefinition;
18+
use crate::responses::{RuntimeParameter, Shovel};
1719
use serde::Serialize;
1820
use serde_json::{Map, Value, json};
1921

@@ -368,3 +370,234 @@ impl<'a> Amqp10ShovelDestinationParams<'a> {
368370
/// Used to specify custom properties that should be applied to messages
369371
/// when they are re-published by shovels.
370372
pub type MessageProperties = Map<String, Value>;
373+
374+
/// A helper type used for conversion between shovels and runtime parameters
375+
/// (the end goal is usually modifying a shovel).
376+
#[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)]
377+
pub struct OwnedShovelParams {
378+
pub name: String,
379+
pub vhost: String,
380+
pub source_protocol: String,
381+
pub destination_protocol: String,
382+
pub acknowledgement_mode: MessageTransferAcknowledgementMode,
383+
pub reconnect_delay: Option<u32>,
384+
385+
pub source_uri: String,
386+
pub source_queue: Option<String>,
387+
pub source_exchange: Option<String>,
388+
pub source_exchange_routing_key: Option<String>,
389+
pub source_address: Option<String>,
390+
pub source_predeclared: Option<bool>,
391+
392+
pub destination_uri: String,
393+
pub destination_queue: Option<String>,
394+
pub destination_exchange: Option<String>,
395+
pub destination_exchange_routing_key: Option<String>,
396+
pub destination_address: Option<String>,
397+
pub destination_predeclared: Option<bool>,
398+
}
399+
400+
impl TryFrom<RuntimeParameter> for OwnedShovelParams {
401+
type Error = ConversionError;
402+
403+
fn try_from(param: RuntimeParameter) -> Result<Self, Self::Error> {
404+
let values = &param.value.0;
405+
406+
let source_protocol = values
407+
.get("src-protocol")
408+
.and_then(|v| v.as_str())
409+
.ok_or_else(|| ConversionError::MissingProperty {
410+
argument: "src-protocol".to_string(),
411+
})?
412+
.to_string();
413+
414+
let destination_protocol = values
415+
.get("dest-protocol")
416+
.and_then(|v| v.as_str())
417+
.ok_or_else(|| ConversionError::MissingProperty {
418+
argument: "dest-protocol".to_string(),
419+
})?
420+
.to_string();
421+
422+
let source_uri = values
423+
.get("src-uri")
424+
.and_then(|v| v.as_str())
425+
.ok_or_else(|| ConversionError::MissingProperty {
426+
argument: "src-uri".to_string(),
427+
})?
428+
.to_string();
429+
430+
let destination_uri = values
431+
.get("dest-uri")
432+
.and_then(|v| v.as_str())
433+
.ok_or_else(|| ConversionError::MissingProperty {
434+
argument: "dest-uri".to_string(),
435+
})?
436+
.to_string();
437+
438+
let acknowledgement_mode = values
439+
.get("ack-mode")
440+
.and_then(|v| v.as_str())
441+
.map(MessageTransferAcknowledgementMode::from)
442+
.unwrap_or_default();
443+
444+
let reconnect_delay = values
445+
.get("reconnect-delay")
446+
.and_then(|v| v.as_u64())
447+
.map(|v| v as u32);
448+
449+
let source_queue = values
450+
.get("src-queue")
451+
.and_then(|v| v.as_str())
452+
.map(String::from);
453+
454+
let source_exchange = values
455+
.get("src-exchange")
456+
.and_then(|v| v.as_str())
457+
.map(String::from);
458+
459+
let source_exchange_routing_key = values
460+
.get("src-exchange-key")
461+
.and_then(|v| v.as_str())
462+
.map(String::from);
463+
464+
let source_address = values
465+
.get("src-address")
466+
.and_then(|v| v.as_str())
467+
.map(String::from);
468+
469+
let source_predeclared = values.get("src-predeclared").and_then(|v| v.as_bool());
470+
471+
let destination_queue = values
472+
.get("dest-queue")
473+
.and_then(|v| v.as_str())
474+
.map(String::from);
475+
476+
let destination_exchange = values
477+
.get("dest-exchange")
478+
.and_then(|v| v.as_str())
479+
.map(String::from);
480+
481+
let destination_exchange_routing_key = values
482+
.get("dest-exchange-key")
483+
.and_then(|v| v.as_str())
484+
.map(String::from);
485+
486+
let destination_address = values
487+
.get("dest-address")
488+
.and_then(|v| v.as_str())
489+
.map(String::from);
490+
491+
let destination_predeclared = values.get("dest-predeclared").and_then(|v| v.as_bool());
492+
493+
Ok(OwnedShovelParams {
494+
name: param.name,
495+
vhost: param.vhost.to_string(),
496+
source_protocol,
497+
destination_protocol,
498+
acknowledgement_mode,
499+
reconnect_delay,
500+
source_uri,
501+
source_queue,
502+
source_exchange,
503+
source_exchange_routing_key,
504+
source_address,
505+
source_predeclared,
506+
destination_uri,
507+
destination_queue,
508+
destination_exchange,
509+
destination_exchange_routing_key,
510+
destination_address,
511+
destination_predeclared,
512+
})
513+
}
514+
}
515+
516+
impl<'a> From<&'a OwnedShovelParams> for RuntimeParameterDefinition<'a> {
517+
fn from(params: &'a OwnedShovelParams) -> Self {
518+
let mut value = Map::new();
519+
520+
value.insert("src-protocol".to_owned(), json!(params.source_protocol));
521+
value.insert(
522+
"dest-protocol".to_owned(),
523+
json!(params.destination_protocol),
524+
);
525+
value.insert("src-uri".to_owned(), json!(params.source_uri));
526+
value.insert("dest-uri".to_owned(), json!(params.destination_uri));
527+
value.insert("ack-mode".to_owned(), json!(params.acknowledgement_mode));
528+
529+
if let Some(delay) = params.reconnect_delay {
530+
value.insert("reconnect-delay".to_owned(), json!(delay));
531+
}
532+
533+
if let Some(queue) = &params.source_queue {
534+
value.insert("src-queue".to_owned(), json!(queue));
535+
}
536+
if let Some(exchange) = &params.source_exchange {
537+
value.insert("src-exchange".to_owned(), json!(exchange));
538+
}
539+
if let Some(key) = &params.source_exchange_routing_key {
540+
value.insert("src-exchange-key".to_owned(), json!(key));
541+
}
542+
if let Some(address) = &params.source_address {
543+
value.insert("src-address".to_owned(), json!(address));
544+
}
545+
if let Some(predeclared) = params.source_predeclared {
546+
value.insert("src-predeclared".to_owned(), json!(predeclared));
547+
}
548+
549+
if let Some(queue) = &params.destination_queue {
550+
value.insert("dest-queue".to_owned(), json!(queue));
551+
}
552+
if let Some(exchange) = &params.destination_exchange {
553+
value.insert("dest-exchange".to_owned(), json!(exchange));
554+
}
555+
if let Some(key) = &params.destination_exchange_routing_key {
556+
value.insert("dest-exchange-key".to_owned(), json!(key));
557+
}
558+
if let Some(address) = &params.destination_address {
559+
value.insert("dest-address".to_owned(), json!(address));
560+
}
561+
if let Some(predeclared) = params.destination_predeclared {
562+
value.insert("dest-predeclared".to_owned(), json!(predeclared));
563+
}
564+
565+
Self {
566+
name: &params.name,
567+
vhost: &params.vhost,
568+
component: SHOVEL_COMPONENT,
569+
value,
570+
}
571+
}
572+
}
573+
574+
impl From<Shovel> for OwnedShovelParams {
575+
fn from(shovel: Shovel) -> Self {
576+
Self {
577+
name: shovel.name,
578+
vhost: shovel.vhost.unwrap_or_default(),
579+
source_protocol: shovel
580+
.source_protocol
581+
.map(|p| p.to_string())
582+
.unwrap_or_default(),
583+
destination_protocol: shovel
584+
.destination_protocol
585+
.map(|p| p.to_string())
586+
.unwrap_or_default(),
587+
acknowledgement_mode: MessageTransferAcknowledgementMode::default(),
588+
reconnect_delay: None,
589+
source_uri: shovel.source_uri.unwrap_or_default(),
590+
source_queue: shovel.source,
591+
source_exchange: None,
592+
source_exchange_routing_key: None,
593+
source_address: shovel.source_address,
594+
source_predeclared: None,
595+
destination_uri: shovel.destination_uri.unwrap_or_default(),
596+
destination_queue: shovel.destination,
597+
destination_exchange: None,
598+
destination_exchange_routing_key: None,
599+
destination_address: shovel.destination_address,
600+
destination_predeclared: None,
601+
}
602+
}
603+
}

0 commit comments

Comments
 (0)