feat(rust): rewrote router into a state container #8641
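This draft replaces the message-passing router (a `NodeMessage` channel protocol with reply channels) with direct method calls on a shared `Router` state container. Two effects recur through every hunk below: query-style `Context` methods (`list_workers`, `is_worker_registered_at`, `get_metadata`) become synchronous and infallible, and nodes no longer spawn `start_router` on a separate task. A sketch of the caller-side change, using only signatures that appear in these diffs:

```rust
use ockam_core::{Address, Result};
use ockam_node::Context;

// Illustrative wrapper: the signatures come from the hunks in this PR,
// not from a published API.
async fn inspect(ctx: &Context, addr: Address) -> Result<()> {
    // Before: each query was an async round-trip to the router task.
    // let workers = ctx.list_workers().await?;
    // let registered = ctx.is_worker_registered_at(addr.clone()).await?;

    // After: the router is shared state, so queries are synchronous
    // and infallible.
    let workers: Vec<Address> = ctx.list_workers();
    let registered: bool = ctx.is_worker_registered_at(&addr);
    println!("{} workers; {} registered: {}", workers.len(), addr, registered);
    Ok(())
}
```

Dropping the `Result` wrappers also removes caller-side error plumbing, as in the `NodeManagerWorker` hunk below where the `internal_error_no_request` mapping disappears.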

Draft: wants to merge 1 commit into develop
```diff
@@ -307,9 +307,6 @@ impl ConnectionBuilder {
         if last_pass && is_last {
             let is_terminal = ctx
                 .get_metadata(address.clone())
-                .await
-                .ok()
-                .flatten()
                 .map(|m| m.is_terminal)
                 .unwrap_or(false);
```
```diff
@@ -202,7 +202,7 @@ impl NodeManager {
             )));
         }
 
-        if ctx.is_worker_registered_at(addr.clone()).await? {
+        if ctx.is_worker_registered_at(&addr) {
             ctx.stop_worker(addr.clone()).await?
         };
 
```
```diff
@@ -10,12 +10,8 @@ impl NodeManagerWorker {
         &self,
         ctx: &Context,
     ) -> Result<Response<WorkerList>, Response<Error>> {
-        let workers = match ctx.list_workers().await {
-            Err(e) => Err(Response::internal_error_no_request(&e.to_string())),
-            Ok(workers) => Ok(workers),
-        }?;
-
-        let list = workers
+        let list = ctx
+            .list_workers()
             .into_iter()
             .map(|addr| WorkerStatus::new(addr.address()))
             .collect();
```
```diff
@@ -218,10 +218,7 @@ impl TestNode {
     }
 
     pub async fn create(runtime: Arc<Runtime>, listen_addr: Option<&str>) -> Self {
-        let (mut context, mut executor) = NodeBuilder::new().with_runtime(runtime.clone()).build();
-        runtime.spawn(async move {
-            executor.start_router().await.expect("cannot start router");
-        });
+        let (mut context, _executor) = NodeBuilder::new().with_runtime(runtime.clone()).build();
         let node_manager_handle = start_manager_for_tests(
             &mut context,
             listen_addr,
```
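With the router folded into node state, the executor returned by `NodeBuilder::build()` no longer has to be driven by hand; the same spawn-and-forget block is deleted again in `ockam_app_lib/src/state/mod.rs` further down. A minimal sketch of the new setup, mirroring the hunk above (the wrapper function is illustrative, not part of the PR):

```rust
use std::sync::Arc;
use ockam_node::NodeBuilder;
use tokio::runtime::Runtime;

// Post-PR node setup: the executor handle can simply be discarded.
fn build_node(runtime: Arc<Runtime>) {
    let (_context, _executor) = NodeBuilder::new()
        .with_runtime(runtime.clone())
        .build();

    // Pre-PR equivalent, deleted in the hunk above:
    // runtime.spawn(async move {
    //     executor.start_router().await.expect("cannot start router");
    // });
}
```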
implementations/rust/ockam/ockam_api/tests/authority.rs: 3 additions & 3 deletions

```diff
@@ -10,7 +10,7 @@ async fn authority_starts_with_default_configuration(ctx: &mut Context) -> Result<()>
     let configuration = default_configuration().await?;
     start_authority_node(ctx, &configuration).await?;
 
-    let workers = ctx.list_workers().await?;
+    let workers = ctx.list_workers();
 
     assert!(!workers.contains(&Address::from(DefaultAddress::DIRECT_AUTHENTICATOR)));
     assert!(!workers.contains(&Address::from(DefaultAddress::ENROLLMENT_TOKEN_ACCEPTOR)));
@@ -28,7 +28,7 @@ async fn authority_starts_direct_authenticator(ctx: &mut Context) -> Result<()>
     configuration.no_direct_authentication = false;
     start_authority_node(ctx, &configuration).await?;
 
-    let workers = ctx.list_workers().await?;
+    let workers = ctx.list_workers();
 
     assert!(workers.contains(&Address::from(DefaultAddress::DIRECT_AUTHENTICATOR)));
     assert!(!workers.contains(&Address::from(DefaultAddress::ENROLLMENT_TOKEN_ACCEPTOR)));
@@ -46,7 +46,7 @@ async fn authority_starts_enrollment_token(ctx: &mut Context) -> Result<()>
     configuration.no_token_enrollment = false;
     start_authority_node(ctx, &configuration).await?;
 
-    let workers = ctx.list_workers().await?;
+    let workers = ctx.list_workers();
 
     assert!(!workers.contains(&Address::from(DefaultAddress::DIRECT_AUTHENTICATOR)));
     assert!(workers.contains(&Address::from(DefaultAddress::ENROLLMENT_TOKEN_ACCEPTOR)));
```
implementations/rust/ockam/ockam_api/tests/session.rs: 2 additions & 8 deletions

```diff
@@ -114,18 +114,12 @@ async fn start_monitoring__available__should_be_up_fast(ctx: &mut Context) -> Result<()>
     ctx.start_worker(Address::from_string("echo"), MockEchoer::new())
         .await?;
 
-    assert!(
-        !ctx.is_worker_registered_at(session.collector_address().clone())
-            .await?
-    );
+    assert!(!ctx.is_worker_registered_at(session.collector_address()));
 
     // Start the Session in a separate task
     session.start_monitoring().await?;
 
-    assert!(
-        ctx.is_worker_registered_at(session.collector_address().clone())
-            .await?
-    );
+    assert!(ctx.is_worker_registered_at(session.collector_address()));
 
     let mut time_to_restore = 0;
 
```
implementations/rust/ockam/ockam_app_lib/src/state/mod.rs: 2 additions & 10 deletions

```diff
@@ -98,20 +98,12 @@ impl AppState {
     ) -> Result<AppState> {
         let cli_state = CliState::with_default_dir()?;
         let rt = Arc::new(Runtime::new().expect("cannot create a tokio runtime"));
-        let (context, mut executor) = NodeBuilder::new()
+        let (context, _executor) = NodeBuilder::new()
             .no_logging()
             .with_runtime(rt.clone())
             .build();
         let context = Arc::new(context);
 
-        // start the router, it is needed for the node manager creation
-        rt.spawn(async move {
-            let result = executor.start_router().await;
-            if let Err(e) = result {
-                error!(%e, "Failed to start the router")
-            }
-        });
-
         let runtime = context.runtime().clone();
         let future = async {
             Self::make(
@@ -327,7 +319,7 @@ impl AppState {
 
         info!("stopped the old node manager");
 
-        for w in self.context.list_workers().await.into_diagnostic()? {
+        for w in self.context.list_workers() {
             let _ = self.context.stop_worker(w.address()).await;
         }
         info!("stopped all the ctx workers");
```
```diff
@@ -1083,7 +1083,7 @@ async fn should_stop_encryptor__and__decryptor__in__secure_channel(
         0
     );
 
-    let workers = ctx.list_workers().await?;
+    let workers = ctx.list_workers();
     assert!(!workers.contains(channel1.decryptor_messaging_address()));
     assert!(!workers.contains(channel1.encryptor_messaging_address()));
     assert!(!workers.contains(channel2.decryptor_messaging_address()));
```
implementations/rust/ockam/ockam_node/Cargo.toml: 0 additions & 3 deletions

```diff
@@ -58,9 +58,6 @@ no_std = ["ockam_core/no_std", "ockam_transport_core/no_std", "heapless"]
 # Feature: "alloc" enables support for heap allocation (implied by `feature = "std"`)
 alloc = ["ockam_core/alloc", "ockam_executor/alloc", "futures/alloc", "minicbor/alloc"]
 
-# Feature: "dump_internals" when set, will dump the internal state of
-# workers at startup via the trace! macro.
-dump_internals = []
 # TODO should these features be combined?
 metrics = []
 
```
implementations/rust/ockam/ockam_node/src/async_drop.rs: 7 additions & 16 deletions

```diff
@@ -1,9 +1,7 @@
-use crate::tokio::sync::{
-    mpsc::Sender as DefaultSender,
-    oneshot::{self, Receiver, Sender},
-};
-use crate::NodeMessage;
+use crate::router::Router;
+use crate::tokio::sync::oneshot::{self, Receiver, Sender};
 use ockam_core::Address;
+use std::sync::Arc;
 
 /// A helper to implement Drop mechanisms, but async
 ///
@@ -19,7 +17,7 @@ use ockam_core::Address;
 /// additional metadata to generate messages.
 pub struct AsyncDrop {
     rx: Receiver<Address>,
-    sender: DefaultSender<NodeMessage>,
+    router: Arc<Router>,
 }
 
 impl AsyncDrop {
@@ -29,9 +27,9 @@ impl AsyncDrop {
     /// Context that creates this hook, while the `address` field must
     /// refer to the address of the context that will be deallocated
     /// this way.
-    pub fn new(sender: DefaultSender<NodeMessage>) -> (Self, Sender<Address>) {
+    pub fn new(router: Arc<Router>) -> (Self, Sender<Address>) {
         let (tx, rx) = oneshot::channel();
-        (Self { rx, sender }, tx)
+        (Self { rx, router }, tx)
     }
 
     /// Wait for the cancellation of the channel and then send a
@@ -42,16 +40,9 @@
     pub async fn run(self) {
         if let Ok(addr) = self.rx.await {
             debug!("Received AsyncDrop request for address: {}", addr);
-
-            let (msg, mut reply) = NodeMessage::stop_worker(addr, true);
-            if let Err(e) = self.sender.send(msg).await {
+            if let Err(e) = self.router.stop_worker(addr, true).await {
                 debug!("Failed sending AsyncDrop request to router: {}", e);
             }
-
-            // Then check that address was properly shut down
-            if reply.recv().await.is_none() {
-                debug!("AsyncDrop router reply was None");
-            }
         }
     }
 }
```
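The `run` change above is the design move in miniature: a stop used to be a two-step conversation with the router task (send `NodeMessage::stop_worker`, then poll a reply channel for the shutdown acknowledgement), whereas now one awaited call carries both the request and its outcome. That implies roughly this shape for the new `Router` method; the signature is inferred from the call sites in this PR, and the parameter name `force` is assumed since the diff only shows a literal `true`:

```rust
use ockam_core::{Address, Result};

// Stand-in type: the real Router lives in ockam_node's router module.
struct Router;

impl Router {
    // Inferred signature: the outcome that used to arrive on a reply
    // channel is now the return value.
    pub async fn stop_worker(&self, addr: Address, force: bool) -> Result<()> {
        let _ = (addr, force); // real body is outside this diff
        Ok(())
    }
}
```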
implementations/rust/ockam/ockam_node/src/context/context.rs: 16 additions & 100 deletions

```diff
@@ -1,6 +1,6 @@
-use crate::channel_types::{SmallReceiver, SmallSender};
+use crate::channel_types::SmallReceiver;
 use crate::tokio::runtime::Handle;
-use crate::{error::*, AsyncDropSender, NodeMessage};
+use crate::AsyncDropSender;
 use core::sync::atomic::AtomicUsize;
 use ockam_core::compat::collections::HashMap;
 use ockam_core::compat::sync::{Arc, RwLock};
@@ -14,6 +14,7 @@ use ockam_core::{
     Result, TransportType,
 };
 
+use crate::router::Router;
 #[cfg(feature = "std")]
 use core::fmt::{Debug, Formatter};
 use ockam_core::errcode::{Kind, Origin};
@@ -25,7 +26,7 @@ pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
 /// Context contains Node state and references to the runtime.
 pub struct Context {
     pub(super) mailboxes: Mailboxes,
-    pub(super) sender: SmallSender<NodeMessage>,
+    pub(super) router: Arc<Router>,
     pub(super) rt: Handle,
     pub(super) receiver: SmallReceiver<RelayMessage>,
     pub(super) async_drop_sender: Option<AsyncDropSender>,
@@ -49,7 +50,6 @@ impl Debug for Context {
     fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
         f.debug_struct("Context")
             .field("mailboxes", &self.mailboxes)
-            .field("sender", &self.sender)
             .field("runtime", &self.rt)
             .finish()
     }
@@ -67,8 +67,8 @@ impl Context {
     }
 
     /// Return a reference to sender
-    pub(crate) fn sender(&self) -> &SmallSender<NodeMessage> {
-        &self.sender
+    pub(crate) fn router(&self) -> Arc<Router> {
+        self.router.clone()
     }
 
     /// Return the primary address of the current worker
@@ -126,82 +126,22 @@ impl Context {
     /// Clusters are de-allocated in reverse order of their
     /// initialisation when the node is stopped.
     pub async fn set_cluster<S: Into<String>>(&self, label: S) -> Result<()> {
-        let (msg, mut rx) = NodeMessage::set_cluster(self.address(), label.into());
-        self.sender
-            .send(msg)
-            .await
-            .map_err(NodeError::from_send_err)?;
-        rx.recv()
-            .await
-            .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??
-            .is_ok()
+        self.router.set_cluster(self.address(), label.into())
     }
 
     /// Return a list of all available worker addresses on a node
-    pub async fn list_workers(&self) -> Result<Vec<Address>> {
-        let (msg, mut reply_rx) = NodeMessage::list_workers();
-
-        self.sender
-            .send(msg)
-            .await
-            .map_err(NodeError::from_send_err)?;
-
-        reply_rx
-            .recv()
-            .await
-            .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??
-            .take_workers()
+    pub fn list_workers(&self) -> Vec<Address> {
+        self.router.list_workers()
     }
 
     /// Return true if a worker is already registered at this address
-    pub async fn is_worker_registered_at(&self, address: Address) -> Result<bool> {
-        let (msg, mut reply_rx) = NodeMessage::is_worker_registered_at(address.clone());
-
-        self.sender
-            .send(msg)
-            .await
-            .map_err(NodeError::from_send_err)?;
-
-        reply_rx
-            .recv()
-            .await
-            .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??
-            .take_worker_is_registered()
+    pub fn is_worker_registered_at(&self, address: &Address) -> bool {
+        self.router.is_worker_registered_at(address)
     }
 
     /// Send a shutdown acknowledgement to the router
-    pub(crate) async fn send_stop_ack(&self) -> Result<()> {
-        self.sender
-            .send(NodeMessage::StopAck(self.address()))
-            .await
-            .map_err(NodeError::from_send_err)?;
-        Ok(())
-    }
-
-    /// This function is called by Relay to indicate a worker is initialised
-    pub(crate) async fn set_ready(&mut self) -> Result<()> {
-        self.sender
-            .send(NodeMessage::set_ready(self.address()))
-            .await
-            .map_err(NodeError::from_send_err)?;
-        Ok(())
-    }
-
-    /// Wait for a particular address to become "ready"
-    pub async fn wait_for<A: Into<Address>>(&self, addr: A) -> Result<()> {
-        let (msg, mut reply) = NodeMessage::get_ready(addr.into());
-        self.sender
-            .send(msg)
-            .await
-            .map_err(NodeError::from_send_err)?;
-
-        // This call blocks until the address has become ready or is
-        // dropped by the router
-        reply
-            .recv()
-            .await
-            .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??;
-        Ok(())
+    pub(crate) async fn stop_ack(&self) -> Result<()> {
+        self.router.stop_ack(self.address()).await
     }
 
     /// Finds the terminal address of a route, if present
@@ -210,7 +150,6 @@ impl Context {
         route: impl Into<Vec<Address>>,
     ) -> Result<Option<AddressAndMetadata>> {
         let addresses = route.into();
-
         if addresses.iter().any(|a| !a.transport_type().is_local()) {
             return Err(Error::new(
                 Origin::Node,
@@ -219,34 +158,11 @@ impl Context {
             ));
         }
 
-        let (msg, mut reply) = NodeMessage::find_terminal_address(addresses);
-        self.sender
-            .send(msg)
-            .await
-            .map_err(NodeError::from_send_err)?;
-
-        reply
-            .recv()
-            .await
-            .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??
-            .take_terminal_address()
+        Ok(self.router.find_terminal_address(addresses))
     }
 
     /// Read metadata for the provided address
-    pub async fn get_metadata(
-        &self,
-        address: impl Into<Address>,
-    ) -> Result<Option<AddressMetadata>> {
-        let (msg, mut reply) = NodeMessage::get_metadata(address.into());
-        self.sender
-            .send(msg)
-            .await
-            .map_err(NodeError::from_send_err)?;
-
-        reply
-            .recv()
-            .await
-            .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??
-            .take_metadata()
+    pub fn get_metadata(&self, address: impl Into<Address>) -> Option<AddressMetadata> {
+        self.router.get_address_metadata(&address.into())
     }
 }
```
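For orientation after this long file, here is the router-facing query surface of `Context` once the PR lands, consolidated verbatim from the additions above (a summary excerpt, not new code):

```rust
impl Context {
    /// Synchronous: reads the shared router state directly.
    pub fn list_workers(&self) -> Vec<Address> {
        self.router.list_workers()
    }

    /// Synchronous and infallible; the address is now taken by reference.
    pub fn is_worker_registered_at(&self, address: &Address) -> bool {
        self.router.is_worker_registered_at(address)
    }

    /// Synchronous; the Result wrapper is gone.
    pub fn get_metadata(&self, address: impl Into<Address>) -> Option<AddressMetadata> {
        self.router.get_address_metadata(&address.into())
    }

    /// Still async: shutdown acknowledgement goes through the router.
    pub(crate) async fn stop_ack(&self) -> Result<()> {
        self.router.stop_ack(self.address()).await
    }
}
```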