diff --git a/implementations/rust/ockam/ockam_api/src/nodes/connection/mod.rs b/implementations/rust/ockam/ockam_api/src/nodes/connection/mod.rs index 3d97e591ea5..4b118410436 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/connection/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/connection/mod.rs @@ -307,9 +307,6 @@ impl ConnectionBuilder { if last_pass && is_last { let is_terminal = ctx .get_metadata(address.clone()) - .await - .ok() - .flatten() .map(|m| m.is_terminal) .unwrap_or(false); diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/node_services.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/node_services.rs index 4c1d6fc81f1..27a11c0ad6e 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/node_services.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/node_services.rs @@ -202,7 +202,7 @@ impl NodeManager { ))); } - if ctx.is_worker_registered_at(addr.clone()).await? { + if ctx.is_worker_registered_at(&addr) { ctx.stop_worker(addr.clone()).await? }; diff --git a/implementations/rust/ockam/ockam_api/src/nodes/service/workers.rs b/implementations/rust/ockam/ockam_api/src/nodes/service/workers.rs index bcc9cb951d6..8a350ed4141 100644 --- a/implementations/rust/ockam/ockam_api/src/nodes/service/workers.rs +++ b/implementations/rust/ockam/ockam_api/src/nodes/service/workers.rs @@ -10,12 +10,8 @@ impl NodeManagerWorker { &self, ctx: &Context, ) -> Result, Response> { - let workers = match ctx.list_workers().await { - Err(e) => Err(Response::internal_error_no_request(&e.to_string())), - Ok(workers) => Ok(workers), - }?; - - let list = workers + let list = ctx + .list_workers() .into_iter() .map(|addr| WorkerStatus::new(addr.address())) .collect(); diff --git a/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs b/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs index eb866a05fe0..e2ec9d23c72 100644 --- a/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs +++ b/implementations/rust/ockam/ockam_api/src/test_utils/mod.rs @@ -218,10 +218,7 @@ impl TestNode { } pub async fn create(runtime: Arc, listen_addr: Option<&str>) -> Self { - let (mut context, mut executor) = NodeBuilder::new().with_runtime(runtime.clone()).build(); - runtime.spawn(async move { - executor.start_router().await.expect("cannot start router"); - }); + let (mut context, _executor) = NodeBuilder::new().with_runtime(runtime.clone()).build(); let node_manager_handle = start_manager_for_tests( &mut context, listen_addr, diff --git a/implementations/rust/ockam/ockam_api/tests/authority.rs b/implementations/rust/ockam/ockam_api/tests/authority.rs index a10d0cda4b1..7ec768ffc0f 100644 --- a/implementations/rust/ockam/ockam_api/tests/authority.rs +++ b/implementations/rust/ockam/ockam_api/tests/authority.rs @@ -10,7 +10,7 @@ async fn authority_starts_with_default_configuration(ctx: &mut Context) -> Resul let configuration = default_configuration().await?; start_authority_node(ctx, &configuration).await?; - let workers = ctx.list_workers().await?; + let workers = ctx.list_workers(); assert!(!workers.contains(&Address::from(DefaultAddress::DIRECT_AUTHENTICATOR))); assert!(!workers.contains(&Address::from(DefaultAddress::ENROLLMENT_TOKEN_ACCEPTOR))); @@ -28,7 +28,7 @@ async fn authority_starts_direct_authenticator(ctx: &mut Context) -> Result<()> configuration.no_direct_authentication = false; start_authority_node(ctx, &configuration).await?; - let workers = ctx.list_workers().await?; + let workers = ctx.list_workers(); 
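Note: with the router turned into shared state, the `Context` registry queries above (`list_workers`, `is_worker_registered_at`, `get_metadata`) become synchronous and infallible. A minimal sketch of the resulting call-site shape, assuming the `context.rs` signatures introduced later in this patch (`stop_if_running` is a hypothetical helper, not part of the change):

```rust
use ockam_core::{Address, Result};
use ockam_node::Context;

// Hypothetical helper: stop a worker only if its address is registered.
async fn stop_if_running(ctx: &Context, addr: &Address) -> Result<()> {
    // Plain registry read: no NodeMessage round-trip, no `.await?`.
    if ctx.is_worker_registered_at(addr) {
        ctx.stop_worker(addr.clone()).await?;
    }
    Ok(())
}
```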
assert!(workers.contains(&Address::from(DefaultAddress::DIRECT_AUTHENTICATOR))); assert!(!workers.contains(&Address::from(DefaultAddress::ENROLLMENT_TOKEN_ACCEPTOR))); @@ -46,7 +46,7 @@ async fn authority_starts_enrollment_token(ctx: &mut Context) -> Result<()> { configuration.no_token_enrollment = false; start_authority_node(ctx, &configuration).await?; - let workers = ctx.list_workers().await?; + let workers = ctx.list_workers(); assert!(!workers.contains(&Address::from(DefaultAddress::DIRECT_AUTHENTICATOR))); assert!(workers.contains(&Address::from(DefaultAddress::ENROLLMENT_TOKEN_ACCEPTOR))); diff --git a/implementations/rust/ockam/ockam_api/tests/session.rs b/implementations/rust/ockam/ockam_api/tests/session.rs index 2b55a6ecbb2..1fbb2d0f3d8 100644 --- a/implementations/rust/ockam/ockam_api/tests/session.rs +++ b/implementations/rust/ockam/ockam_api/tests/session.rs @@ -114,18 +114,12 @@ async fn start_monitoring__available__should_be_up_fast(ctx: &mut Context) -> Re ctx.start_worker(Address::from_string("echo"), MockEchoer::new()) .await?; - assert!( - !ctx.is_worker_registered_at(session.collector_address().clone()) - .await? - ); + assert!(!ctx.is_worker_registered_at(session.collector_address())); // Start the Session in a separate task session.start_monitoring().await?; - assert!( - ctx.is_worker_registered_at(session.collector_address().clone()) - .await? - ); + assert!(ctx.is_worker_registered_at(session.collector_address())); let mut time_to_restore = 0; diff --git a/implementations/rust/ockam/ockam_app_lib/src/state/mod.rs b/implementations/rust/ockam/ockam_app_lib/src/state/mod.rs index 90c7246e21f..b1c89ddb383 100644 --- a/implementations/rust/ockam/ockam_app_lib/src/state/mod.rs +++ b/implementations/rust/ockam/ockam_app_lib/src/state/mod.rs @@ -98,20 +98,12 @@ impl AppState { ) -> Result { let cli_state = CliState::with_default_dir()?; let rt = Arc::new(Runtime::new().expect("cannot create a tokio runtime")); - let (context, mut executor) = NodeBuilder::new() + let (context, _executor) = NodeBuilder::new() .no_logging() .with_runtime(rt.clone()) .build(); let context = Arc::new(context); - // start the router, it is needed for the node manager creation - rt.spawn(async move { - let result = executor.start_router().await; - if let Err(e) = result { - error!(%e, "Failed to start the router") - } - }); - let runtime = context.runtime().clone(); let future = async { Self::make( @@ -327,7 +319,7 @@ impl AppState { info!("stopped the old node manager"); - for w in self.context.list_workers().await.into_diagnostic()? 
{ + for w in self.context.list_workers() { let _ = self.context.stop_worker(w.address()).await; } info!("stopped all the ctx workers"); diff --git a/implementations/rust/ockam/ockam_identity/tests/channel.rs b/implementations/rust/ockam/ockam_identity/tests/channel.rs index 2f405bb3a09..779be524319 100644 --- a/implementations/rust/ockam/ockam_identity/tests/channel.rs +++ b/implementations/rust/ockam/ockam_identity/tests/channel.rs @@ -1083,7 +1083,7 @@ async fn should_stop_encryptor__and__decryptor__in__secure_channel( 0 ); - let workers = ctx.list_workers().await?; + let workers = ctx.list_workers(); assert!(!workers.contains(channel1.decryptor_messaging_address())); assert!(!workers.contains(channel1.encryptor_messaging_address())); assert!(!workers.contains(channel2.decryptor_messaging_address())); diff --git a/implementations/rust/ockam/ockam_node/Cargo.toml b/implementations/rust/ockam/ockam_node/Cargo.toml index e7ef12dae84..eca42acca09 100644 --- a/implementations/rust/ockam/ockam_node/Cargo.toml +++ b/implementations/rust/ockam/ockam_node/Cargo.toml @@ -58,9 +58,6 @@ no_std = ["ockam_core/no_std", "ockam_transport_core/no_std", "heapless"] # Feature: "alloc" enables support for heap allocation (implied by `feature = "std"`) alloc = ["ockam_core/alloc", "ockam_executor/alloc", "futures/alloc", "minicbor/alloc"] -# Feature: "dump_internals" when set, will dump the internal state of -# workers at startup via the trace! macro. -dump_internals = [] # TODO should these features be combined? metrics = [] diff --git a/implementations/rust/ockam/ockam_node/src/async_drop.rs b/implementations/rust/ockam/ockam_node/src/async_drop.rs index 0c45bced995..4f4c55b8e3e 100644 --- a/implementations/rust/ockam/ockam_node/src/async_drop.rs +++ b/implementations/rust/ockam/ockam_node/src/async_drop.rs @@ -1,9 +1,7 @@ -use crate::tokio::sync::{ - mpsc::Sender as DefaultSender, - oneshot::{self, Receiver, Sender}, -}; -use crate::NodeMessage; +use crate::router::Router; +use crate::tokio::sync::oneshot::{self, Receiver, Sender}; use ockam_core::Address; +use std::sync::Arc; /// A helper to implement Drop mechanisms, but async /// @@ -19,7 +17,7 @@ use ockam_core::Address; /// additional metadata to generate messages. pub struct AsyncDrop { rx: Receiver
<Address>
, - sender: DefaultSender, + router: Arc, } impl AsyncDrop { @@ -29,9 +27,9 @@ impl AsyncDrop { /// Context that creates this hook, while the `address` field must /// refer to the address of the context that will be deallocated /// this way. - pub fn new(sender: DefaultSender) -> (Self, Sender
<Address>) { + pub fn new(router: Arc<Router>) -> (Self, Sender<Address>
) { let (tx, rx) = oneshot::channel(); - (Self { rx, sender }, tx) + (Self { rx, router }, tx) } /// Wait for the cancellation of the channel and then send a @@ -42,16 +40,9 @@ impl AsyncDrop { pub async fn run(self) { if let Ok(addr) = self.rx.await { debug!("Received AsyncDrop request for address: {}", addr); - - let (msg, mut reply) = NodeMessage::stop_worker(addr, true); - if let Err(e) = self.sender.send(msg).await { + if let Err(e) = self.router.stop_worker(addr, true).await { debug!("Failed sending AsyncDrop request to router: {}", e); } - - // Then check that address was properly shut down - if reply.recv().await.is_none() { - debug!("AsyncDrop router reply was None"); - } } } } diff --git a/implementations/rust/ockam/ockam_node/src/context/context.rs b/implementations/rust/ockam/ockam_node/src/context/context.rs index b3f1e0d163d..da1dd88a74d 100644 --- a/implementations/rust/ockam/ockam_node/src/context/context.rs +++ b/implementations/rust/ockam/ockam_node/src/context/context.rs @@ -1,6 +1,6 @@ -use crate::channel_types::{SmallReceiver, SmallSender}; +use crate::channel_types::SmallReceiver; use crate::tokio::runtime::Handle; -use crate::{error::*, AsyncDropSender, NodeMessage}; +use crate::AsyncDropSender; use core::sync::atomic::AtomicUsize; use ockam_core::compat::collections::HashMap; use ockam_core::compat::sync::{Arc, RwLock}; @@ -14,6 +14,7 @@ use ockam_core::{ Result, TransportType, }; +use crate::router::Router; #[cfg(feature = "std")] use core::fmt::{Debug, Formatter}; use ockam_core::errcode::{Kind, Origin}; @@ -25,7 +26,7 @@ pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); /// Context contains Node state and references to the runtime. pub struct Context { pub(super) mailboxes: Mailboxes, - pub(super) sender: SmallSender, + pub(super) router: Arc, pub(super) rt: Handle, pub(super) receiver: SmallReceiver, pub(super) async_drop_sender: Option, @@ -49,7 +50,6 @@ impl Debug for Context { fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { f.debug_struct("Context") .field("mailboxes", &self.mailboxes) - .field("sender", &self.sender) .field("runtime", &self.rt) .finish() } @@ -67,8 +67,8 @@ impl Context { } /// Return a reference to sender - pub(crate) fn sender(&self) -> &SmallSender { - &self.sender + pub(crate) fn router(&self) -> Arc { + self.router.clone() } /// Return the primary address of the current worker @@ -126,82 +126,22 @@ impl Context { /// Clusters are de-allocated in reverse order of their /// initialisation when the node is stopped. pub async fn set_cluster>(&self, label: S) -> Result<()> { - let (msg, mut rx) = NodeMessage::set_cluster(self.address(), label.into()); - self.sender - .send(msg) - .await - .map_err(NodeError::from_send_err)?; - rx.recv() - .await - .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())?? - .is_ok() + self.router.set_cluster(self.address(), label.into()) } /// Return a list of all available worker addresses on a node - pub async fn list_workers(&self) -> Result> { - let (msg, mut reply_rx) = NodeMessage::list_workers(); - - self.sender - .send(msg) - .await - .map_err(NodeError::from_send_err)?; - - reply_rx - .recv() - .await - .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())?? - .take_workers() + pub fn list_workers(&self) -> Vec
<Address>
{ + self.router.list_workers() } /// Return true if a worker is already registered at this address - pub async fn is_worker_registered_at(&self, address: Address) -> Result { - let (msg, mut reply_rx) = NodeMessage::is_worker_registered_at(address.clone()); - - self.sender - .send(msg) - .await - .map_err(NodeError::from_send_err)?; - - reply_rx - .recv() - .await - .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())?? - .take_worker_is_registered() + pub fn is_worker_registered_at(&self, address: &Address) -> bool { + self.router.is_worker_registered_at(address) } /// Send a shutdown acknowledgement to the router - pub(crate) async fn send_stop_ack(&self) -> Result<()> { - self.sender - .send(NodeMessage::StopAck(self.address())) - .await - .map_err(NodeError::from_send_err)?; - Ok(()) - } - - /// This function is called by Relay to indicate a worker is initialised - pub(crate) async fn set_ready(&mut self) -> Result<()> { - self.sender - .send(NodeMessage::set_ready(self.address())) - .await - .map_err(NodeError::from_send_err)?; - Ok(()) - } - - /// Wait for a particular address to become "ready" - pub async fn wait_for>(&self, addr: A) -> Result<()> { - let (msg, mut reply) = NodeMessage::get_ready(addr.into()); - self.sender - .send(msg) - .await - .map_err(NodeError::from_send_err)?; - - // This call blocks until the address has become ready or is - // dropped by the router - reply - .recv() - .await - .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??; - Ok(()) + pub(crate) async fn stop_ack(&self) -> Result<()> { + self.router.stop_ack(self.address()).await } /// Finds the terminal address of a route, if present @@ -210,7 +150,6 @@ impl Context { route: impl Into>, ) -> Result> { let addresses = route.into(); - if addresses.iter().any(|a| !a.transport_type().is_local()) { return Err(Error::new( Origin::Node, @@ -219,34 +158,11 @@ impl Context { )); } - let (msg, mut reply) = NodeMessage::find_terminal_address(addresses); - self.sender - .send(msg) - .await - .map_err(NodeError::from_send_err)?; - - reply - .recv() - .await - .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())?? - .take_terminal_address() + Ok(self.router.find_terminal_address(addresses)) } /// Read metadata for the provided address - pub async fn get_metadata( - &self, - address: impl Into
<Address>
, - ) -> Result> { - let (msg, mut reply) = NodeMessage::get_metadata(address.into()); - self.sender - .send(msg) - .await - .map_err(NodeError::from_send_err)?; - - reply - .recv() - .await - .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())?? - .take_metadata() + pub fn get_metadata(&self, address: impl Into
<Address>
) -> Option { + self.router.get_address_metadata(&address.into()) } } diff --git a/implementations/rust/ockam/ockam_node/src/context/context_lifecycle.rs b/implementations/rust/ockam/ockam_node/src/context/context_lifecycle.rs index 148bf3b091d..ef694245ebd 100644 --- a/implementations/rust/ockam/ockam_node/src/context/context_lifecycle.rs +++ b/implementations/rust/ockam/ockam_node/src/context/context_lifecycle.rs @@ -9,18 +9,18 @@ use ockam_core::flow_control::FlowControls; #[cfg(feature = "std")] use ockam_core::OpenTelemetryContext; use ockam_core::{ - errcode::{Kind, Origin}, - Address, AsyncTryClone, DenyAll, Error, IncomingAccessControl, Mailboxes, - OutgoingAccessControl, Result, TransportType, + Address, AsyncTryClone, DenyAll, IncomingAccessControl, Mailboxes, OutgoingAccessControl, + Result, TransportType, }; use ockam_transport_core::Transport; use tokio::runtime::Handle; use crate::async_drop::AsyncDrop; -use crate::channel_types::{message_channel, small_channel, SmallReceiver, SmallSender}; +use crate::channel_types::{message_channel, small_channel, SmallReceiver}; +use crate::router::Router; use crate::{debugger, Context}; -use crate::{error::*, relay::CtrlSignal, router::SenderPair, NodeMessage}; +use crate::{relay::CtrlSignal, router::SenderPair}; /// A special type of `Context` that has no worker relay and inherits /// the parent `Context`'s access control @@ -64,7 +64,7 @@ impl Context { #[allow(clippy::too_many_arguments)] pub(crate) fn new( rt: Handle, - sender: SmallSender, + router: Arc, mailboxes: Mailboxes, async_drop_sender: Option, transports: Arc>>>, @@ -76,7 +76,7 @@ impl Context { ( Self { rt, - sender, + router, mailboxes, receiver, async_drop_sender, @@ -100,7 +100,7 @@ impl Context { ) -> (Context, SenderPair, SmallReceiver) { Context::new( self.runtime().clone(), - self.sender().clone(), + self.router(), mailboxes, None, self.transports.clone(), @@ -117,7 +117,7 @@ impl Context { ) -> (Context, SenderPair, SmallReceiver) { Context::new( self.runtime().clone(), - self.sender().clone(), + self.router(), mailboxes, Some(drop_sender), self.transports.clone(), @@ -228,28 +228,15 @@ impl Context { // This handler is spawned and listens for an event from the // Drop handler, and then forwards a message to the Node // router. 
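Note: the detached-context drop path is now channel-free on the control side: `Drop` fires the oneshot, and the spawned `AsyncDrop` task (see `async_drop.rs` above) calls `Router::stop_worker` directly. A sketch of the lifecycle this supports, assuming `new_detached` keeps its existing signature:

```rust
use ockam_core::{Address, DenyAll, Result};
use ockam_node::Context;

async fn detached_lifecycle(ctx: &Context) -> Result<()> {
    // Registration now goes straight to the router:
    // router.start_worker(addresses, sender, /* detached */ true, ..).
    let child = ctx
        .new_detached(Address::random_local(), DenyAll, DenyAll)
        .await?;

    // Dropping the context fires the oneshot; the AsyncDrop task then
    // calls router.stop_worker(addr, true) instead of mailing a message.
    drop(child);
    Ok(())
}
```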
- let (async_drop, drop_sender) = AsyncDrop::new(self.sender.clone()); + let (async_drop, drop_sender) = AsyncDrop::new(self.router.clone()); self.rt.spawn(async_drop.run()); // Create a new context and get access to the mailbox senders let addresses = mailboxes.addresses(); let (ctx, sender, _) = self.copy_with_mailboxes_detached(mailboxes, drop_sender); - // Create a "detached relay" and register it with the router - let (msg, mut rx) = NodeMessage::start_worker( - addresses, - sender, - true, - Arc::clone(&self.mailbox_count), - vec![], - ); - self.sender - .send(msg) - .await - .map_err(|e| Error::new(Origin::Node, Kind::Invalid, e))?; - rx.recv() - .await - .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??; + self.router + .start_worker(addresses, sender, true, vec![], self.mailbox_count.clone())?; Ok(ctx) } @@ -272,7 +259,7 @@ mod tests { assert!(copy.is_transport_registered(transport.transport_type())); // after a detached copy with new mailboxes the list of transports should be intact - let (_, drop_sender) = AsyncDrop::new(ctx.sender.clone()); + let (_, drop_sender) = AsyncDrop::new(ctx.router.clone()); let (copy, _, _) = ctx.copy_with_mailboxes_detached(mailboxes, drop_sender); assert!(copy.is_transport_registered(transport.transport_type())); Ok(()) diff --git a/implementations/rust/ockam/ockam_node/src/context/register_router.rs b/implementations/rust/ockam/ockam_node/src/context/register_router.rs index 12a13ac697f..65e16c61999 100644 --- a/implementations/rust/ockam/ockam_node/src/context/register_router.rs +++ b/implementations/rust/ockam/ockam_node/src/context/register_router.rs @@ -1,24 +1,10 @@ -use crate::channel_types::small_channel; -use crate::{error::*, Context, NodeMessage}; +use crate::Context; use ockam_core::{Address, Result, TransportType}; impl Context { // TODO: This method should be deprecated /// Register a router for a specific address type - pub async fn register>(&self, type_: TransportType, addr: A) -> Result<()> { - self.register_impl(type_, addr.into()).await - } - - async fn register_impl(&self, type_: TransportType, addr: Address) -> Result<()> { - let (tx, mut rx) = small_channel(); - self.sender - .send(NodeMessage::Router(type_, addr, tx)) - .await - .map_err(NodeError::from_send_err)?; - - rx.recv() - .await - .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??; - Ok(()) + pub fn register>(&self, type_: TransportType, addr: A) -> Result<()> { + self.router.register_router(type_, addr.into()) } } diff --git a/implementations/rust/ockam/ockam_node/src/context/send_message.rs b/implementations/rust/ockam/ockam_node/src/context/send_message.rs index cf1c76e46d9..ceb68019822 100644 --- a/implementations/rust/ockam/ockam_node/src/context/send_message.rs +++ b/implementations/rust/ockam/ockam_node/src/context/send_message.rs @@ -1,7 +1,6 @@ -use crate::channel_types::small_channel; use crate::context::MessageWait; +use crate::error::*; use crate::{debugger, Context, MessageReceiveOptions, DEFAULT_TIMEOUT}; -use crate::{error::*, NodeMessage}; use cfg_if::cfg_if; use core::time::Duration; use ockam_core::compat::{sync::Arc, vec::Vec}; @@ -232,7 +231,6 @@ impl Context { } // First resolve the next hop in the route - let (reply_tx, mut reply_rx) = small_channel(); let addr = match route.next() { Ok(next) => next.clone(), Err(err) => { @@ -242,16 +240,7 @@ impl Context { } }; - let req = NodeMessage::SenderReq(addr, reply_tx); - self.sender - .send(req) - .await - .map_err(NodeError::from_send_err)?; - let (addr, 
sender) = reply_rx - .recv() - .await - .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())?? - .take_sender()?; + let sender = self.router.resolve(&addr)?; // Pack the payload into a TransportMessage let payload = msg.encode().map_err(|_| NodeError::Data.internal())?; @@ -337,7 +326,6 @@ impl Context { } // First resolve the next hop in the route - let (reply_tx, mut reply_rx) = small_channel(); let addr = match local_msg.onward_route_ref().next() { Ok(next) => next.clone(), Err(err) => { @@ -349,16 +337,7 @@ impl Context { return Err(err); } }; - let req = NodeMessage::SenderReq(addr, reply_tx); - self.sender - .send(req) - .await - .map_err(NodeError::from_send_err)?; - let (addr, sender) = reply_rx - .recv() - .await - .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())?? - .take_sender()?; + let sender = self.router.resolve(&addr)?; // Pack the transport message into a RelayMessage wrapper let relay_msg = RelayMessage::new(sending_address, addr, local_msg); diff --git a/implementations/rust/ockam/ockam_node/src/context/stop_env.rs b/implementations/rust/ockam/ockam_node/src/context/stop_env.rs index f1830f1296e..93c9da7fdec 100644 --- a/implementations/rust/ockam/ockam_node/src/context/stop_env.rs +++ b/implementations/rust/ockam/ockam_node/src/context/stop_env.rs @@ -1,9 +1,5 @@ use crate::Context; -use crate::{error::*, NodeMessage, ShutdownType}; -use ockam_core::{ - errcode::{Kind, Origin}, - Error, Result, -}; +use ockam_core::Result; impl Context { /// Signal to the local runtime to shut down immediately @@ -11,15 +7,9 @@ impl Context { /// **WARNING**: calling this function may result in data loss. /// It is recommended to use the much safer /// [`Context::stop`](Context::stop) function instead! - pub async fn stop_now(&self) -> Result<()> { - let tx = self.sender.clone(); + pub async fn stop_now(&self) { info!("Immediately shutting down all workers"); - let (msg, _) = NodeMessage::stop_node(ShutdownType::Immediate); - - match tx.send(msg).await { - Ok(()) => Ok(()), - Err(e) => Err(Error::new(Origin::Node, Kind::Invalid, e)), - } + self.router.stop_now() } /// Signal to the local runtime to shut down @@ -37,16 +27,6 @@ impl Context { /// This call will hang until a safe shutdown has been completed /// or the desired timeout has been reached. pub async fn stop_timeout(&self, seconds: u8) -> Result<()> { - let (req, mut rx) = NodeMessage::stop_node(ShutdownType::Graceful(seconds)); - self.sender - .send(req) - .await - .map_err(NodeError::from_send_err)?; - - // Wait until we get the all-clear - rx.recv() - .await - .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??; - Ok(()) + self.router.clone().stop_graceful(seconds).await } } diff --git a/implementations/rust/ockam/ockam_node/src/context/worker_lifecycle.rs b/implementations/rust/ockam/ockam_node/src/context/worker_lifecycle.rs index 9d91ac91590..da156a43795 100644 --- a/implementations/rust/ockam/ockam_node/src/context/worker_lifecycle.rs +++ b/implementations/rust/ockam/ockam_node/src/context/worker_lifecycle.rs @@ -1,4 +1,4 @@ -use crate::{Context, NodeError, NodeMessage, NodeReason}; +use crate::Context; use crate::{ProcessorBuilder, WorkerBuilder}; use ockam_core::{ Address, IncomingAccessControl, OutgoingAccessControl, Processor, Result, Worker, @@ -90,9 +90,7 @@ impl Context { /// Each address in the set must be unique and unused on the /// current node. Workers must implement the Worker trait and be /// thread-safe. 
Workers run asynchronously and will be scheduled - /// independently of each other. To wait for the initialisation - /// of your worker to complete you can use - /// [`wait_for()`](Self::wait_for). + /// independently of each other. /// /// ```rust /// use ockam_core::{AllowAll, Result, Worker, worker}; @@ -234,19 +232,9 @@ impl Context { debug!("Shutting down {} {}", t.str(), addr); // Send the stop request - let (req, mut rx) = match t { - AddressType::Worker => NodeMessage::stop_worker(addr, false), - AddressType::Processor => NodeMessage::stop_processor(addr), - }; - self.sender - .send(req) - .await - .map_err(NodeError::from_send_err)?; - - // Then check that address was properly shut down - rx.recv() - .await - .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??; - Ok(()) + match t { + AddressType::Worker => self.router.stop_worker(addr, false).await, + AddressType::Processor => self.router.stop_processor(addr).await, + } } } diff --git a/implementations/rust/ockam/ockam_node/src/executor.rs b/implementations/rust/ockam/ockam_node/src/executor.rs index 10b3ead11df..32e853ab178 100644 --- a/implementations/rust/ockam/ockam_node/src/executor.rs +++ b/implementations/rust/ockam/ockam_node/src/executor.rs @@ -1,12 +1,10 @@ // use crate::message::BaseMessage; -use crate::channel_types::SmallSender; #[cfg(feature = "std")] use crate::runtime; use crate::{ router::{Router, SenderPair}, tokio::runtime::Runtime, - NodeMessage, }; use core::future::Future; use ockam_core::{compat::sync::Arc, Address, Result}; @@ -38,7 +36,7 @@ pub struct Executor { /// Reference to the runtime needed to spawn tasks rt: Arc, /// Main worker and application router - router: Router, + router: Arc, /// Metrics collection endpoint #[cfg(feature = "metrics")] metrics: Arc, @@ -47,7 +45,7 @@ pub struct Executor { impl Executor { /// Create a new Ockam node [`Executor`] instance pub fn new(rt: Arc, flow_controls: &FlowControls) -> Self { - let router = Router::new(flow_controls); + let router = Arc::new(Router::new(flow_controls)); #[cfg(feature = "metrics")] let metrics = Metrics::new(&rt, router.get_metrics_readout()); Self { @@ -58,18 +56,13 @@ impl Executor { } } - /// Start the router asynchronously - pub async fn start_router(&mut self) -> Result<()> { - self.router.run().await - } - /// Get access to the internal message sender - pub(crate) fn sender(&self) -> SmallSender { - self.router.sender() + pub(crate) fn router(&self) -> Arc { + self.router.clone() } /// Initialize the root application worker - pub(crate) fn initialize_system>(&mut self, address: S, senders: SenderPair) { + pub(crate) fn initialize_system>(&self, address: S, senders: SenderPair) { trace!("Initializing node executor"); self.router.init(address.into(), senders); } @@ -100,13 +93,9 @@ impl Executor { ); // Spawn user code second - let sender = self.sender(); - let future = Executor::wrapper(sender, future); + let future = Executor::wrapper(self.router.clone(), future); let join_body = self.rt.spawn(future.with_current_context()); - // Then block on the execution of the router - self.rt.block_on(self.router.run().with_current_context())?; - // Shut down metrics collector #[cfg(feature = "metrics")] alive.fetch_or(true, Ordering::Acquire); @@ -149,9 +138,6 @@ impl Executor { // Spawn user code second let join_body = self.rt.spawn(future.with_current_context()); - // Then block on the execution of the router - self.rt.block_on(self.router.run().with_current_context())?; - // Shut down metrics collector #[cfg(feature = 
"metrics")] alive.fetch_or(true, Ordering::Acquire); @@ -167,10 +153,7 @@ impl Executor { /// Wrapper around the user provided future that will shut down the node on error #[cfg(feature = "std")] - async fn wrapper( - sender: SmallSender, - future: F, - ) -> core::result::Result + async fn wrapper(router: Arc, future: F) -> core::result::Result where F: Future> + Send + 'static, { @@ -184,9 +167,9 @@ impl Executor { // I think way AbortNode is implemented right now, it is more of an // internal/private message not meant to be directly used, without changing the // router state. - let (req, mut rx) = NodeMessage::stop_node(crate::ShutdownType::Graceful(1)); - let _ = sender.send(req).await; - let _ = rx.recv().await; + if let Err(error) = router.stop_graceful(1).await { + error!("Failed to stop gracefully: {}", error); + } Err(e) } } diff --git a/implementations/rust/ockam/ockam_node/src/lib.rs b/implementations/rust/ockam/ockam_node/src/lib.rs index 511af9a735a..06dcfe71497 100644 --- a/implementations/rust/ockam/ockam_node/src/lib.rs +++ b/implementations/rust/ockam/ockam_node/src/lib.rs @@ -57,11 +57,11 @@ mod context; mod delayed; mod error; mod executor; -mod messages; mod node; mod processor_builder; mod relay; mod router; +mod shutdown; /// Support for storing persistent values pub mod storage; @@ -79,8 +79,8 @@ pub use context::*; pub use delayed::*; pub use error::*; pub use executor::*; -pub use messages::*; pub use processor_builder::ProcessorBuilder; +pub use shutdown::*; #[cfg(feature = "std")] pub use storage::database; pub use worker_builder::WorkerBuilder; diff --git a/implementations/rust/ockam/ockam_node/src/messages.rs b/implementations/rust/ockam/ockam_node/src/messages.rs deleted file mode 100644 index a0cbef00e29..00000000000 --- a/implementations/rust/ockam/ockam_node/src/messages.rs +++ /dev/null @@ -1,399 +0,0 @@ -use crate::channel_types::{small_channel, MessageSender, SmallReceiver, SmallSender}; -use crate::{ - error::{NodeError, NodeReason, RouterReason, WorkerReason}, - router::SenderPair, -}; -use core::{fmt, sync::atomic::AtomicUsize}; -use ockam_core::compat::{string::String, sync::Arc, vec::Vec}; -use ockam_core::{ - Address, AddressAndMetadata, AddressMetadata, Error, RelayMessage, Result, TransportType, -}; - -/// Messages sent from the Node to the Executor -#[derive(Debug)] -pub enum NodeMessage { - /// Start a new worker and store the send handle - StartWorker { - /// The set of addresses in use by this worker - addrs: Vec
<Address>, - /// Pair of senders to the worker relay (msgs and ctrl) - senders: SenderPair, - /// A detached context/ "worker" runs no relay state - detached: bool, - /// A mechanism to read channel fill-state for a worker - mailbox_count: Arc<AtomicUsize>, - /// Reply channel for command confirmation - reply: SmallSender<NodeReplyResult>, - /// List of metadata for each address - addresses_metadata: Vec<AddressAndMetadata>, - }, - /// Return a list of all worker addresses - ListWorkers(SmallSender<NodeReplyResult>), - /// Return a list of all worker addresses - IsWorkerRegisteredAt(SmallSender<NodeReplyResult>, Address), - /// Add an existing address to a cluster - SetCluster(Address, String, SmallSender<NodeReplyResult>), - /// Stop an existing worker - StopWorker(Address, bool, SmallSender<NodeReplyResult>), - /// Start a new processor - StartProcessor { - /// The set of addresses in use by this processor - addrs: Vec<Address>
, - /// Pair of senders to the worker relay (msgs and ctrl) - senders: SenderPair, - /// Reply channel for command confirmation - reply: SmallSender, - /// List of metadata for each address - addresses_metadata: Vec, - }, - /// Stop an existing processor - StopProcessor(Address, SmallSender), - /// Stop the node (and all workers) - StopNode(ShutdownType, SmallSender), - /// Immediately stop the node runtime - AbortNode, - /// Let the router know a particular address has stopped - StopAck(Address), - /// Request the sender for a worker address - SenderReq(Address, SmallSender), - /// Register a new router for a route id type - Router(TransportType, Address, SmallSender), - /// Message the router to set an address as "ready" - SetReady(Address), - /// Check whether an address has been marked as "ready" - CheckReady(Address, SmallSender), - /// Find the terminal address for a given route - FindTerminalAddress(Vec
<Address>
, SmallSender), - /// Get address metadata - GetMetadata(Address, SmallSender), -} - -impl fmt::Display for NodeMessage { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - NodeMessage::StartWorker { .. } => write!(f, "StartWorker"), - NodeMessage::ListWorkers(_) => write!(f, "ListWorkers"), - NodeMessage::IsWorkerRegisteredAt(_, _) => write!(f, "IsWorkerRegisteredAt"), - NodeMessage::SetCluster(_, _, _) => write!(f, "SetCluster"), - NodeMessage::StopWorker(_, _, _) => write!(f, "StopWorker"), - NodeMessage::StartProcessor { .. } => write!(f, "StartProcessor"), - NodeMessage::StopProcessor(_, _) => write!(f, "StopProcessor"), - NodeMessage::StopNode(_, _) => write!(f, "StopNode"), - NodeMessage::AbortNode => write!(f, "AbortNode"), - NodeMessage::StopAck(_) => write!(f, "StopAck"), - NodeMessage::SenderReq(_, _) => write!(f, "SenderReq"), - NodeMessage::Router(_, _, _) => write!(f, "Router"), - NodeMessage::SetReady(_) => write!(f, "SetReady"), - NodeMessage::CheckReady(_, _) => write!(f, "CheckReady"), - NodeMessage::FindTerminalAddress(_, _) => write!(f, "FindTerminalAddress"), - NodeMessage::GetMetadata(_, _) => write!(f, "ReadMetadata"), - } - } -} - -impl NodeMessage { - /// Create a start worker message - /// - /// * `senders`: message and command senders for the relay - /// - /// * `detached`: indicate whether this worker address has a full - /// relay behind it that can respond to shutdown - /// commands. Setting this to `true` will disable - /// stop ACK support in the router - pub fn start_worker( - addrs: Vec
<Address>
, - senders: SenderPair, - detached: bool, - mailbox_count: Arc, - metadata: Vec, - ) -> (Self, SmallReceiver) { - let (reply, rx) = small_channel(); - ( - Self::StartWorker { - addrs, - senders, - detached, - mailbox_count, - reply, - addresses_metadata: metadata, - }, - rx, - ) - } - - /// Create a start worker message - pub fn start_processor( - addrs: Vec
<Address>
, - senders: SenderPair, - metadata: Vec, - ) -> (Self, SmallReceiver) { - let (tx, rx) = small_channel(); - ( - Self::StartProcessor { - addrs, - senders, - reply: tx, - addresses_metadata: metadata, - }, - rx, - ) - } - - /// Create a stop worker message and reply receiver - pub fn stop_processor(address: Address) -> (Self, SmallReceiver) { - let (tx, rx) = small_channel(); - (Self::StopProcessor(address, tx), rx) - } - - /// Create a list worker message and reply receiver - pub fn list_workers() -> (Self, SmallReceiver) { - let (tx, rx) = small_channel(); - (Self::ListWorkers(tx), rx) - } - - /// Check if a worker is already registered at a given address - pub fn is_worker_registered_at(address: Address) -> (Self, SmallReceiver) { - let (tx, rx) = small_channel(); - (Self::IsWorkerRegisteredAt(tx, address), rx) - } - - /// Create a set cluster message and reply receiver - pub fn set_cluster(addr: Address, label: String) -> (Self, SmallReceiver) { - let (tx, rx) = small_channel(); - (Self::SetCluster(addr, label, tx), rx) - } - - /// Create a stop worker message and reply receiver - pub fn stop_worker(address: Address, detached: bool) -> (Self, SmallReceiver) { - let (tx, rx) = small_channel(); - (Self::StopWorker(address, detached, tx), rx) - } - - /// Create a stop node message - pub fn stop_node(tt: ShutdownType) -> (Self, SmallReceiver) { - let (tx, rx) = small_channel(); - (Self::StopNode(tt, tx), rx) - } - - /// Create a sender request message and reply receiver - pub fn sender_request(route: Address) -> (Self, SmallReceiver) { - let (tx, rx) = small_channel(); - (Self::SenderReq(route, tx), rx) - } - - /// Create a SetReady message and reply receiver - pub fn set_ready(addr: Address) -> Self { - Self::SetReady(addr) - } - - /// Create a GetReady message and reply receiver - pub fn get_ready(addr: Address) -> (Self, SmallReceiver) { - let (tx, rx) = small_channel(); - (Self::CheckReady(addr, tx), rx) - } - - /// Creates a [NodeMessage::FindTerminalAddress] message and reply receiver - pub fn find_terminal_address(addrs: Vec
<Address>
) -> (Self, SmallReceiver) { - let (tx, rx) = small_channel(); - (Self::FindTerminalAddress(addrs, tx), rx) - } - - /// Creates a [NodeMessage::ReadMetadata] message and reply receiver - pub fn get_metadata(address: Address) -> (Self, SmallReceiver) { - let (tx, rx) = small_channel(); - (Self::GetMetadata(address, tx), rx) - } -} - -/// The reply/result of a Node -pub type NodeReplyResult = core::result::Result; - -/// Successful return values from a router command -#[derive(Debug)] -pub enum RouterReply { - /// Success with no payload - Ok, - /// A list of worker addresses - Workers(Vec
<Address>
), - /// Return true if there is a worker already registered at a given Address - WorkerIsRegisteredAtAddress(bool), - /// Message sender to a specific worker - Sender { - /// The address a message is being sent to - addr: Address, - /// The relay sender - sender: MessageSender, - }, - /// Indicate the 'ready' state of an address - State(bool), - /// Metadata value of the first terminal address in the route - TerminalAddress(Option), - /// Optional metadata value - Metadata(Option), -} - -/// Specify the type of node shutdown -/// -/// For most users `ShutdownType::Graceful()` is recommended. The -/// `Default` implementation uses a 1 second timeout. -#[derive(Debug, Copy, Clone)] -#[non_exhaustive] -pub enum ShutdownType { - /// Execute a graceful shutdown given a maximum timeout - /// - /// The following steps will be taken by the internal router - /// during graceful shutdown procedure: - /// - /// * Signal clusterless workers to stop - /// * Wait for shutdown ACK hooks from worker set - /// * Signal worker clusters in reverse-creation order to stop - /// * Wait for shutdown ACK hooks from each cluster before moving onto the - /// next - /// * All shutdown-signaled workers may process their entire mailbox, - /// while not allowing new messages to be queued - /// - /// Graceful shutdown procedure will be pre-maturely terminated - /// when reaching the timeout (failover into `Immediate` - /// strategy). **A given timeout of `0` will wait forever!** - Graceful(u8), - /// Immediately shutdown workers and run shutdown hooks - /// - /// This strategy can lead to data loss: - /// - /// * Unhandled mailbox messages will be dropped - /// * Shutdown hooks may not be able to send messages - /// - /// This strategy is not recommended for general use, but will be - /// selected as a failover, if the `Graceful` strategy reaches its - /// timeout limit. - Immediate, -} - -impl Default for ShutdownType { - fn default() -> Self { - Self::Graceful(1) - } -} - -impl RouterReply { - /// Return [RouterReply::Ok] - pub fn ok() -> NodeReplyResult { - Ok(RouterReply::Ok) - } - - /// Return [RouterReply::State] - pub fn state(b: bool) -> NodeReplyResult { - Ok(RouterReply::State(b)) - } - - /// Return [NodeError::Address] not found - #[track_caller] - pub fn no_such_address(a: Address) -> NodeReplyResult { - Err(NodeError::Address(a).not_found()) - } - - /// Return [NodeError::Address] already exists for the given address - #[track_caller] - pub fn worker_exists(a: Address) -> NodeReplyResult { - Err(NodeError::Address(a).already_exists()) - } - - /// Return [NodeError::RouterState] already exists - #[track_caller] - pub fn router_exists() -> NodeReplyResult { - Err(NodeError::RouterState(RouterReason::Duplicate).already_exists()) - } - - /// Return [NodeError::NodeState] conflict - #[track_caller] - pub fn node_rejected(reason: NodeReason) -> NodeReplyResult { - Err(NodeError::NodeState(reason).conflict()) - } - - /// Return [NodeError::WorkerState] conflict - #[track_caller] - pub fn worker_rejected(reason: WorkerReason) -> NodeReplyResult { - Err(NodeError::WorkerState(reason).conflict()) - } - - /// Return [RouterReply::Workers] for the given addresses - pub fn workers(v: Vec
<Address>
) -> NodeReplyResult { - Ok(Self::Workers(v)) - } - - /// Return [RouterReply::WorkerIsRegistered] for a given address - pub fn worker_is_registered_at_address(registered: bool) -> NodeReplyResult { - Ok(Self::WorkerIsRegisteredAtAddress(registered)) - } - - /// Returns [RouterReply::TerminalAddress] for the given address - pub fn terminal_address(address: Option) -> NodeReplyResult { - Ok(Self::TerminalAddress(address)) - } - - /// Returns [RouterReply::Metadata] for the given metadata - pub fn metadata(value: Option) -> NodeReplyResult { - Ok(Self::Metadata(value)) - } - - /// Return [RouterReply::Sender] for the given information - pub fn sender(addr: Address, sender: MessageSender) -> NodeReplyResult { - Ok(RouterReply::Sender { addr, sender }) - } - - /// Consume the wrapper and return [RouterReply::Sender] - pub fn take_sender(self) -> Result<(Address, MessageSender)> { - match self { - Self::Sender { addr, sender } => Ok((addr, sender)), - _ => Err(NodeError::NodeState(NodeReason::Unknown).internal()), - } - } - - /// Consume the wrapper and return [RouterReply::Workers] - pub fn take_workers(self) -> Result> { - match self { - Self::Workers(w) => Ok(w), - _ => Err(NodeError::NodeState(NodeReason::Unknown).internal()), - } - } - - /// Consume the wrapper and return a bool - pub fn take_worker_is_registered(self) -> Result { - match self { - Self::WorkerIsRegisteredAtAddress(b) => Ok(b), - _ => Err(NodeError::NodeState(NodeReason::Unknown).internal()), - } - } - - /// Consume the wrapper and return [RouterReply::State] - pub fn take_state(self) -> Result { - match self { - Self::State(b) => Ok(b), - _ => Err(NodeError::NodeState(NodeReason::Unknown).internal()), - } - } - - /// Consumes the wrapper and returns [RouterReply::TerminalAddress] - pub fn take_terminal_address(self) -> Result> { - match self { - Self::TerminalAddress(addr) => Ok(addr), - _ => Err(NodeError::NodeState(NodeReason::Unknown).internal()), - } - } - - /// Consumes the wrapper and returns [RouterReply::Metadata] - pub fn take_metadata(self) -> Result> { - match self { - Self::Metadata(value) => Ok(value), - _ => Err(NodeError::NodeState(NodeReason::Unknown).internal()), - } - } - - /// Returns Ok if self is [RouterReply::Ok] - pub fn is_ok(self) -> Result<()> { - match self { - Self::Ok => Ok(()), - _ => Err(NodeError::NodeState(NodeReason::Unknown).internal()), - } - } -} diff --git a/implementations/rust/ockam/ockam_node/src/node.rs b/implementations/rust/ockam/ockam_node/src/node.rs index e46c9707e66..f1482a56c16 100644 --- a/implementations/rust/ockam/ockam_node/src/node.rs +++ b/implementations/rust/ockam/ockam_node/src/node.rs @@ -116,7 +116,7 @@ impl NodeBuilder { #[cfg(not(feature = "std"))] Arc::new(Runtime::new().expect("cannot initialize the tokio runtime")) }); - let mut exe = Executor::new(rt.clone(), &flow_controls); + let exe = Executor::new(rt.clone(), &flow_controls); let addr: Address = "app".into(); #[cfg(feature = "watchdog")] @@ -129,7 +129,7 @@ impl NodeBuilder { // messages from workers, and to buffer incoming transcoded data. 
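Note: because the router is now plain shared state behind an `Arc`, there is no router event loop left to drive; that is why the `test_utils` and `ockam_app_lib` hunks above drop their `executor.start_router()` spawns. A sketch of the reduced bootstrap, mirroring those hunks:

```rust
use std::sync::Arc;
use ockam_node::{Context, NodeBuilder};
use tokio::runtime::Runtime;

fn bootstrap(runtime: Arc<Runtime>) -> Context {
    // The executor handle no longer has to be driven; it is simply ignored.
    let (context, _executor) = NodeBuilder::new()
        .with_runtime(runtime.clone())
        .build();
    // Previously required, now unnecessary:
    // runtime.spawn(async move { executor.start_router().await });
    context
}
```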
let (ctx, sender, _) = Context::new( rt.handle().clone(), - exe.sender(), + exe.router(), Mailboxes::new( Mailbox::new(addr, Arc::new(AllowAll), Arc::new(AllowAll)), vec![], diff --git a/implementations/rust/ockam/ockam_node/src/processor_builder.rs b/implementations/rust/ockam/ockam_node/src/processor_builder.rs index 323194485a4..4edc3232cc4 100644 --- a/implementations/rust/ockam/ockam_node/src/processor_builder.rs +++ b/implementations/rust/ockam/ockam_node/src/processor_builder.rs @@ -1,11 +1,9 @@ use crate::debugger; -use crate::error::{NodeError, NodeReason}; -use crate::{relay::ProcessorRelay, Context, NodeMessage}; +use crate::{relay::ProcessorRelay, Context}; use alloc::string::String; use ockam_core::compat::{sync::Arc, vec::Vec}; use ockam_core::{ - errcode::{Kind, Origin}, - Address, AddressAndMetadata, AddressMetadata, DenyAll, Error, IncomingAccessControl, Mailboxes, + Address, AddressAndMetadata, AddressMetadata, DenyAll, IncomingAccessControl, Mailboxes, OutgoingAccessControl, Processor, Result, }; @@ -268,18 +266,8 @@ where debugger::log_inherit_context("PROCESSOR", context, &ctx); - // Send start request to router - let (msg, mut rx) = NodeMessage::start_processor(addresses, sender, metadata); - context - .sender() - .send(msg) - .await - .map_err(|e| Error::new(Origin::Node, Kind::Invalid, e))?; - - // Wait for the actual return code - rx.recv() - .await - .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??; + let router = context.router(); + router.start_processor(addresses, sender, metadata)?; // Then initialise the processor message relay ProcessorRelay::
<P>

::init(context.runtime(), processor, ctx, ctrl_rx); diff --git a/implementations/rust/ockam/ockam_node/src/relay/mod.rs b/implementations/rust/ockam/ockam_node/src/relay/mod.rs index 17435eddd44..67ed3013d35 100644 --- a/implementations/rust/ockam/ockam_node/src/relay/mod.rs +++ b/implementations/rust/ockam/ockam_node/src/relay/mod.rs @@ -7,8 +7,6 @@ pub use worker_relay::*; /// A signal type used to communicate between router and worker relay #[derive(Clone, Debug)] pub enum CtrlSignal { - /// Interrupt current message execution but resume run-loop - Interrupt, /// Interrupt current message execution and shut down InterruptStop, } diff --git a/implementations/rust/ockam/ockam_node/src/relay/processor_relay.rs b/implementations/rust/ockam/ockam_node/src/relay/processor_relay.rs index 817f2bd21db..1a4e200a753 100644 --- a/implementations/rust/ockam/ockam_node/src/relay/processor_relay.rs +++ b/implementations/rust/ockam/ockam_node/src/relay/processor_relay.rs @@ -38,10 +38,6 @@ where } } - if let Err(e) = ctx.set_ready().await { - error!("Failed to mark processor '{}' as 'ready': {}", ctx_addr, e); - } - // This future encodes the main processor run loop logic let run_loop = async { loop { @@ -119,7 +115,7 @@ async fn shutdown_and_stop_ack
<P>

( // Finally send the router a stop ACK -- log errors trace!("Sending shutdown ACK"); - if let Err(e) = ctx.send_stop_ack().await { - error!("Error occurred during stop ACK sending: {}", e); - } + ctx.stop_ack().await.unwrap_or_else(|e| { + error!("Failed to send stop ACK for '{}': {}", ctx_addr, e); + }); } diff --git a/implementations/rust/ockam/ockam_node/src/relay/worker_relay.rs b/implementations/rust/ockam/ockam_node/src/relay/worker_relay.rs index c2b2ccfd1ee..0d2aaa3adbb 100644 --- a/implementations/rust/ockam/ockam_node/src/relay/worker_relay.rs +++ b/implementations/rust/ockam/ockam_node/src/relay/worker_relay.rs @@ -111,10 +111,6 @@ where let address = self.ctx.address(); - if let Err(e) = self.ctx.set_ready().await { - error!("Failed to mark worker '{}' as 'ready': {}", address, e); - } - #[cfg(feature = "std")] loop { crate::tokio::select! { @@ -180,9 +176,13 @@ where // Finally send the router a stop ACK -- log errors trace!("Sending shutdown ACK"); - if let Err(e) = self.ctx.send_stop_ack().await { - error!("Error occurred during stop ACK sending: {}", e); - } + self.ctx.stop_ack().await.unwrap_or_else(|e| { + error!( + "Failed to send stop ACK for worker '{}': {}", + self.ctx.address(), + e + ) + }); } /// Build and spawn a new worker relay, returning a send handle to it diff --git a/implementations/rust/ockam/ockam_node/src/router/mod.rs b/implementations/rust/ockam/ockam_node/src/router/mod.rs index 1644ccbdd56..ffc60516e18 100644 --- a/implementations/rust/ockam/ockam_node/src/router/mod.rs +++ b/implementations/rust/ockam/ockam_node/src/router/mod.rs @@ -7,21 +7,20 @@ mod stop_processor; mod stop_worker; mod utils; -#[cfg(feature = "metrics")] -use std::sync::atomic::AtomicUsize; +use core::sync::atomic::AtomicUsize; use record::{AddressRecord, InternalMap, WorkerMeta}; use state::{NodeState, RouterState}; -use crate::channel_types::{router_channel, MessageSender, RouterReceiver, SmallSender}; -use crate::{ - error::{NodeError, NodeReason}, - relay::CtrlSignal, - NodeMessage, NodeReplyResult, RouterReply, ShutdownType, -}; +use crate::channel_types::{MessageSender, SmallSender}; +use crate::relay::CtrlSignal; +use ockam_core::compat::sync::RwLock as SyncRwLock; use ockam_core::compat::{collections::BTreeMap, sync::Arc}; +use ockam_core::errcode::{Kind, Origin}; use ockam_core::flow_control::FlowControls; -use ockam_core::{Address, RelayMessage, Result, TransportType}; +use ockam_core::{ + Address, AddressAndMetadata, AddressMetadata, Error, RelayMessage, Result, TransportType, +}; /// A pair of senders to a worker relay #[derive(Debug)] @@ -45,9 +44,7 @@ pub struct Router { /// Internal address state map: InternalMap, /// Externally registered router components - external: BTreeMap, - /// Receiver for messages from node - receiver: Option>, + external: SyncRwLock>, } enum RouteType { @@ -65,12 +62,10 @@ fn determine_type(next: &Address) -> RouteType { impl Router { pub fn new(flow_controls: &FlowControls) -> Self { - let (sender, receiver) = router_channel(); Self { - state: RouterState::new(sender), + state: RouterState::new(), map: InternalMap::new(flow_controls), - external: BTreeMap::new(), - receiver: Some(receiver), + external: Default::default(), } } @@ -79,14 +74,7 @@ impl Router { self.map.get_metrics() } - /// Get the router receiver - fn get_recv(&mut self) -> Result<&mut RouterReceiver> { - self.receiver - .as_mut() - .ok_or_else(|| NodeError::NodeState(NodeReason::Corrupt).internal()) - } - - pub fn init(&mut self, addr: Address, senders: SenderPair) { + 
pub fn init(&self, addr: Address, senders: SenderPair) { self.map.insert_address_record( addr.clone(), AddressRecord::new( @@ -99,262 +87,312 @@ impl Router { detached: true, }, ), + vec![], ); - self.map.insert_alias(&addr, &addr); } - pub fn sender(&self) -> SmallSender { - self.state.sender.clone() + fn check_addr_not_exist(&self, addr: &Address) -> Result<()> { + self.map.check_addr_not_exist(addr) } - /// A utility facade to hide failures that are not really failures - pub async fn run(&mut self) -> Result<()> { - match self.run_inner().await { - // Everything is A-OK :) - Ok(()) => Ok(()), - // If the router has already shut down this failure is a - // red-herring and should be ignored -- we _have_ just - // terminated all workers and any message still in the - // system will crash the whole runtime. - Err(_) if !self.state.running() => { - warn!("One (or more) internal I/O failures caused by ungraceful router shutdown!"); - Ok(()) - } - // If we _are_ still actually running then this is a real - // failure and needs to be escalated - e => e, - } + pub fn set_cluster(&self, addr: Address, label: String) -> Result<()> { + self.map.set_cluster(label, addr) } - async fn check_addr_not_exist( - &mut self, - addr: &Address, - reply: &SmallSender, - ) -> Result<()> { - match self.map.address_records_map().get(addr) { - Some(record) => { - if record.check() { - let node = NodeError::Address(addr.clone()); - reply - .send(Err(node.clone().already_exists())) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; - - Err(node.already_exists()) - } else { - self.map.free_address(addr.clone()); - Ok(()) - } - } - None => Ok(()), - } + pub fn list_workers(&self) -> Vec
<Address>

{ + self.map.list_workers() } - async fn handle_msg(&mut self, msg: NodeMessage) -> Result { - #[cfg(feature = "metrics")] - self.map.update_metrics(); // Possibly remove this from the hot path? - - use NodeMessage::*; - #[cfg(feature = "metrics")] - trace!( - "Current router alloc: {} addresses", - self.map.get_addr_count() - ); - match msg { - // Successful router registration command - Router(tt, addr, sender) if !self.external.contains_key(&tt) => { - // TODO: Remove after other transport implementations are moved to new architecture - trace!("Registering new router for type {}", tt); - - self.external.insert(tt, addr); - sender - .send(RouterReply::ok()) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())? - } - // Rejected router registration command - Router(_, _, sender) => { - // TODO: Remove after other transport implementations are moved to new architecture - sender - .send(RouterReply::router_exists()) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())? - } - - //// ==! Basic worker control - StartWorker { - addrs, - senders, - detached, - mailbox_count, - ref reply, - addresses_metadata, - } => { - start_worker::exec( - self, - addrs, - senders, - detached, - addresses_metadata, - mailbox_count, - reply, - ) - .await? - } - StopWorker(ref addr, ref detached, ref reply) => { - stop_worker::exec(self, addr, *detached, reply).await? - } - - //// ==! Basic processor control - StartProcessor { - addrs, - senders, - ref reply, - addresses_metadata, - } => start_processor::exec(self, addrs, senders, addresses_metadata, reply).await?, - StopProcessor(ref addr, ref reply) => stop_processor::exec(self, addr, reply).await?, + pub fn is_worker_registered_at(&self, address: &Address) -> bool { + self.map.is_worker_registered_at(address) + } - //// ==! Core node controls - StopNode(ShutdownType::Graceful(timeout), reply) => { - // This sets state to stopping, and the sends the AbortNode message - if shutdown::graceful(self, timeout, reply).await? { - info!("No more workers left. Goodbye!"); - if let Some(sender) = self.state.stop_reply() { - sender - .send(RouterReply::ok()) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; - return Ok(true); - }; - } - } - StopNode(ShutdownType::Immediate, reply) => { - shutdown::immediate(self, reply).await?; - return Ok(true); + pub async fn stop_ack(&self, addr: Address) -> Result<()> { + let running = self.state.running(); + debug!(%running, "Handling shutdown ACK for {}", addr); + self.map.free_address(addr); + + if !running { + // The router is shutting down + if !self.map.cluster_done() { + // We are not done yet. + // The last worker should call another `stop_ack` + return Ok(()); } - AbortNode => { - if let Some(sender) = self.state.stop_reply() { - sender - .send(RouterReply::ok()) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; - self.map.clear_address_records_map(); - return Ok(true); - } + // Check if there is a next cluster + let finished = self.stop_next_cluster().await?; + if finished { + info!("No more workers left. Goodbye!"); + self.state.kill(); } + } + Ok(()) + } - StopAck(addr) if self.state.running() => { - trace!("Received shutdown ACK for address {}", addr); - self.map.free_address(addr); - } + pub fn find_terminal_address(&self, addresses: Vec
<Address>
) -> Option { + self.map.find_terminal_address(&addresses) + } - StopAck(addr) => { - if shutdown::ack(self, addr).await? { - info!("No more workers left. Goodbye!"); - if let Some(sender) = self.state.stop_reply() { - sender - .send(RouterReply::ok()) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; - return Ok(true); - } - } - } + pub fn get_address_metadata(&self, address: &Address) -> Option { + self.map.get_address_metadata(address) + } - ListWorkers(sender) => sender - .send(RouterReply::workers( - self.map.address_records_map().keys().cloned().collect(), - )) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?, + pub fn start_worker( + &self, + addrs: Vec
<Address>
, + senders: SenderPair, + detached: bool, + addresses_metadata: Vec, + mailbox_count: Arc, + ) -> Result<()> { + start_worker::exec( + self, + addrs, + senders, + detached, + addresses_metadata, + mailbox_count, + ) + } - IsWorkerRegisteredAt(sender, address) => sender - .send(RouterReply::worker_is_registered_at_address( - self.map.address_records_map().contains_key(&address), - )) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?, + pub fn register_router(&self, tt: TransportType, addr: Address) -> Result<()> { + let mut guard = self.external.write().unwrap(); + if let alloc::collections::btree_map::Entry::Vacant(e) = guard.entry(tt) { + e.insert(addr); + Ok(()) + } else { + // already exists + Err(Error::new( + Origin::Node, + Kind::AlreadyExists, + "Router already exists", + )) + } + } - SetCluster(addr, label, reply) => { - debug!("Setting cluster on address {}", addr); - let msg = self.map.set_cluster(label, addr); - reply - .send(msg) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; - } + pub fn resolve(&self, addr: &Address) -> Result> { + let addr = match determine_type(addr) { + RouteType::Internal => addr, + // TODO: Remove after other transport implementations are moved to new architecture + RouteType::External(tt) => &utils::router_addr(self, tt)?, + }; + self.map.resolve(addr) + } - SetReady(addr) => { - trace!("Marking address {} as ready!", addr); - match self.map.set_ready(addr) { - Err(e) => warn!("Failed to set address as ready: {}", e), - Ok(waiting) => { - for sender in waiting { - sender.send(RouterReply::ok()).await.map_err(|_| { - NodeError::NodeState(NodeReason::Unknown).internal() - })?; - } - } - } - } + pub fn stop_now(&self) { + shutdown::immediate(self) + } - CheckReady(addr, reply) => { - let ready = self.map.get_ready(addr, reply.clone()); - if ready { - reply - .send(RouterReply::ok()) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; - } - } + pub async fn stop_graceful(self: Arc, timeout: u8) -> Result<()> { + shutdown::graceful(self, timeout).await.map(|_| ()) + } - // Handle route/ sender requests - SenderReq(addr, ref reply) => match determine_type(&addr) { - RouteType::Internal => utils::resolve(self, addr, reply).await?, - // TODO: Remove after other transport implementations are moved to new architecture - RouteType::External(tt) => { - let addr = utils::router_addr(self, tt)?; - utils::resolve(self, addr, reply).await? - } - }, - FindTerminalAddress(ref addresses, ref reply) => { - let terminal_address = self.map.find_terminal_address(addresses); - reply - .send(RouterReply::terminal_address(terminal_address)) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; - } - GetMetadata(ref address, ref reply) => { - let meta = self.map.get_address_metadata(address); - reply - .send(RouterReply::metadata(meta)) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; - } - } + pub async fn stop_worker(&self, addr: Address, detached: bool) -> Result<()> { + stop_worker::exec(self, &addr, detached).await + } - Ok(false) + pub fn start_processor( + &self, + addrs: Vec
, + senders: SenderPair, + addresses_metadata: Vec<AddressAndMetadata>, + ) -> Result<()> { + start_processor::exec(self, addrs, senders, addresses_metadata) } - /// Block current task running this router. Return fatal errors - async fn run_inner(&mut self) -> Result<()> { - while let Some(msg) = self.get_recv()?.recv().await { - let msg_str = format!("{}", msg); - match self.handle_msg(msg).await { - Ok(should_break) => { - if should_break { - // We drop the receiver end here - self.receiver.take(); - break; - } - } - Err(err) => { - debug!("Router error: {} while handling {}", err, msg_str); - } - } - } + pub async fn stop_processor(&self, addr: Address) -> Result<()> { + stop_processor::exec(self, &addr).await + } - Ok(()) + fn abort_node(&self) { + self.map.clear_address_records_map(); + }
Goodbye!"); + // if let Some(sender) = self.state.stop_reply() { + // sender + // .send(RouterReply::ok()) + // .await + // .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; + // return Ok(true); + // }; + // } + // } + // StopNode(ShutdownType::Immediate, reply) => { + // shutdown::immediate(self, reply).await?; + // return Ok(true); + // } + // + // AbortNode => { + // if let Some(sender) = self.state.stop_reply() { + // sender + // .send(RouterReply::ok()) + // .await + // .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; + // self.map.clear_address_records_map(); + // return Ok(true); + // } + // } + // + // StopAck(addr) if self.state.running() => { + // trace!("Received shutdown ACK for address {}", addr); + // self.map.free_address(addr); + // } + // + // StopAck(addr) => { + // if shutdown::ack(self, addr).await? { + // info!("No more workers left. Goodbye!"); + // if let Some(sender) = self.state.stop_reply() { + // sender + // .send(RouterReply::ok()) + // .await + // .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; + // return Ok(true); + // } + // } + // } + // + // ListWorkers(sender) => sender + // .send(RouterReply::workers( + // self.map.address_records_map().keys().cloned().collect(), + // )) + // .await + // .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?, + // + // IsWorkerRegisteredAt(sender, address) => sender + // .send(RouterReply::worker_is_registered_at_address( + // self.map.address_records_map().contains_key(&address), + // )) + // .await + // .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?, + // + // SetCluster(addr, label, reply) => { + // debug!("Setting cluster on address {}", addr); + // let msg = self.map.set_cluster(label, addr); + // reply + // .send(msg) + // .await + // .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; + // } + // + // SetReady(addr) => { + // trace!("Marking address {} as ready!", addr); + // match self.map.set_ready(addr) { + // Err(e) => warn!("Failed to set address as ready: {}", e), + // Ok(waiting) => { + // for sender in waiting { + // sender.send(RouterReply::ok()).await.map_err(|_| { + // NodeError::NodeState(NodeReason::Unknown).internal() + // })?; + // } + // } + // } + // } + // + // CheckReady(addr, reply) => { + // let ready = self.map.get_ready(addr, reply.clone()); + // if ready { + // reply + // .send(RouterReply::ok()) + // .await + // .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; + // } + // } + // + // // Handle route/ sender requests + // SenderReq(addr, ref reply) => match determine_type(&addr) { + // RouteType::Internal => utils::resolve(self, addr, reply).await?, + // // TODO: Remove after other transport implementations are moved to new architecture + // RouteType::External(tt) => { + // let addr = utils::router_addr(self, tt)?; + // utils::resolve(self, addr, reply).await? 
}
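The chunk above replaces the router's message loop with plain methods on the router itself: queries that used to round-trip through a channel and a reply sender become direct reads of shared state behind a lock. A standalone sketch of the pattern, with illustrative names only (not the actual Ockam types):

```rust
// Sketch: a registry queried through direct method calls on shared state,
// instead of sending a request message and awaiting a reply channel.
use std::collections::BTreeMap;
use std::sync::RwLock;

struct Registry {
    workers: RwLock<BTreeMap<String, u64>>, // address -> worker id
}

impl Registry {
    // Callers read under a short-lived lock; no message enum, no reply channel.
    fn is_registered(&self, addr: &str) -> bool {
        self.workers.read().unwrap().contains_key(addr)
    }

    fn list_workers(&self) -> Vec<String> {
        self.workers.read().unwrap().keys().cloned().collect()
    }
}

fn main() {
    let registry = Registry { workers: RwLock::new(BTreeMap::new()) };
    registry.workers.write().unwrap().insert("echo".into(), 1);
    assert!(registry.is_registered("echo"));
    assert_eq!(registry.list_workers(), vec!["echo".to_string()]);
}
```

This is also why `ctx.list_workers()` and `ctx.is_worker_registered_at(&addr)` lose their `.await` at every call site in this diff.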
diff --git a/implementations/rust/ockam/ockam_node/src/router/record.rs b/implementations/rust/ockam/ockam_node/src/router/record.rs index fe7466a4cf4..ea6a5c17f13 100644 --- a/implementations/rust/ockam/ockam_node/src/router/record.rs +++ b/implementations/rust/ockam/ockam_node/src/router/record.rs @@ -1,10 +1,10 @@ use crate::channel_types::{MessageSender, SmallSender}; +use crate::error::{NodeError, NodeReason}; use crate::relay::CtrlSignal; -use crate::{ - error::{NodeError, NodeReason}, - NodeReplyResult, RouterReply, -}; -use core::sync::atomic::{AtomicUsize, Ordering}; +use core::sync::atomic::{AtomicU8, AtomicUsize, Ordering}; +use ockam_core::compat::sync::Mutex as SyncMutex; +use ockam_core::compat::sync::RwLock as SyncRwLock; +use ockam_core::errcode::{Kind, Origin}; use ockam_core::{ compat::{ collections::{BTreeMap, BTreeSet}, @@ -13,36 +13,73 @@ use ockam_core::{ vec::Vec, }, flow_control::FlowControls, - Address, AddressAndMetadata, AddressMetadata, RelayMessage, Result, + Address, AddressAndMetadata, AddressMetadata, Error, RelayMessage, Result, }; -/// Address states and associated logic -pub struct InternalMap { +#[derive(Default)] +struct AddressMaps { /// Registry of primary address to worker address record state address_records_map: BTreeMap<Address, AddressRecord>, /// Alias-registry to map arbitrary address to primary addresses alias_map: BTreeMap<Address, Address>, /// Registry of arbitrary metadata for each address, lazily populated address_metadata_map: BTreeMap<Address, AddressMetadata>, +} + +/// Address states and associated logic +pub struct InternalMap { + address_maps: SyncRwLock<AddressMaps>, /// The order in which clusters are allocated and de-allocated - cluster_order: Vec<String>, + cluster_order: SyncMutex<Vec<String>>, /// Cluster data records - clusters: BTreeMap<String, BTreeSet<Address>>, + clusters: SyncMutex<BTreeMap<String, BTreeSet<Address>>>, /// Track stop information for Clusters - stopping: BTreeSet<Address>, + stopping: SyncMutex<BTreeSet<Address>>, /// Access to [`FlowControls`] to clean resources flow_controls: FlowControls, /// Metrics collection and sharing #[cfg(feature = "metrics")] metrics: (Arc<AtomicUsize>, Arc<AtomicUsize>), } +impl InternalMap { + pub(crate) fn resolve(&self, addr: &Address) -> Result<MessageSender<RelayMessage>> { + let guard = self.address_maps.read().unwrap(); + + let address_record = if let Some(primary_address) = guard.alias_map.get(addr) { + guard.address_records_map.get(primary_address) + } else { + trace!("Resolving worker address '{addr}'... FAILED; no such worker"); + return Err(Error::new(Origin::Node, Kind::NotFound, "No such address") .context("Address", addr.clone())); + }; + + match address_record { + Some(address_record) => { + if let Some(sender) = address_record.sender() { + trace!("Resolving worker address '{addr}'... OK"); + address_record.increment_msg_count(); + Ok(sender) + } else { + trace!("Resolving worker address '{addr}'... REJECTED; worker shutting down"); + Err( + Error::new(Origin::Node, Kind::Shutdown, "Worker shutting down") + .context("Address", addr.clone()), + ) + } + } + None => { + trace!("Resolving worker address '{addr}'... FAILED; no such worker"); + Err(Error::new(Origin::Node, Kind::NotFound, "No such address") .context("Address", addr.clone())) + } + } + } +} impl InternalMap { pub(super) fn new(flow_controls: &FlowControls) -> Self { Self { - address_records_map: Default::default(), - alias_map: Default::default(), - address_metadata_map: Default::default(), + address_maps: Default::default(), cluster_order: Default::default(), clusters: Default::default(), stopping: Default::default(), @@ -54,39 +91,92 @@ impl InternalMap { } impl InternalMap { - pub(super) fn clear_address_records_map(&mut self) { - self.address_records_map.clear() + pub(super) fn clear_address_records_map(&self) { + self.address_maps + .write() + .unwrap() + .address_records_map + .clear() } - pub(super) fn get_address_record(&self, primary_address: &Address) -> Option<&AddressRecord> { - self.address_records_map.get(primary_address) - } + pub(super) async fn stop(&self, primary_address: &Address) -> Result<()> { + { + let mut guard = self.stopping.lock().unwrap(); + if guard.contains(primary_address) { + return Ok(()); + } else { + guard.insert(primary_address.clone()); + } + } + + let send_signal_future = { + let guard = self.address_maps.read().unwrap(); + if let Some(record) = guard.address_records_map.get(primary_address) { + record.stop() + } else { + return Err(Error::new(Origin::Node, Kind::NotFound, "No such address") .context("Address", primary_address.clone())); + } + }; - pub(super) fn get_address_record_mut( - &mut self, - primary_address: &Address, - ) -> Option<&mut AddressRecord> { - self.address_records_map.get_mut(primary_address) + // we can't call `.await` while holding the lock + if let Some(send_signal_future) = send_signal_future { + send_signal_future.await + } else { + Ok(()) + } }
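`InternalMap::stop` above shows the central locking rule of this refactor: a synchronous guard must never be held across an `.await`. A self-contained sketch of the same pattern with stand-in types (not the Ockam API): create the future while the lock is held, let the guard drop, and only then await.

```rust
// Sketch: extract a future under a sync lock, await it after the guard drops.
use std::collections::BTreeMap;
use std::sync::RwLock;
use tokio::sync::mpsc;

struct Map {
    senders: RwLock<BTreeMap<String, mpsc::Sender<&'static str>>>,
}

impl Map {
    async fn stop(&self, addr: &str) -> Result<(), String> {
        // Build the future inside the lock scope...
        let send_future = {
            let guard = self.senders.read().unwrap();
            let tx = guard.get(addr).ok_or("no such address")?.clone();
            async move { tx.send("stop").await.map_err(|e| e.to_string()) }
        }; // ...the guard is dropped here...
        send_future.await // ...so it is safe to await
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(1);
    let map = Map { senders: RwLock::new(BTreeMap::from([("w1".to_string(), tx)])) };
    map.stop("w1").await.unwrap();
    assert_eq!(rx.recv().await, Some("stop"));
}
```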
- pub(super) fn address_records_map(&self) -> &BTreeMap<Address, AddressRecord> { - &self.address_records_map + pub(super) fn is_worker_registered_at(&self, primary_address: &Address) -> bool { + self.address_maps + .read() + .unwrap() + .address_records_map + .contains_key(primary_address) + // TODO: we should also check aliases } - pub(super) fn remove_address_record( - &mut self, - primary_address: &Address, - ) -> Option<AddressRecord> { - self.flow_controls.cleanup_address(primary_address); - self.address_records_map.remove(primary_address) + pub(super) fn list_workers(&self) -> Vec<Address> { + self.address_maps + .read() + .unwrap() + .address_records_map + .keys() + .cloned() + .collect() } pub(super) fn insert_address_record( - &mut self, + &self, primary_address: Address, record: AddressRecord, - ) -> Option<AddressRecord> { - self.address_records_map.insert(primary_address, record) + addresses_metadata: Vec<AddressAndMetadata>, + ) { + let mut guard = self.address_maps.write().unwrap(); + + record.address_set.iter().for_each(|addr| { + guard + .alias_map + .insert(addr.clone(), primary_address.clone()); + }); + + for metadata in addresses_metadata { + if !record.address_set().contains(&metadata.address) { + warn!( + "Address {} is not in the set of addresses", + metadata.address + ); + continue; + } + + let entry = guard + .address_metadata_map + .entry(metadata.address) + .or_default(); + *entry = metadata.metadata; + } + + _ = guard.address_records_map.insert(primary_address, record); + }
pub(super) fn find_terminal_address( @@ -94,7 +184,10 @@ impl InternalMap { addresses: &[Address], ) -> Option<AddressAndMetadata> { addresses.iter().find_map(|address| { - self.address_metadata_map + self.address_maps + .read() + .unwrap() + .address_metadata_map .get(address) .filter(|&meta| meta.is_terminal) .map(|meta| AddressAndMetadata { @@ -104,37 +197,35 @@ impl InternalMap { }) } - pub(super) fn set_address_metadata(&mut self, meta: AddressAndMetadata) { - let metadata = self.address_metadata_map.entry(meta.address).or_default(); - *metadata = meta.metadata; - } - pub(super) fn get_address_metadata(&self, address: &Address) -> Option<AddressMetadata> { - self.address_metadata_map.get(address).cloned() - } - - pub(super) fn remove_alias(&mut self, alias_address: &Address) -> Option<Address> { - self.alias_map.remove(alias_address) - } - - pub(super) fn insert_alias(&mut self, alias_address: &Address, primary_address: &Address) { - _ = self + self.address_maps + .read() + .unwrap() + .address_metadata_map + .get(address) + .cloned() + } + + pub(super) fn get_primary_address(&self, alias_address: &Address) -> Option<Address> { + self.address_maps + .read() + .unwrap() .alias_map - .insert(alias_address.clone(), primary_address.clone()) - } - - pub(super) fn get_primary_address(&self, alias_address: &Address) -> Option<&Address> { - self.alias_map.get(alias_address) + .get(alias_address) + .cloned() + } }
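The accessors above all follow the same shape: take a read guard, look up, and clone the result out so the guard can be released immediately (which is also why `get_primary_address` now returns an owned `Option<Address>` instead of a borrowed `Option<&Address>`). A small illustrative sketch of the terminal-address lookup with stand-in types:

```rust
// Sketch: scan candidate addresses, return the first whose metadata is terminal.
use std::collections::BTreeMap;
use std::sync::RwLock;

#[derive(Clone, Debug, Default)]
struct Meta {
    is_terminal: bool,
}

struct MetadataMap {
    inner: RwLock<BTreeMap<String, Meta>>,
}

impl MetadataMap {
    fn find_terminal(&self, addresses: &[String]) -> Option<(String, Meta)> {
        let guard = self.inner.read().unwrap();
        addresses.iter().find_map(|a| {
            guard
                .get(a)
                .filter(|m| m.is_terminal)
                // clone the hit out of the map so the guard can drop
                .map(|m| (a.clone(), m.clone()))
        })
    }
}

fn main() {
    let map = MetadataMap { inner: RwLock::new(BTreeMap::new()) };
    map.inner
        .write()
        .unwrap()
        .insert("app".to_string(), Meta { is_terminal: true });
    let found = map.find_terminal(&["relay".to_string(), "app".to_string()]);
    assert_eq!(found.map(|(a, _)| a), Some("app".to_string()));
}
```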
impl InternalMap { #[cfg(feature = "metrics")] pub(super) fn update_metrics(&self) { + self.metrics.0.store( + self.address_maps.read().unwrap().address_records_map.len(), + Ordering::Release, + ); self.metrics - .0 - .store(self.address_records_map.len(), Ordering::Release); - self.metrics.1.store(self.clusters.len(), Ordering::Release); + .1 + .store(self.clusters.lock().unwrap().len(), Ordering::Release); } #[cfg(feature = "metrics")] @@ -148,56 +239,46 @@ impl InternalMap { } /// Add an address to a particular cluster - pub(super) fn set_cluster(&mut self, label: String, primary: Address) -> NodeReplyResult { - let rec = self + pub(super) fn set_cluster(&self, label: String, primary: Address) -> Result<()> { + let address_records_guard = self.address_maps.read().unwrap(); + let rec = address_records_guard .address_records_map .get(&primary) .ok_or_else(|| NodeError::Address(primary).not_found())?; + let mut clusters_guard = self.clusters.lock().unwrap(); + // If this is the first time we see this cluster ID - if !self.clusters.contains_key(&label) { - self.clusters.insert(label.clone(), BTreeSet::new()); - self.cluster_order.push(label.clone()); + if !clusters_guard.contains_key(&label) { + clusters_guard.insert(label.clone(), BTreeSet::new()); + self.cluster_order.lock().unwrap().push(label.clone()); } // Add all addresses to the cluster set for addr in rec.address_set() { - self.clusters + clusters_guard .get_mut(&label) .expect("No such cluster??") .insert(addr.clone()); } - RouterReply::ok() - } - - /// Set an address as ready and return the list of waiting pollers - pub(super) fn set_ready(&mut self, addr: Address) -> Result<Vec<SmallSender<NodeReplyResult>>> { - let addr_record = self - .address_records_map - .get_mut(&addr) - .ok_or_else(|| NodeError::Address(addr).not_found())?; - Ok(addr_record.set_ready()) - } - - /// Get the ready state of an address - pub(super) fn get_ready(&mut self, addr: Address, reply: SmallSender<NodeReplyResult>) -> bool { - self.address_records_map - .get_mut(&addr) - .map_or(false, |rec| rec.ready(reply)) + Ok(()) } /// Retrieve the next cluster in reverse-initialisation order /// Return None if there is no next cluster or if the cluster /// contained no more active addresses - pub(super) fn next_cluster(&mut self) -> Option<Vec<Address>> { + pub(super) fn next_cluster(&self) -> Option<Vec<Address>> { // loop until either: // - there are no more clusters // - we found a non-empty list of active addresses in a cluster loop { - let name = self.cluster_order.pop()?; - let addrs = self.clusters.remove(&name)?; + let name = self.cluster_order.lock().unwrap().pop()?; + let addrs = self.clusters.lock().unwrap().remove(&name)?; let active_addresses: Vec<Address> = self + .address_maps + .read() + .unwrap() .address_records_map .iter() .filter_map(|(primary, _)| { @@ -217,46 +298,97 @@ impl InternalMap { }
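`next_cluster` above pops labels in reverse-creation order and skips clusters whose addresses are all gone already. A minimal sketch of that bookkeeping with plain `String` addresses (illustrative only):

```rust
// Sketch: clusters drained in reverse-creation order, skipping empty ones.
use std::collections::{BTreeMap, BTreeSet};

struct Clusters {
    order: Vec<String>,
    clusters: BTreeMap<String, BTreeSet<String>>,
    live: BTreeSet<String>, // addresses that still have a record
}

impl Clusters {
    fn next_cluster(&mut self) -> Option<Vec<String>> {
        loop {
            let label = self.order.pop()?; // reverse-initialisation order
            let addrs = self.clusters.remove(&label)?;
            let active: Vec<String> =
                addrs.into_iter().filter(|a| self.live.contains(a)).collect();
            if !active.is_empty() {
                return Some(active);
            }
            // cluster had no live addresses: try the next one
        }
    }
}

fn main() {
    let mut c = Clusters {
        order: vec!["first".into(), "second".into()],
        clusters: BTreeMap::from([
            ("first".into(), BTreeSet::from(["a".to_string()])),
            ("second".into(), BTreeSet::from(["b".to_string()])),
        ]),
        live: BTreeSet::from(["a".to_string()]),
    };
    // "second" was created last but has no live addresses, so "first" is next
    assert_eq!(c.next_cluster(), Some(vec!["a".to_string()]));
    assert_eq!(c.next_cluster(), None);
}
```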
/// Mark this address as "having started to stop" - pub(super) fn init_stop(&mut self, addr: Address) { - self.stopping.insert(addr); + pub(super) fn init_stop(&self, addr: Address) { + self.stopping.lock().unwrap().insert(addr); } /// Check whether the current cluster of addresses was stopped pub(super) fn cluster_done(&self) -> bool { - self.stopping.is_empty() - } + self.stopping.lock().unwrap().is_empty() + } + + /// Stop all workers not in a cluster, returns their primary addresses + pub(super) async fn stop_all_non_cluster_workers(&self) -> Vec<Address>
{ + let mut futures = Vec::new(); + let mut addresses = Vec::new(); + { + let clustered = + self.clusters + .lock() + .unwrap() + .iter() + .fold(BTreeSet::new(), |mut acc, (_, set)| { + acc.append(&mut set.clone()); + acc + }); + + let guard = self.address_maps.read().unwrap(); + let records: Vec<&AddressRecord> = guard + .address_records_map + .iter() + .filter_map(|(addr, rec)| { + if clustered.contains(addr) { + None + } else { + Some(rec) + } + }) + // Filter all detached workers because they don't matter :( + .filter(|rec| !rec.meta.detached) + .collect(); - /// Get all addresses of workers not in a cluster - pub(super) fn non_cluster_workers(&mut self) -> Vec<&mut AddressRecord> { - let clustered = self - .clusters - .iter() - .fold(BTreeSet::new(), |mut acc, (_, set)| { - acc.append(&mut set.clone()); - acc + for record in records { + if let Some(first_address) = record.address_set.first() { + debug!("Stopping address {}", first_address); + addresses.push(first_address.clone()); + let send_stop_signal_future = record.stop(); + if let Some(send_stop_signal_future) = send_stop_signal_future { + futures.push((first_address.clone(), send_stop_signal_future)); + } + } else { + error!("Empty Address Set during graceful shutdown") + } + } + } + + // We can only call `.await` outside the lock + for (first_address, send_stop_signal_future) in futures { + send_stop_signal_future.await.unwrap_or_else(|e| { + error!("Failed to stop address {}: {}", first_address, e); }); + } - self.address_records_map - .iter_mut() - .filter_map(|(addr, rec)| { - if clustered.contains(addr) { - None + addresses + } + + pub(super) fn check_addr_not_exist(&self, addr: &Address) -> Result<()> { + let guard = self.address_maps.read().unwrap(); + match guard.address_records_map.get(addr) { + Some(record) => { + if record.check_integrity() { + let node = NodeError::Address(addr.clone()); + Err(node.already_exists())? } else { - Some(rec) + drop(guard); + self.free_address(addr.clone()); + Ok(()) } - }) - // Filter all detached workers because they don't matter :( - .filter(|rec| !rec.meta.detached) - .collect() + } + None => Ok(()), + } } /// Permanently free all remaining resources associated to a particular address - pub(super) fn free_address(&mut self, primary: Address) { - self.stopping.remove(&primary); - if let Some(record) = self.remove_address_record(&primary) { - for addr in record.address_set { - self.remove_alias(&addr); - self.address_metadata_map.remove(&addr); + pub(super) fn free_address(&self, primary: Address) { + let mut guard = self.address_maps.write().unwrap(); + self.stopping.lock().unwrap().remove(&primary); + self.flow_controls.cleanup_address(&primary); + + let removed = guard.address_records_map.remove(&primary); + if let Some(record) = removed { + for alias_address in record.address_set { + guard.address_metadata_map.remove(&alias_address); + guard.alias_map.remove(&alias_address); } } } @@ -272,10 +404,9 @@ pub struct WorkerMeta { #[derive(Debug)] pub struct AddressRecord { address_set: Vec
<Address>, - sender: Option<MessageSender<RelayMessage>>, + sender: SyncMutex<Option<MessageSender<RelayMessage>>>, ctrl_tx: SmallSender<CtrlSignal>, // Unused for not-detached workers - state: AddressState, - ready: ReadyState, + state: AtomicU8, meta: WorkerMeta, msg_count: Arc<AtomicUsize>, } @@ -285,12 +416,12 @@ impl AddressRecord { &self.address_set } - pub fn sender(&self) -> MessageSender<RelayMessage> { - self.sender.clone().expect("No such sender!") + pub fn sender(&self) -> Option<MessageSender<RelayMessage>> { + self.sender.lock().unwrap().clone() } - pub fn drop_sender(&mut self) { - self.sender = None; + pub fn drop_sender(&self) { + *self.sender.lock().unwrap() = None; } pub fn new( @@ -302,10 +433,9 @@ impl AddressRecord { ) -> Self { AddressRecord { address_set, - sender: Some(sender), + sender: SyncMutex::new(Some(sender)), ctrl_tx, - state: AddressState::Running, - ready: ReadyState::Initialising(vec![]), + state: AtomicU8::new(AddressState::Running as u8), msg_count, meta, } @@ -317,48 +447,50 @@ impl AddressRecord { } /// Signal this worker to stop -- it will no longer be able to receive messages - pub async fn stop(&mut self) -> Result<()> { + /// Since the worker is held behind a lock, we cannot hold the synchronous lock + /// and call an `.await` to send the signal. + /// To avoid this shortcoming, we return a future that can be awaited on instead. + pub fn stop(&self) -> Option<impl Future<Output = Result<()>>> { + if self.state.load(Ordering::Relaxed) != AddressState::Running as u8 { + return None; + } + + // the stop can be triggered only once + let result = self.state.compare_exchange( + AddressState::Running as u8, + AddressState::Stopping as u8, + Ordering::Relaxed, + Ordering::Relaxed, + ); + if result.is_err() { + return None; + } + if self.meta.processor { - self.ctrl_tx - .send(CtrlSignal::InterruptStop) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; + let ctrl_tx = self.ctrl_tx.clone(); + Some(async move { + ctrl_tx + .send(CtrlSignal::InterruptStop) + .await + .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; + Ok(()) + }) } else { self.drop_sender(); + None } - self.state = AddressState::Stopping; - Ok(()) } /// Check the integrity of this record #[inline] - pub fn check(&self) -> bool { - self.state == AddressState::Running && self.sender.is_some() - } - - /// Check whether this address has been marked as ready yet and if - /// it hasn't we register our sender for future notification - pub fn ready(&mut self, reply: SmallSender<NodeReplyResult>) -> bool { - match self.ready { - ReadyState::Ready => true, - ReadyState::Initialising(ref mut vec) => { - vec.push(reply); - false - } - } - } - - /// Mark this address as 'ready' and return the list of active pollers - pub fn set_ready(&mut self) -> Vec<SmallSender<NodeReplyResult>> { - let waiting = core::mem::replace(&mut self.ready, ReadyState::Ready); - match waiting { - ReadyState::Initialising(vec) => vec, - ReadyState::Ready => vec![], - } + pub fn check_integrity(&self) -> bool { + self.state.load(Ordering::Relaxed) == AddressState::Running as u8 + && self.sender.lock().unwrap().is_some() } } /// Encode the run states a worker or processor can be in +#[repr(u8)] #[derive(Debug, PartialEq, Eq)] pub enum AddressState { /// The runner is looping in its main body (either handling messages or a manual run-loop) @@ -370,15 +502,6 @@ pub enum AddressState { Faulty, } -/// Encode the ready state of a worker or processor -#[derive(Debug)] -pub enum ReadyState { - /// The runner is fully ready - Ready, - /// The runner is still processing user init code and contains a list of waiting polling addresses - Initialising(Vec<SmallSender<NodeReplyResult>>), -}
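`AddressRecord::stop` above relies on `compare_exchange` on the `AtomicU8` state so the Running-to-Stopping transition can fire at most once, no matter how many callers race. A reduced sketch of that guard:

```rust
// Sketch: a one-shot state transition guarded by compare_exchange.
use std::sync::atomic::{AtomicU8, Ordering};

const RUNNING: u8 = 0;
const STOPPING: u8 = 1;

struct Record {
    state: AtomicU8,
}

impl Record {
    // Returns true only for the single caller that wins the transition.
    fn trigger_stop(&self) -> bool {
        self.state
            .compare_exchange(RUNNING, STOPPING, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
    }
}

fn main() {
    let record = Record { state: AtomicU8::new(RUNNING) };
    assert!(record.trigger_stop());  // first caller wins
    assert!(!record.trigger_stop()); // later callers see Stopping and back off
}
```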
#[cfg(test)] mod test { use super::*; @@ -387,22 +510,31 @@ mod test { #[test] fn test_next_cluster() { - let mut map = InternalMap::new(&FlowControls::new()); + let map = InternalMap::new(&FlowControls::new()); // create 3 clusters // cluster 1 has one active address // cluster 2 has no active address // cluster 3 has one active address - map.address_records_map + map.address_maps + .write() + .unwrap() + .address_records_map .insert("address1".into(), create_address_record("address1")); let _ = map.set_cluster("CLUSTER1".into(), "address1".into()); - map.address_records_map + map.address_maps + .write() + .unwrap() + .address_records_map .insert("address2".into(), create_address_record("address2")); let _ = map.set_cluster("CLUSTER2".into(), "address2".into()); map.free_address("address2".into()); - map.address_records_map + map.address_maps + .write() + .unwrap() + .address_records_map .insert("address3".into(), create_address_record("address3")); let _ = map.set_cluster("CLUSTER3".into(), "address3".into());
diff --git a/implementations/rust/ockam/ockam_node/src/router/shutdown.rs b/implementations/rust/ockam/ockam_node/src/router/shutdown.rs index 5d426d70464..e6dbd891d77 100644 --- a/implementations/rust/ockam/ockam_node/src/router/shutdown.rs +++ b/implementations/rust/ockam/ockam_node/src/router/shutdown.rs @@ -1,33 +1,15 @@ use super::Router; -use crate::channel_types::SmallSender; -use crate::{ - error::{NodeError, NodeReason}, - NodeReplyResult, RouterReply, -}; use ockam_core::compat::vec::Vec; use ockam_core::{Address, Result}; +use std::sync::Arc; /// Register a stop ACK /// /// For every ACK we re-test whether the current cluster has stopped. /// If not, we do nothing. If so, we trigger the next cluster to stop. -pub(super) async fn ack(router: &mut Router, addr: Address) -> Result<bool> { - debug!("Handling shutdown ACK for {}", addr); - - // Permanently remove the address and corresponding worker - router.map.free_address(addr); - - // If there are workers left in the cluster: keep waiting - if !router.map.cluster_done() { - return Ok(false); - } - - // Check if there is a next cluster - router.stop_next_cluster().await -} impl Router { - async fn stop_next_cluster(&mut self) -> Result<bool> { + pub(super) async fn stop_next_cluster(&self) -> Result<bool> { let next_cluster_addresses = self.map.next_cluster(); match next_cluster_addresses { @@ -40,52 +22,32 @@ impl Router { } }
- async fn stop_cluster_addresses(&mut self, addresses: Vec<Address>) -> Result<()> { - let mut addrs = vec![]; - + async fn stop_cluster_addresses(&self, addresses: Vec<Address>) -> Result<()> { for address in addresses.iter() { - if let Some(record) = self.map.get_address_record_mut(address) { - record.stop().await?; - if let Some(first_address) = record.address_set().first().cloned() { - addrs.push(first_address); - } else { - error!("Empty Address Set during cluster stop"); - } - } + self.map.stop(address).await?; } - - addrs.into_iter().for_each(|addr| self.map.init_stop(addr)); Ok(()) } }
/// Implement the graceful shutdown strategy #[cfg_attr(not(feature = "std"), allow(unused_variables))] -pub(super) async fn graceful( - router: &mut Router, - seconds: u8, - reply: SmallSender<NodeReplyResult>, -) -> Result<bool> { +pub(super) async fn graceful(router: Arc<Router>, seconds: u8) -> Result<bool> { // Mark the router as shutting down to prevent spawning info!("Initiate graceful node shutdown"); // This changes the router state to `Stopping` - router.state.shutdown(reply); + router.state.shutdown(); // Start by shutting down clusterless workers - let mut cluster = vec![]; - for rec in router.map.non_cluster_workers().iter_mut() { - if let Some(first_address) = rec.address_set().first().cloned() { - debug!("Stopping address {}", first_address); - rec.stop().await?; - cluster.push(first_address); - } else { - error!("Empty Address Set during graceful shutdown"); - } - } + let cluster = router.map.stop_all_non_cluster_workers().await; // If there _are_ no clusterless workers we go to the next cluster if cluster.is_empty() { - return router.stop_next_cluster().await; + let result = router.stop_next_cluster().await; + if let Ok(true) = result { + info!("No more workers left. Goodbye!"); + return Ok(true); + } } // Otherwise: keep track of addresses we are stopping @@ -96,23 +58,17 @@ pub(super) async fn graceful( // Start a timeout task to interrupt us... #[cfg(feature = "std")] { - use crate::NodeMessage; use core::time::Duration; use tokio::{task, time}; - let sender = router.sender(); let dur = Duration::from_secs(seconds as u64); task::spawn(async move { time::sleep(dur).await; warn!("Shutdown timeout reached; aborting node!"); - // This works only because the state of the router is `Stopping` - if sender.send(NodeMessage::AbortNode).await.is_err() { - error!("Failed to send node abort signal to router"); - } + router.abort_node(); }); } - // Return but DO NOT stop the router Ok(false) }
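The timeout task above no longer needs a channel back to the router: the spawned future owns an `Arc` of it and calls `abort_node` directly. A hedged stand-alone sketch of the same failover shape, with a stand-in `Node` type and shortened delays:

```rust
// Sketch: arm a deadline task that hard-aborts if shutdown is still pending.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::{task, time};

struct Node {
    aborted: AtomicBool,
}

impl Node {
    fn abort(&self) {
        self.aborted.store(true, Ordering::Relaxed);
    }
}

#[tokio::main]
async fn main() {
    let node = Arc::new(Node { aborted: AtomicBool::new(false) });

    // The clone moves into the task; no message channel is needed.
    let handle = node.clone();
    task::spawn(async move {
        time::sleep(Duration::from_millis(50)).await;
        // In the real code this fires only if graceful shutdown has not finished.
        handle.abort();
    });

    time::sleep(Duration::from_millis(100)).await;
    assert!(node.aborted.load(Ordering::Relaxed));
}
```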
@@ -123,15 +79,7 @@ pub(super) async fn graceful( /// shutdown(...)` hook. However: the router will not wait for them! /// Messages sent during the shutdown phase may not be delivered and /// shutdown hooks may be suddenly interrupted by thread-deallocation. -pub(super) async fn immediate( - router: &mut Router, - reply: SmallSender<NodeReplyResult>, -) -> Result<()> { +pub(super) fn immediate(router: &Router) { router.map.clear_address_records_map(); router.state.kill(); - reply - .send(RouterReply::ok()) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; - Ok(()) }
diff --git a/implementations/rust/ockam/ockam_node/src/router/start_processor.rs b/implementations/rust/ockam/ockam_node/src/router/start_processor.rs index d92c501e2d5..473fd88f6cf 100644 --- a/implementations/rust/ockam/ockam_node/src/router/start_processor.rs +++ b/implementations/rust/ockam/ockam_node/src/router/start_processor.rs @@ -1,42 +1,39 @@ use super::{AddressRecord, NodeState, Router, SenderPair, WorkerMeta}; -use crate::channel_types::SmallSender; -use crate::{ - error::{NodeError, NodeReason}, - NodeReplyResult, RouterReason, RouterReply, -}; +use crate::{error::NodeError, RouterReason}; use ockam_core::compat::{sync::Arc, vec::Vec}; -#[cfg(feature = "std")] -use ockam_core::env::get_env; +use ockam_core::errcode::{Kind, Origin}; use ockam_core::{Address, AddressAndMetadata, Result}; /// Execute a `StartWorker` command -pub(super) async fn exec( - router: &mut Router, +pub(super) fn exec( + router: &Router, addrs: Vec<Address>, senders: SenderPair, addresses_metadata: Vec<AddressAndMetadata>, - reply: &SmallSender<NodeReplyResult>, ) -> Result<()> { match router.state.node_state() { - NodeState::Running => start(router, addrs, senders, addresses_metadata, reply).await, - NodeState::Stopping(_) => reject(reply).await, + NodeState::Running => start(router, addrs, senders, addresses_metadata), + NodeState::Stopping => Err(ockam_core::Error::new( + Origin::Node, + Kind::Shutdown, + "The node is shutting down", + ))?, NodeState::Dead => unreachable!(), }?; Ok(()) }
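`exec` above now fails synchronously when the node is not `Running`, instead of sending a rejection over a reply channel. A compact sketch of that gating with illustrative types and error strings:

```rust
// Sketch: gate start requests on node state, erroring out once shutdown begins.
#[derive(Clone, Copy, PartialEq, Debug)]
enum NodeState {
    Running,
    Stopping,
    Dead,
}

fn start_worker(state: NodeState, addr: &str) -> Result<(), String> {
    match state {
        NodeState::Running => {
            // ...insert the address record here...
            println!("started worker at {addr}");
            Ok(())
        }
        NodeState::Stopping => Err("the node is shutting down".to_string()),
        NodeState::Dead => unreachable!("no API call can reach a dead node"),
    }
}

fn main() {
    assert!(start_worker(NodeState::Running, "echo").is_ok());
    assert!(start_worker(NodeState::Stopping, "echo").is_err());
}
```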
-async fn start( - router: &mut Router, +fn start( + router: &Router, addrs: Vec<Address>, senders: SenderPair, addresses_metadata: Vec<AddressAndMetadata>, - reply: &SmallSender<NodeReplyResult>, ) -> Result<()> { let primary_addr = addrs .first() .ok_or_else(|| NodeError::RouterState(RouterReason::EmptyAddressSet).internal())?; - router.check_addr_not_exist(primary_addr, reply).await?; + router.check_addr_not_exist(primary_addr)?; debug!("Starting new processor '{}'", &primary_addr); @@ -60,47 +57,7 @@ async fn start( router .map - .insert_address_record(primary_addr.clone(), record); + .insert_address_record(primary_addr.clone(), record, addresses_metadata); - for metadata in addresses_metadata { - if !addrs.contains(&metadata.address) { - warn!( - "Address {} is not in the set of addresses for this processor", - metadata.address - ); - continue; - } - - router.map.set_address_metadata(metadata); - } - - #[cfg(feature = "std")] - if let Ok(Some(dump_internals)) = get_env::<bool>("OCKAM_DUMP_INTERNALS") { - if dump_internals { - trace!("{:#?}", router.map.address_records_map()); - } - } - #[cfg(all(not(feature = "std"), feature = "dump_internals"))] - trace!("{:#?}", router.map.address_records_map()); - - addrs.iter().for_each(|addr| { - router.map.insert_alias(addr, primary_addr); - }); - - // For now we just send an OK back -- in the future we need to - // communicate the current executor state - reply - .send(RouterReply::ok()) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; - Ok(()) -} - -async fn reject(reply: &SmallSender<NodeReplyResult>) -> Result<()> { - trace!("StartProcessor command rejected: node shutting down"); - reply - .send(RouterReply::node_rejected(NodeReason::Shutdown)) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; Ok(()) }
diff --git a/implementations/rust/ockam/ockam_node/src/router/start_worker.rs b/implementations/rust/ockam/ockam_node/src/router/start_worker.rs index ce33930a1e1..d5c3f4b492c 100644 --- a/implementations/rust/ockam/ockam_node/src/router/start_worker.rs +++ b/implementations/rust/ockam/ockam_node/src/router/start_worker.rs @@ -1,60 +1,53 @@ use super::{AddressRecord, NodeState, Router, SenderPair, WorkerMeta}; -use crate::channel_types::SmallSender; -use crate::{ - error::{NodeError, NodeReason}, - NodeReplyResult, RouterReason, RouterReply, -}; +use crate::{error::NodeError, RouterReason}; use core::sync::atomic::AtomicUsize; -#[cfg(feature = "std")] -use ockam_core::env::get_env; +use ockam_core::errcode::{Kind, Origin}; use ockam_core::{ compat::{sync::Arc, vec::Vec}, Address, AddressAndMetadata, Result, }; /// Execute a `StartWorker` command -pub(super) async fn exec( - router: &mut Router, +pub(super) fn exec( + router: &Router, addrs: Vec<Address>, senders: SenderPair, detached: bool, addresses_metadata: Vec<AddressAndMetadata>, metrics: Arc<AtomicUsize>, - reply: &SmallSender<NodeReplyResult>, ) -> Result<()> { match router.state.node_state() { - NodeState::Running => { - start( - router, - addrs, - senders, - detached, - addresses_metadata, - metrics, - reply, - ) - .await - } - NodeState::Stopping(_) => reject(reply).await, + NodeState::Running => start( + router, + addrs, + senders, + detached, + addresses_metadata, + metrics, + ), + NodeState::Stopping => Err(ockam_core::Error::new( + Origin::Node, + Kind::Shutdown, + "The node is shutting down", + ))?, NodeState::Dead => unreachable!(), }?; Ok(()) }
-async fn start( - router: &mut Router, +fn start( + router: &Router, addrs: Vec<Address>, senders: SenderPair, detached: bool, addresses_metadata: Vec<AddressAndMetadata>, metrics: Arc<AtomicUsize>, - reply: &SmallSender<NodeReplyResult>, ) -> Result<()> { let primary_addr = addrs .first() .ok_or_else(|| NodeError::RouterState(RouterReason::EmptyAddressSet).internal())?; - router.check_addr_not_exist(primary_addr, reply).await?; + router.check_addr_not_exist(primary_addr)?; debug!("Starting new worker '{}'", primary_addr); @@ -76,45 +69,7 @@ async fn start( router .map - .insert_address_record(primary_addr.clone(), address_record); - - for metadata in addresses_metadata { - if !addrs.contains(&metadata.address) { - warn!( - "Address {} is not in the set of addresses for this worker", - metadata.address - ); - continue; - } - - router.map.set_address_metadata(metadata); - } - - #[cfg(feature = "std")] - if let Ok(Some(_)) = get_env::<String>("OCKAM_DUMP_INTERNALS") { - trace!("{:#?}", router.map.address_records_map()); - } - #[cfg(all(not(feature = "std"), feature = "dump_internals"))] - trace!("{:#?}", router.map.internal); - - addrs.iter().for_each(|addr| { - router.map.insert_alias(addr, primary_addr); - }); - - // For now we just send an OK back -- in the future we need to - // communicate the current executor state - reply - .send(RouterReply::ok()) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; - Ok(()) -} + .insert_address_record(primary_addr.clone(), address_record, addresses_metadata); -async fn reject(reply: &SmallSender<NodeReplyResult>) -> Result<()> { - trace!("StartWorker command rejected: node shutting down"); - reply - .send(RouterReply::node_rejected(NodeReason::Shutdown)) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; Ok(()) }
diff --git a/implementations/rust/ockam/ockam_node/src/router/state.rs b/implementations/rust/ockam/ockam_node/src/router/state.rs index 4d7f4a4d130..f0bd3228342 100644 --- a/implementations/rust/ockam/ockam_node/src/router/state.rs +++ b/implementations/rust/ockam/ockam_node/src/router/state.rs @@ -1,51 +1,51 @@ //! Router run state utilities +use core::sync::atomic::AtomicU8; -use crate::channel_types::SmallSender; -use crate::messages::{NodeMessage, NodeReplyResult}; - +#[repr(u8)] pub enum NodeState { Running, - Stopping(SmallSender<NodeReplyResult>), + Stopping, Dead, } pub struct RouterState { - pub(super) sender: SmallSender<NodeMessage>, - node_state: NodeState, + node_state: AtomicU8, } impl RouterState { - pub fn new(sender: SmallSender<NodeMessage>) -> Self { + pub fn new() -> Self { Self { - sender, - node_state: NodeState::Running, + node_state: AtomicU8::new(NodeState::Running as u8), } } /// Toggle this router to shut down soon - pub(super) fn shutdown(&mut self, reply: SmallSender<NodeReplyResult>) { - self.node_state = NodeState::Stopping(reply) + pub(super) fn shutdown(&self) { + self.node_state.store( + NodeState::Stopping as u8, + core::sync::atomic::Ordering::Relaxed, + ); } /// Ungracefully kill the router - pub(super) fn kill(&mut self) { - self.node_state = NodeState::Dead; - } - - pub(super) fn stop_reply(&self) -> Option<SmallSender<NodeReplyResult>> { - match &self.node_state { - NodeState::Stopping(sender) => Some(sender.clone()), - _ => None, - } + pub(super) fn kill(&self) { + self.node_state + .store(NodeState::Dead as u8, core::sync::atomic::Ordering::Relaxed); } pub fn running(&self) -> bool { - core::matches!(self.node_state, NodeState::Running) + self.node_state.load(core::sync::atomic::Ordering::Relaxed) == NodeState::Running as u8 } /// Check if this router is still `running`, meaning allows /// spawning new workers and processors pub fn node_state(&self) -> &NodeState { - &self.node_state + let state = self.node_state.load(core::sync::atomic::Ordering::Relaxed); + match state { + 0 => &NodeState::Running, + 1 => &NodeState::Stopping, + 2 => &NodeState::Dead, + _ => unreachable!(), + } } }
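The new `RouterState` stores the enum discriminant in an `AtomicU8` and maps the raw value back to a `&'static` variant, which only works because `NodeState` no longer carries a reply sender. Since the diff contains this code in full, the sketch below mirrors it closely and compiles on its own:

```rust
// Sketch: a lock-free state machine, enum stored as a u8 in an atomic.
use std::sync::atomic::{AtomicU8, Ordering};

#[repr(u8)]
pub enum NodeState {
    Running = 0,
    Stopping = 1,
    Dead = 2,
}

pub struct RouterState {
    node_state: AtomicU8,
}

impl RouterState {
    pub fn new() -> Self {
        Self { node_state: AtomicU8::new(NodeState::Running as u8) }
    }

    pub fn shutdown(&self) {
        self.node_state.store(NodeState::Stopping as u8, Ordering::Relaxed);
    }

    // Map the raw u8 back to a static enum reference so callers keep matching.
    pub fn node_state(&self) -> &NodeState {
        match self.node_state.load(Ordering::Relaxed) {
            0 => &NodeState::Running,
            1 => &NodeState::Stopping,
            2 => &NodeState::Dead,
            _ => unreachable!(),
        }
    }
}

fn main() {
    let state = RouterState::new();
    assert!(matches!(state.node_state(), NodeState::Running));
    state.shutdown();
    assert!(matches!(state.node_state(), NodeState::Stopping));
}
```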
diff --git a/implementations/rust/ockam/ockam_node/src/router/stop_processor.rs b/implementations/rust/ockam/ockam_node/src/router/stop_processor.rs index b5f1484bbd3..9c2fb4546fb 100644 --- a/implementations/rust/ockam/ockam_node/src/router/stop_processor.rs +++ b/implementations/rust/ockam/ockam_node/src/router/stop_processor.rs @@ -1,53 +1,21 @@ use super::Router; -use crate::channel_types::SmallSender; -use crate::{ - error::{NodeError, NodeReason}, - NodeReplyResult, RouterReply, -}; -use ockam_core::{Address, Result}; +use ockam_core::errcode::{Kind, Origin}; +use ockam_core::{Address, Error, Result}; -pub(super) async fn exec( - router: &mut Router, - addr: &Address, - reply: &SmallSender<NodeReplyResult>, -) -> Result<()> { +pub(super) async fn exec(router: &Router, addr: &Address) -> Result<()> { trace!("Stopping processor '{}'", addr); // Resolve any secondary address to the primary address let primary_address = match router.map.get_primary_address(addr) { Some(p) => p.clone(), None => { - reply - .send(RouterReply::no_such_address(addr.clone())) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; - - return Ok(()); - } - }; - - // Get the internal address record - let record = match router.map.get_address_record_mut(&primary_address) { - Some(r) => r, - None => { - // Actually should not happen - reply - .send(RouterReply::no_such_address(addr.clone())) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; - - return Ok(()); + return Err(Error::new(Origin::Node, Kind::NotFound, "No such address") .context("Address", addr.clone())) } }; // Then send processor shutdown signal - record.stop().await?; - - // Signal back that everything went OK - reply - .send(RouterReply::ok()) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; + router.map.stop(&primary_address).await?; Ok(()) }
diff --git a/implementations/rust/ockam/ockam_node/src/router/stop_worker.rs b/implementations/rust/ockam/ockam_node/src/router/stop_worker.rs index 317e7353037..a3357e8a357 100644 --- a/implementations/rust/ockam/ockam_node/src/router/stop_worker.rs +++ b/implementations/rust/ockam/ockam_node/src/router/stop_worker.rs @@ -1,43 +1,16 @@ use super::Router; -use crate::channel_types::SmallSender; -use crate::{ - error::{NodeError, NodeReason}, - NodeReplyResult, RouterReply, -}; -use ockam_core::{Address, Result}; +use ockam_core::errcode::{Kind, Origin}; +use ockam_core::{Address, Error, Result}; -pub(super) async fn exec( - router: &mut Router, - addr: &Address, - detached: bool, - reply: &SmallSender<NodeReplyResult>, -) -> Result<()> { +pub(super) async fn exec(router: &Router, addr: &Address, detached: bool) -> Result<()> { trace!("Stopping worker '{}'", addr); // Resolve any secondary address to the primary address let primary_address = match router.map.get_primary_address(addr) { - Some(p) => p.clone(), + Some(p) => p, None => { - reply - .send(RouterReply::no_such_address(addr.clone())) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; - - return Ok(()); - } - }; - - // Get the internal address record - let record = match router.map.get_address_record_mut(&primary_address) { - Some(r) => r, - None => { - // Actually should not happen - reply - .send(RouterReply::no_such_address(addr.clone())) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; - - return Ok(()); + return Err(Error::new(Origin::Node, Kind::NotFound, "No such address") .context("Address", addr)) } }; @@ -46,16 +19,11 @@ pub(super) async fn exec( // // For detached workers (i.e. Context's without a mailbox relay // running) we simply drop the record - if !detached { - record.stop().await? - } else { + if detached { router.map.free_address(primary_address); + } else { + router.map.stop(&primary_address).await?; } - reply - .send(RouterReply::ok()) - .await - .map_err(|_| NodeError::NodeState(NodeReason::Unknown).internal())?; - Ok(()) }
diff --git a/implementations/rust/ockam/ockam_node/src/router/utils.rs b/implementations/rust/ockam/ockam_node/src/router/utils.rs index f53a0fd07a7..47a45711ef5 100644 --- a/implementations/rust/ockam/ockam_node/src/router/utils.rs +++ b/implementations/rust/ockam/ockam_node/src/router/utils.rs @@ -1,58 +1,10 @@ use super::Router; -use crate::channel_types::SmallSender; -use crate::{ - error::{NodeError, NodeReason, WorkerReason}, - NodeReplyResult, RouterReply, -}; +use crate::error::{NodeError, NodeReason}; use ockam_core::{Address, Result, TransportType}; -/// Receive an address and resolve it to a sender -/// -/// This function only applies to local address types, and will -/// fail to resolve a correct address if it is given a remote -/// address. -pub(super) async fn resolve( - router: &mut Router, - addr: Address, - reply: &SmallSender<NodeReplyResult>, -) -> Result<()> { - let base = format!("Resolving worker address '{}'...", addr); - - let address_record = if let Some(primary_address) = router.map.get_primary_address(&addr) { - router.map.get_address_record(primary_address) - } else { - trace!("{} FAILED; no such worker", base); - reply - .send(RouterReply::no_such_address(addr)) - .await - .map_err(NodeError::from_send_err)?; - - return Ok(()); - }; - - match address_record { - Some(record) if record.check() => { - trace!("{} OK", base); - record.increment_msg_count(); - reply.send(RouterReply::sender(addr, record.sender())) - } - Some(_) => { - trace!("{} REJECTED; worker shutting down", base); - reply.send(RouterReply::worker_rejected(WorkerReason::Shutdown)) - } - None => { - trace!("{} FAILED; no such worker", base); - reply.send(RouterReply::no_such_address(addr)) - } - } - .await - .map_err(NodeError::from_send_err)?; - Ok(()) -} - -pub(super) fn router_addr(router: &mut Router, tt: TransportType) -> Result<Address>
{ - router - .external +pub(super) fn router_addr(router: &Router, tt: TransportType) -> Result<Address>
{ + let guard = router.external.read().unwrap(); + guard .get(&tt) .cloned() .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal()) diff --git a/implementations/rust/ockam/ockam_node/src/shutdown.rs b/implementations/rust/ockam/ockam_node/src/shutdown.rs new file mode 100644 index 00000000000..a75b1f8281b --- /dev/null +++ b/implementations/rust/ockam/ockam_node/src/shutdown.rs @@ -0,0 +1,42 @@ +/// Specify the type of node shutdown +/// +/// For most users `ShutdownType::Graceful()` is recommended. The +/// `Default` implementation uses a 1 second timeout. +#[derive(Debug, Copy, Clone)] +#[non_exhaustive] +pub enum ShutdownType { + /// Execute a graceful shutdown given a maximum timeout + /// + /// The following steps will be taken by the internal router + /// during graceful shutdown procedure: + /// + /// * Signal clusterless workers to stop + /// * Wait for shutdown ACK hooks from worker set + /// * Signal worker clusters in reverse-creation order to stop + /// * Wait for shutdown ACK hooks from each cluster before moving onto the + /// next + /// * All shutdown-signaled workers may process their entire mailbox, + /// while not allowing new messages to be queued + /// + /// Graceful shutdown procedure will be pre-maturely terminated + /// when reaching the timeout (failover into `Immediate` + /// strategy). **A given timeout of `0` will wait forever!** + Graceful(u8), + /// Immediately shutdown workers and run shutdown hooks + /// + /// This strategy can lead to data loss: + /// + /// * Unhandled mailbox messages will be dropped + /// * Shutdown hooks may not be able to send messages + /// + /// This strategy is not recommended for general use, but will be + /// selected as a failover, if the `Graceful` strategy reaches its + /// timeout limit. 
+ Immediate, +} + +impl Default for ShutdownType { + fn default() -> Self { + Self::Graceful(1) + } +} diff --git a/implementations/rust/ockam/ockam_node/src/worker_builder.rs b/implementations/rust/ockam/ockam_node/src/worker_builder.rs index 1e683dd8526..3d980757551 100644 --- a/implementations/rust/ockam/ockam_node/src/worker_builder.rs +++ b/implementations/rust/ockam/ockam_node/src/worker_builder.rs @@ -1,12 +1,10 @@ use crate::debugger; -use crate::error::{NodeError, NodeReason}; -use crate::{relay::WorkerRelay, Context, NodeMessage}; +use crate::{relay::WorkerRelay, Context}; use alloc::string::String; use ockam_core::compat::{sync::Arc, vec::Vec}; use ockam_core::{ - errcode::{Kind, Origin}, - Address, AddressAndMetadata, AddressMetadata, AllowAll, Error, IncomingAccessControl, - Mailboxes, OutgoingAccessControl, Result, Worker, + Address, AddressAndMetadata, AddressMetadata, AllowAll, IncomingAccessControl, Mailboxes, + OutgoingAccessControl, Result, Worker, }; /// Start a [`Worker`] with a custom configuration @@ -267,19 +265,8 @@ where debugger::log_inherit_context("WORKER", context, &ctx); - // Send start request to router - let (msg, mut rx) = - NodeMessage::start_worker(addresses, sender, false, context.mailbox_count(), metadata); - context - .sender() - .send(msg) - .await - .map_err(|e| Error::new(Origin::Node, Kind::Invalid, e))?; - - // Wait for the actual return code - rx.recv() - .await - .ok_or_else(|| NodeError::NodeState(NodeReason::Unknown).internal())??; + let router = context.router(); + router.start_worker(addresses, sender, false, metadata, context.mailbox_count())?; // Then initialise the worker message relay WorkerRelay::init(context.runtime(), worker, ctx, ctrl_rx); diff --git a/implementations/rust/ockam/ockam_node/tests/router.rs b/implementations/rust/ockam/ockam_node/tests/router.rs index dda11d40c93..4ea18dba91a 100644 --- a/implementations/rust/ockam/ockam_node/tests/router.rs +++ b/implementations/rust/ockam/ockam_node/tests/router.rs @@ -94,7 +94,7 @@ async fn provide_and_read_processor_address_metadata(context: &mut Context) -> R .start(context) .await?; - let meta = context.get_metadata("processor_address").await?.unwrap(); + let meta = context.get_metadata("processor_address").unwrap(); assert!(!meta.is_terminal); @@ -106,11 +106,11 @@ async fn provide_and_read_processor_address_metadata(context: &mut Context) -> R ] ); - assert_eq!(context.get_metadata("non-existing-worker").await?, None); + assert_eq!(context.get_metadata("non-existing-worker"), None); context.stop_processor("processor_address").await?; ockam_node::compat::tokio::time::sleep(std::time::Duration::from_millis(10)).await; - assert_eq!(context.get_metadata("processor_address",).await?, None); + assert_eq!(context.get_metadata("processor_address"), None); context.stop().await } @@ -209,7 +209,7 @@ async fn provide_and_read_address_metadata(context: &mut Context) -> Result<()> .start(context) .await?; - let meta = context.get_metadata("worker_address").await?.unwrap(); + let meta = context.get_metadata("worker_address").unwrap(); assert!(!meta.is_terminal); @@ -221,11 +221,11 @@ async fn provide_and_read_address_metadata(context: &mut Context) -> Result<()> ] ); - assert_eq!(context.get_metadata("non-existing-worker").await?, None); + assert_eq!(context.get_metadata("non-existing-worker"), None); context.stop_worker("worker_address").await?; ockam_node::compat::tokio::time::sleep(std::time::Duration::from_millis(10)).await; - assert_eq!(context.get_metadata("worker_address").await?, 
None); + assert_eq!(context.get_metadata("worker_address"), None); context.stop().await } @@ -242,7 +242,7 @@ async fn provide_and_read_address_metadata_worker_alias(context: &mut Context) - .start(context) .await?; - let meta = context.get_metadata("alias").await?.unwrap(); + let meta = context.get_metadata("alias").unwrap(); assert!(!meta.is_terminal); @@ -253,7 +253,7 @@ async fn provide_and_read_address_metadata_worker_alias(context: &mut Context) - context.stop_worker("main").await?; ockam_node::compat::tokio::time::sleep(std::time::Duration::from_millis(10)).await; - assert_eq!(context.get_metadata("alias").await?, None); + assert_eq!(context.get_metadata("alias"), None); context.stop().await } diff --git a/implementations/rust/ockam/ockam_node/tests/tests.rs b/implementations/rust/ockam/ockam_node/tests/tests.rs index 575d3e7aee9..025949223db 100644 --- a/implementations/rust/ockam/ockam_node/tests/tests.rs +++ b/implementations/rust/ockam/ockam_node/tests/tests.rs @@ -14,7 +14,6 @@ use serde::{Deserialize, Serialize}; use std::sync::atomic::{AtomicI8, AtomicU32}; use std::time::{SystemTime, UNIX_EPOCH}; use tokio::time::sleep; -use tracing::info; #[allow(non_snake_case)] #[ockam_macros::test] @@ -173,7 +172,7 @@ async fn worker_initialize_fail_should_shutdown(ctx: &mut Context) -> Result<()> sleep(Duration::new(1, 0)).await; assert!(shutdown_was_called.load(Ordering::Relaxed)); - assert!(!ctx.list_workers().await?.contains(&address)); + assert!(!ctx.list_workers().contains(&address)); Ok(()) } @@ -208,7 +207,7 @@ async fn processor_initialize_fail_should_shutdown(ctx: &mut Context) -> Result< assert!(res.is_ok()); sleep(Duration::new(1, 0)).await; assert!(shutdown_was_called.load(Ordering::Relaxed)); - assert!(!ctx.list_workers().await?.contains(&address)); + assert!(!ctx.list_workers().contains(&address)); Ok(()) } @@ -288,7 +287,7 @@ async fn counting_processor__run_node_lifecycle__processor_lifecycle_should_be_f }; ctx.start_processor("counting_processor", processor).await?; - sleep(Duration::new(1, 0)).await; + sleep(Duration::from_secs(1)).await; assert!(initialize_was_called.load(Ordering::Relaxed)); assert!(shutdown_was_called.load(Ordering::Relaxed)); @@ -322,8 +321,7 @@ impl Processor for WaitingProcessor { } async fn process(&mut self, _ctx: &mut Self::Context) -> Result { - sleep(Duration::new(1, 0)).await; - + sleep(Duration::from_secs(1)).await; Ok(true) } } @@ -343,11 +341,11 @@ async fn waiting_processor__shutdown__should_be_interrupted(ctx: &mut Context) - }; ctx.start_processor("waiting_processor", processor).await?; - sleep(Duration::new(1, 0)).await; + sleep(Duration::from_secs(1)).await; ctx.stop_processor("waiting_processor").await?; // Wait till tokio Runtime is shut down - std::thread::sleep(Duration::new(1, 0)); + std::thread::sleep(Duration::from_secs(1)); assert!(initialize_was_called.load(Ordering::Relaxed)); assert!(shutdown_was_called.load(Ordering::Relaxed)); @@ -461,41 +459,6 @@ async fn abort_blocked_shutdown(ctx: &mut Context) -> Result<()> { .unwrap() } -struct WaitForWorker; - -#[ockam_core::worker] -impl Worker for WaitForWorker { - type Context = Context; - type Message = (); - - async fn initialize(&mut self, ctx: &mut Context) -> Result<()> { - info!("This worker initialises a bit slow"); - ctx.sleep(Duration::from_secs(1)).await; - info!("Worker done"); - Ok(()) - } -} - -#[ockam_macros::test] -async fn wait_for_worker(ctx: &mut Context) -> Result<()> { - let t1 = tokio::time::Instant::now(); - ctx.start_worker_with_access_control("slow", 
WaitForWorker, DenyAll, DenyAll) .await .unwrap(); - - info!("Waiting for worker..."); - ctx.wait_for("slow").await.unwrap(); - info!("Done waiting :)"); - - let t2 = tokio::time::Instant::now(); - assert!((t2 - t1) > Duration::from_secs(1)); - - if let Err(e) = ctx.stop().await { - println!("Unclean stop: {}", e) - } - Ok(()) -} - struct StopFromHandleMessageWorker { counter_a: Arc<AtomicU32>, counter_b: Arc<AtomicU32>,
diff --git a/implementations/rust/ockam/ockam_transport_ble/src/router/mod.rs b/implementations/rust/ockam/ockam_transport_ble/src/router/mod.rs index 682e35dadea..cfe606eb3b1 100644 --- a/implementations/rust/ockam/ockam_transport_ble/src/router/mod.rs +++ b/implementations/rust/ockam/ockam_transport_ble/src/router/mod.rs @@ -180,7 +180,7 @@ impl BleRouter { .await?; trace!("Registering Ble router for type = {}", crate::BLE); - ctx.register(crate::BLE, main_addr).await?; + ctx.register(crate::BLE, main_addr)?; Ok(handle) }
diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs index 6c2ba0fb5b0..5662bec4d1c 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/portal/portal_message.rs @@ -108,7 +108,7 @@ pub enum PortalInternalMessage { } /// Maximum allowed size for a payload -pub const MAX_PAYLOAD_SIZE: usize = 48 * 1024; +pub const MAX_PAYLOAD_SIZE: usize = 8 * 1024; #[cfg(test)] mod test {
diff --git a/implementations/rust/ockam/ockam_transport_tcp/src/transport/lifecycle.rs b/implementations/rust/ockam/ockam_transport_tcp/src/transport/lifecycle.rs index 5cfe5c626d0..96e555c1019 100644 --- a/implementations/rust/ockam/ockam_transport_tcp/src/transport/lifecycle.rs +++ b/implementations/rust/ockam/ockam_transport_tcp/src/transport/lifecycle.rs @@ -145,7 +145,7 @@ mod tests { async fn test_resolve_address(ctx: &mut Context) -> Result<()> { let tcp = TcpTransport::create(ctx).await?; let tcp_address = "127.0.0.1:0"; - let initial_workers = ctx.list_workers().await?; + let initial_workers = ctx.list_workers(); let listener = TcpListener::bind(tcp_address) .await .map_err(TransportError::from)?; @@ -163,7 +163,7 @@ mod tests { .await?; // there are 2 additional workers - let mut additional_workers = ctx.list_workers().await?; + let mut additional_workers = ctx.list_workers(); additional_workers.retain(|w| !initial_workers.contains(w)); assert_eq!(additional_workers.len(), 2);
diff --git a/implementations/rust/ockam/ockam_transport_udp/src/transport/lifecycle.rs b/implementations/rust/ockam/ockam_transport_udp/src/transport/lifecycle.rs index e9b1bf43bb3..ae94e0d3e91 100644 --- a/implementations/rust/ockam/ockam_transport_udp/src/transport/lifecycle.rs +++ b/implementations/rust/ockam/ockam_transport_udp/src/transport/lifecycle.rs @@ -84,7 +84,7 @@ mod tests { async fn test_resolve_address(ctx: &mut Context) -> Result<()> { let udp = UdpTransport::create(ctx).await?; let udp_address = "127.0.0.1:0"; - let initial_workers = ctx.list_workers().await?; + let initial_workers = ctx.list_workers(); let socket = UdpSocket::bind(udp_address) .await .map_err(TransportError::from)?; @@ -95,7 +95,7 @@ mod tests { .await?; // there are 2 additional workers - let mut additional_workers = ctx.list_workers().await?; + let mut additional_workers = ctx.list_workers(); additional_workers.retain(|w| !initial_workers.contains(w)); assert_eq!(additional_workers.len(), 2);
diff --git
a/implementations/rust/ockam/ockam_transport_uds/src/router/uds_router.rs b/implementations/rust/ockam/ockam_transport_uds/src/router/uds_router.rs index 96b7f503680..695970cc5e3 100644 --- a/implementations/rust/ockam/ockam_transport_uds/src/router/uds_router.rs +++ b/implementations/rust/ockam/ockam_transport_uds/src/router/uds_router.rs @@ -60,7 +60,7 @@ impl UdsRouter { .await?; trace!("Registering UDS router for type = {}", UDS); - ctx.register(UDS, main_addr).await?; + ctx.register(UDS, main_addr)?; Ok(handle) } diff --git a/implementations/rust/ockam/ockam_transport_websocket/src/router/mod.rs b/implementations/rust/ockam/ockam_transport_websocket/src/router/mod.rs index ac1d88de82e..e5dba483a40 100644 --- a/implementations/rust/ockam/ockam_transport_websocket/src/router/mod.rs +++ b/implementations/rust/ockam/ockam_transport_websocket/src/router/mod.rs @@ -95,7 +95,7 @@ impl WebSocketRouter { .start(ctx) .await?; trace!("Registering WS router for type = {}", WS); - ctx.register(WS, main_addr).await?; + ctx.register(WS, main_addr)?; Ok(handle) } diff --git a/tools/stress-test/src/main.rs b/tools/stress-test/src/main.rs index 135c98febf8..2ddc048ec3f 100644 --- a/tools/stress-test/src/main.rs +++ b/tools/stress-test/src/main.rs @@ -133,17 +133,9 @@ impl State { } else { NodeBuilder::new().no_logging() }; - let (context, mut executor) = builder.with_runtime(rt.clone()).build(); + let (context, _executor) = builder.with_runtime(rt.clone()).build(); let context = Arc::new(context); - // start the router, it is needed for the node manager creation - rt.spawn(async move { - executor - .start_router() - .await - .expect("cannot start executor") - }); - let runtime = context.runtime().clone(); let node_manager = runtime .block_on(Self::make_node_manager(context.clone(), &cli_state))
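Taken together, the call-site changes above all delete the same boilerplate: nobody spawns `executor.start_router()` anymore, because the router is usable as soon as `build` returns. A minimal sketch of the updated construction path, assuming the `ockam_node` and `tokio` crates exactly as they are used in this diff:

```rust
// Sketch of the new construction path: no manual router task is spawned.
use std::sync::Arc;
use ockam_node::NodeBuilder;
use tokio::runtime::Runtime;

fn main() {
    let rt = Arc::new(Runtime::new().expect("cannot create a tokio runtime"));
    // After this change the router is ready as soon as `build` returns:
    let (context, _executor) = NodeBuilder::new().with_runtime(rt.clone()).build();
    // ...no `rt.spawn(async move { executor.start_router().await })` step anymore...
    drop(context);
}
```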