diff --git a/implementations/rust/ockam/ockam_api/src/kafka/secure_channel_map/controller.rs b/implementations/rust/ockam/ockam_api/src/kafka/secure_channel_map/controller.rs index 9d1e56d0d67..b7cac346580 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/secure_channel_map/controller.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/secure_channel_map/controller.rs @@ -72,13 +72,16 @@ impl KafkaSecureChannelControllerImpl { consumer_decryptor_address: &Address, encrypted_content: Vec, ) -> ockam_core::Result> { - let secure_channel_entry = self - .get_secure_channel_for(consumer_decryptor_address) + let secure_channel_decryptor_api_address = self + .get_or_load_secure_channel_decryptor_api_address_for( + context, + consumer_decryptor_address, + ) .await?; let decrypt_response = context .send_and_receive( - route![secure_channel_entry.decryptor_api_address().clone()], + route![secure_channel_decryptor_api_address], DecryptionRequest(encrypted_content), ) .await?; diff --git a/implementations/rust/ockam/ockam_api/src/kafka/secure_channel_map/secure_channels.rs b/implementations/rust/ockam/ockam_api/src/kafka/secure_channel_map/secure_channels.rs index fb8ee5cd9a2..602acf7300d 100644 --- a/implementations/rust/ockam/ockam_api/src/kafka/secure_channel_map/secure_channels.rs +++ b/implementations/rust/ockam/ockam_api/src/kafka/secure_channel_map/secure_channels.rs @@ -6,7 +6,7 @@ use crate::nodes::service::SecureChannelType; use crate::DefaultAddress; use ockam::identity::SecureChannelRegistryEntry; use ockam_core::errcode::{Kind, Origin}; -use ockam_core::{Address, Error}; +use ockam_core::{Address, Error, Result}; use ockam_multiaddr::proto::{Secure, Service}; use ockam_multiaddr::MultiAddr; use ockam_node::Context; @@ -18,7 +18,7 @@ impl KafkaSecureChannelControllerImpl { inner: &MutexGuard<'_, InnerSecureChannelControllerImpl>, context: &Context, mut destination: MultiAddr, - ) -> ockam_core::Result
{ + ) -> Result
{ destination.push_back(Service::new(DefaultAddress::SECURE_CHANNEL_LISTENER))?; let secure_channel = inner @@ -42,7 +42,7 @@ impl KafkaSecureChannelControllerImpl { inner: &MutexGuard<'_, InnerSecureChannelControllerImpl>, context: &Context, mut destination: MultiAddr, - ) -> ockam_core::Result
{ + ) -> Result
{ destination.push_back(Service::new(DefaultAddress::KEY_EXCHANGER_LISTENER))?; let secure_channel = inner @@ -66,7 +66,7 @@ impl KafkaSecureChannelControllerImpl { inner: &MutexGuard<'_, InnerSecureChannelControllerImpl>, context: &Context, encryptor_address: &Address, - ) -> ockam_core::Result<()> { + ) -> Result<()> { inner .node_manager .delete_secure_channel(context, encryptor_address) @@ -80,7 +80,7 @@ impl KafkaSecureChannelControllerImpl { context: &mut Context, topic_name: &str, partition: i32, - ) -> ockam_core::Result { + ) -> Result { let mut inner = self.inner.lock().await; // when we have only one consumer, we use the same secure channel for all topics @@ -197,7 +197,7 @@ impl KafkaSecureChannelControllerImpl { async fn validate_consumer_credentials( inner: &MutexGuard<'_, InnerSecureChannelControllerImpl>, entry: &SecureChannelRegistryEntry, - ) -> ockam_core::Result<()> { + ) -> Result<()> { let authorized = inner .consumer_policy_access_control .is_identity_authorized(entry.their_id()) @@ -218,33 +218,52 @@ impl KafkaSecureChannelControllerImpl { /// Returns the secure channel entry for the consumer decryptor address and validate it /// against the producer manual policy. - pub(crate) async fn get_secure_channel_for( + pub(crate) async fn get_or_load_secure_channel_decryptor_api_address_for( &self, - consumer_decryptor_address: &Address, - ) -> ockam_core::Result { + ctx: &Context, + decryptor_remote_address: &Address, + ) -> Result
{ let inner = self.inner.lock().await; - let entry = inner + let (decryptor_api_address, their_identifier) = match inner .secure_channels .secure_channel_registry() - .get_channel_by_decryptor_address(consumer_decryptor_address) - .ok_or_else(|| { - Error::new( - Origin::Channel, - Kind::Unknown, - format!( - "secure channel decrypt doesn't exists: {}", - consumer_decryptor_address.address() + .get_channel_by_decryptor_address(decryptor_remote_address) + { + Some(entry) => ( + entry.decryptor_api_address().clone(), + entry.their_id().clone(), + ), + None => { + match inner + .secure_channels + .start_persisted_secure_channel_decryptor(ctx, decryptor_remote_address) + .await + { + Ok(sc) => ( + sc.decryptor_api_address().clone(), + sc.their_identifier().clone(), ), - ) - })?; + Err(_) => { + return Err(Error::new( + Origin::Channel, + Kind::Unknown, + format!( + "secure channel decryptor doesn't exist: {}", + decryptor_remote_address.address() + ), + )); + } + } + } + }; let authorized = inner .producer_policy_access_control - .is_identity_authorized(entry.their_id()) + .is_identity_authorized(&their_identifier) .await?; if authorized { - Ok(entry) + Ok(decryptor_api_address) } else { Err(Error::new( Origin::Transport, diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs index 40fb3a5dc6f..f0663dc140d 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/secure_channel.rs @@ -243,7 +243,8 @@ impl NodeManager { }; let options = if secure_channel_type == SecureChannelType::KeyExchangeOnly { - options.key_exchange_only() + // TODO: Should key exchange channels be persisted automatically? + options.key_exchange_only().persist()? } else { options }; @@ -373,7 +374,8 @@ impl NodeManager { }; let options = if secure_channel_type == SecureChannelType::KeyExchangeOnly { - options.key_exchange_only() + // TODO: Should key exchange channels be persisted automatically? + options.key_exchange_only().persist()? } else { options }; diff --git a/implementations/rust/ockam/ockam_identity/tests/persistence.rs b/implementations/rust/ockam/ockam_identity/tests/persistence.rs index 15115ffef04..58fc3e84f70 100644 --- a/implementations/rust/ockam/ockam_identity/tests/persistence.rs +++ b/implementations/rust/ockam/ockam_identity/tests/persistence.rs @@ -37,7 +37,7 @@ async fn test_key_exchange_only(ctx: &mut Context) -> ockam_core::Result<()> { .create_secure_channel(ctx, &alice, route!["bob_listener"], alice_options) .await?; - ctx.sleep(Duration::from_millis(10)).await; + ctx.sleep(Duration::from_millis(50)).await; let bob_channel = secure_channels_bob .secure_channel_registry()