Skip to content

Commit

Permalink
feat(rust): add secure channel persistence to kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
SanjoDeundiak committed May 28, 2024
1 parent baafa1e commit f878fb5
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,16 @@ impl KafkaSecureChannelControllerImpl {
consumer_decryptor_address: &Address,
encrypted_content: Vec<u8>,
) -> ockam_core::Result<Vec<u8>> {
let secure_channel_entry = self
.get_secure_channel_for(consumer_decryptor_address)
let secure_channel_decryptor_api_address = self
.get_or_load_secure_channel_decryptor_api_address_for(
context,
consumer_decryptor_address,
)
.await?;

let decrypt_response = context
.send_and_receive(
route![secure_channel_entry.decryptor_api_address().clone()],
route![secure_channel_decryptor_api_address],
DecryptionRequest(encrypted_content),
)
.await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::nodes::service::SecureChannelType;
use crate::DefaultAddress;
use ockam::identity::SecureChannelRegistryEntry;
use ockam_core::errcode::{Kind, Origin};
use ockam_core::{Address, Error};
use ockam_core::{Address, Error, Result};
use ockam_multiaddr::proto::{Secure, Service};
use ockam_multiaddr::MultiAddr;
use ockam_node::Context;
Expand All @@ -18,7 +18,7 @@ impl KafkaSecureChannelControllerImpl {
inner: &MutexGuard<'_, InnerSecureChannelControllerImpl>,
context: &Context,
mut destination: MultiAddr,
) -> ockam_core::Result<Address> {
) -> Result<Address> {
destination.push_back(Service::new(DefaultAddress::SECURE_CHANNEL_LISTENER))?;

let secure_channel = inner
Expand All @@ -42,7 +42,7 @@ impl KafkaSecureChannelControllerImpl {
inner: &MutexGuard<'_, InnerSecureChannelControllerImpl>,
context: &Context,
mut destination: MultiAddr,
) -> ockam_core::Result<Address> {
) -> Result<Address> {
destination.push_back(Service::new(DefaultAddress::KEY_EXCHANGER_LISTENER))?;

let secure_channel = inner
Expand All @@ -66,7 +66,7 @@ impl KafkaSecureChannelControllerImpl {
inner: &MutexGuard<'_, InnerSecureChannelControllerImpl>,
context: &Context,
encryptor_address: &Address,
) -> ockam_core::Result<()> {
) -> Result<()> {
inner
.node_manager
.delete_secure_channel(context, encryptor_address)
Expand All @@ -80,7 +80,7 @@ impl KafkaSecureChannelControllerImpl {
context: &mut Context,
topic_name: &str,
partition: i32,
) -> ockam_core::Result<SecureChannelRegistryEntry> {
) -> Result<SecureChannelRegistryEntry> {
let mut inner = self.inner.lock().await;

// when we have only one consumer, we use the same secure channel for all topics
Expand Down Expand Up @@ -197,7 +197,7 @@ impl KafkaSecureChannelControllerImpl {
async fn validate_consumer_credentials(
inner: &MutexGuard<'_, InnerSecureChannelControllerImpl>,
entry: &SecureChannelRegistryEntry,
) -> ockam_core::Result<()> {
) -> Result<()> {
let authorized = inner
.consumer_policy_access_control
.is_identity_authorized(entry.their_id())
Expand All @@ -218,33 +218,52 @@ impl KafkaSecureChannelControllerImpl {

/// Returns the secure channel entry for the consumer decryptor address and validate it
/// against the producer manual policy.
pub(crate) async fn get_secure_channel_for(
pub(crate) async fn get_or_load_secure_channel_decryptor_api_address_for(
&self,
consumer_decryptor_address: &Address,
) -> ockam_core::Result<SecureChannelRegistryEntry> {
ctx: &Context,
decryptor_remote_address: &Address,
) -> Result<Address> {
let inner = self.inner.lock().await;
let entry = inner
let (decryptor_api_address, their_identifier) = match inner
.secure_channels
.secure_channel_registry()
.get_channel_by_decryptor_address(consumer_decryptor_address)
.ok_or_else(|| {
Error::new(
Origin::Channel,
Kind::Unknown,
format!(
"secure channel decrypt doesn't exists: {}",
consumer_decryptor_address.address()
.get_channel_by_decryptor_address(decryptor_remote_address)
{
Some(entry) => (
entry.decryptor_api_address().clone(),
entry.their_id().clone(),
),
None => {
match inner
.secure_channels
.start_persisted_secure_channel_decryptor(ctx, decryptor_remote_address)
.await
{
Ok(sc) => (
sc.decryptor_api_address().clone(),
sc.their_identifier().clone(),
),
)
})?;
Err(_) => {
return Err(Error::new(
Origin::Channel,
Kind::Unknown,
format!(
"secure channel decryptor doesn't exist: {}",
decryptor_remote_address.address()
),
));
}
}
}
};

let authorized = inner
.producer_policy_access_control
.is_identity_authorized(entry.their_id())
.is_identity_authorized(&their_identifier)
.await?;

if authorized {
Ok(entry)
Ok(decryptor_api_address)
} else {
Err(Error::new(
Origin::Transport,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ impl NodeManager {
};

let options = if secure_channel_type == SecureChannelType::KeyExchangeOnly {
options.key_exchange_only()
// TODO: Should key exchange channels be persisted automatically?
options.key_exchange_only().persist()?
} else {
options
};
Expand Down Expand Up @@ -373,7 +374,8 @@ impl NodeManager {
};

let options = if secure_channel_type == SecureChannelType::KeyExchangeOnly {
options.key_exchange_only()
// TODO: Should key exchange channels be persisted automatically?
options.key_exchange_only().persist()?
} else {
options
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn test_key_exchange_only(ctx: &mut Context) -> ockam_core::Result<()> {
.create_secure_channel(ctx, &alice, route!["bob_listener"], alice_options)
.await?;

ctx.sleep(Duration::from_millis(10)).await;
ctx.sleep(Duration::from_millis(50)).await;

let bob_channel = secure_channels_bob
.secure_channel_registry()
Expand Down

0 comments on commit f878fb5

Please sign in to comment.