Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): merge Processor into Worker #8740

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/rust/file_transfer/examples/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ struct FileReception {

#[ockam::worker]
impl Worker for FileReception {
type Context = Context;
type Message = FileData;

async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<Self::Message>) -> Result<()> {
Expand Down
1 change: 0 additions & 1 deletion examples/rust/get_started/examples/bob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ struct Echoer;
// echoes it back on its return route.
#[ockam::worker]
impl Worker for Echoer {
type Context = Context;
type Message = String;

async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<String>) -> Result<()> {
Expand Down
1 change: 0 additions & 1 deletion examples/rust/get_started/src/echoer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ pub struct Echoer;

#[ockam::worker]
impl Worker for Echoer {
type Context = Context;
type Message = String;

async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<String>) -> Result<()> {
Expand Down
1 change: 0 additions & 1 deletion examples/rust/get_started/src/hop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ pub struct Hop;

#[ockam::worker]
impl Worker for Hop {
type Context = Context;
type Message = Any;

/// This handle function takes any incoming message and forwards
Expand Down
1 change: 0 additions & 1 deletion examples/rust/get_started/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ pub struct Logger;

#[ockam::worker]
impl Worker for Logger {
type Context = Context;
type Message = Any;

/// This handle function takes any incoming message and print its content as a UTF-8 string
Expand Down
1 change: 0 additions & 1 deletion examples/rust/get_started/src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ impl Relay {

#[ockam::worker]
impl Worker for Relay {
type Context = Context;
type Message = Any;

/// This handle function takes any incoming message and forwards
Expand Down
4 changes: 2 additions & 2 deletions examples/rust/get_started/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,12 @@ fn run_05_secure_channel_over_two_transport_hops() -> Result<(), Error> {
// Launch responder, wait for it to start up
let resp =
CmdBuilder::new("cargo run --locked --example 05-secure-channel-over-two-transport-hops-responder").spawn()?;
resp.match_stdout("Initializing ockam processor")?;
resp.match_stdout("Initializing ockam worker")?;

// Launch middle, wait for it to start up
let mid =
CmdBuilder::new("cargo run --locked --example 05-secure-channel-over-two-transport-hops-middle").spawn()?;
mid.match_stdout("Initializing ockam processor")?;
mid.match_stdout("Initializing ockam worker")?;

// Run initiator to completion
let (exitcode, stdout) =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::tcp_interceptor::{Role, TcpMitmProcessor, TcpMitmRegistry, TcpMitmTransport};
use ockam_core::{async_trait, compat::net::SocketAddr};
use ockam_core::{Address, Processor, Result};
use ockam_node::Context;
use ockam_core::{Address, Result};
use ockam_node::{Context, Worker};
use ockam_transport_core::TransportError;
use tokio::net::{TcpListener, TcpStream};
use tracing::debug;
Expand Down Expand Up @@ -34,29 +34,29 @@ impl TcpMitmListenProcessor {
target_addr,
};

ctx.start_processor(address.clone(), processor)?;
ctx.start_worker(address.clone(), processor)?;

Ok((saddr, address))
}
}

#[async_trait]
impl Processor for TcpMitmListenProcessor {
type Context = Context;
impl Worker for TcpMitmListenProcessor {
type Message = ();

async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
self.registry.add_listener(ctx.primary_address());

Ok(())
}

async fn shutdown(&mut self, ctx: &mut Self::Context) -> Result<()> {
async fn shutdown(&mut self, ctx: &mut Context) -> Result<()> {
self.registry.remove_listener(ctx.primary_address());

Ok(())
}

async fn process(&mut self, ctx: &mut Self::Context) -> Result<bool> {
async fn process(&mut self, ctx: &mut Context) -> Result<bool> {
debug!("Waiting for incoming TCP connection...");

let (stream, _peer) = self.inner.accept().await.map_err(TransportError::from)?;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::tcp_interceptor::{Role, TcpMitmRegistry};
use ockam_core::compat::sync::Arc;
use ockam_core::Result;
use ockam_core::{async_trait, Address, AllowAll};
use ockam_core::{Processor, Result};
use ockam_node::compat::asynchronous::Mutex;
use ockam_node::Context;
use ockam_node::{Context, Worker};
use tokio::io::AsyncWriteExt;
use tokio::net::tcp::OwnedWriteHalf;
use tokio::{io::AsyncReadExt, net::tcp::OwnedReadHalf};
Expand Down Expand Up @@ -47,15 +47,15 @@ impl TcpMitmProcessor {

let receiver = Self::new(address_of_other_processor, role, read_half, write_half, registry);

ctx.start_processor_with_access_control(address, receiver, AllowAll, AllowAll)?;
ctx.start_worker_with_access_control(address, receiver, AllowAll, AllowAll)?;

Ok(())
}
}

#[async_trait]
impl Processor for TcpMitmProcessor {
type Context = Context;
impl Worker for TcpMitmProcessor {
type Message = ();

async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
self.registry
Expand All @@ -66,7 +66,7 @@ impl Processor for TcpMitmProcessor {
Ok(())
}

async fn shutdown(&mut self, ctx: &mut Self::Context) -> Result<()> {
async fn shutdown(&mut self, ctx: &mut Context) -> Result<()> {
self.registry.remove_processor(ctx.primary_address());

debug!("Shutdown {}", ctx.primary_address());
Expand Down
1 change: 0 additions & 1 deletion examples/rust/no_std/src/echoer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ pub struct Echoer;

#[ockam::worker]
impl Worker for Echoer {
type Context = Context;
type Message = String;

async fn handle_message(&mut self, ctx: &mut Context, msg: Routed<String>) -> Result<()> {
Expand Down
1 change: 0 additions & 1 deletion examples/rust/no_std/src/hop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ pub struct Hop;

#[ockam::worker]
impl Worker for Hop {
type Context = Context;
type Message = Any;

/// This handle function takes any incoming message and forwards
Expand Down
2 changes: 1 addition & 1 deletion examples/rust/tcp_inlet_and_outlet/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn run_01_inlet_outlet_one_process() -> Result<(), Error> {
"cargo run --locked --example 01-inlet-outlet 127.0.0.1:{port} ockam.io:80"
))
.spawn()?;
runner.match_stdout(r"(?i)Starting new processor")?;
runner.match_stdout(r"(?i)Starting new worker")?;

// Run curl and check for a successful run
let (exitcode, stdout) =
Expand Down
4 changes: 2 additions & 2 deletions implementations/rust/ockam/ockam/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub use ockam_core::processor;
pub use ockam_core::worker;
pub use ockam_core::{
allow, deny, errcode, route, Address, Any, Encoded, Error, LocalMessage, Mailbox, Mailboxes,
Message, Processor, ProtocolId, Result, Route, Routed, TransportMessage, TryClone, Worker,
Message, ProtocolId, Result, Route, Routed, TransportMessage, TryClone,
};
pub use ockam_identity as identity;
// ---
Expand All @@ -78,7 +78,7 @@ pub use ockam_macros::{node, test};
pub use ockam_node::database::*;
pub use ockam_node::{
debugger, Context, DelayedEvent, Executor, MessageReceiveOptions, MessageSendReceiveOptions,
NodeBuilder, WorkerBuilder,
NodeBuilder, Worker, WorkerBuilder,
};
#[cfg(feature = "ockam_transport_tcp")]
/// TCP transport
Expand Down
40 changes: 5 additions & 35 deletions implementations/rust/ockam/ockam/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ use ockam_core::compat::string::String;
use ockam_core::compat::sync::Arc;
use ockam_core::flow_control::FlowControls;
use ockam_core::{
Address, IncomingAccessControl, Message, OutgoingAccessControl, Processor, Result, Route,
Routed, TryClone, Worker,
Address, IncomingAccessControl, Message, OutgoingAccessControl, Result, Route, Routed, TryClone,
};
use ockam_identity::{
CredentialRepository, IdentitiesAttributes, IdentitiesVerification,
IdentityAttributesRepository, PurposeKeys, Vault,
};
use ockam_node::{Context, HasContext, MessageReceiveOptions, MessageSendReceiveOptions};
use ockam_node::{Context, HasContext, MessageReceiveOptions, MessageSendReceiveOptions, Worker};
use ockam_vault::storage::SecretsRepository;
use ockam_vault::SigningSecretKeyHandle;

Expand Down Expand Up @@ -161,51 +160,22 @@ impl Node {
}

/// Start a new worker instance at the given address. Default Access Control is AllowAll
pub fn start_worker<W>(&self, address: impl Into<Address>, worker: W) -> Result<()>
where
W: Worker<Context = Context>,
{
pub fn start_worker<W: Worker>(&self, address: impl Into<Address>, worker: W) -> Result<()> {
self.context.start_worker(address, worker)
}

/// Start a new worker instance at the given address with given Access Controls
pub fn start_worker_with_access_control<W>(
pub fn start_worker_with_access_control<W: Worker>(
&self,
address: impl Into<Address>,
worker: W,
incoming: impl IncomingAccessControl,
outgoing: impl OutgoingAccessControl,
) -> Result<()>
where
W: Worker<Context = Context>,
{
) -> Result<()> {
self.context
.start_worker_with_access_control(address, worker, incoming, outgoing)
}

/// Start a new processor instance at the given address. Default Access Control is DenyAll
pub fn start_processor<P>(&self, address: impl Into<Address>, processor: P) -> Result<()>
where
P: Processor<Context = Context>,
{
self.context.start_processor(address, processor)
}

/// Start a new processor instance at the given address with given Access Controls
pub fn start_processor_with_access_control<P>(
&self,
address: impl Into<Address>,
processor: P,
incoming: impl IncomingAccessControl,
outgoing: impl OutgoingAccessControl,
) -> Result<()>
where
P: Processor<Context = Context>,
{
self.context
.start_processor_with_access_control(address, processor, incoming, outgoing)
}

/// Signal to the local runtime to shut down
pub async fn shutdown(&mut self) -> Result<()> {
self.context.shutdown_node().await
Expand Down
9 changes: 4 additions & 5 deletions implementations/rust/ockam/ockam/src/relay_service/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use ockam_core::compat::sync::Arc;
use ockam_core::compat::{boxed::Box, vec::Vec};
use ockam_core::{
route, Address, AllowAll, AllowOnwardAddress, Any, IncomingAccessControl, LocalMessage,
OutgoingAccessControl, Result, Route, Routed, Worker,
OutgoingAccessControl, Result, Route, Routed,
};
use ockam_node::WorkerBuilder;
use ockam_node::{Worker, WorkerBuilder};
use tracing::info;

pub(super) struct Relay {
Expand Down Expand Up @@ -52,10 +52,9 @@ impl Relay {

#[crate::worker]
impl Worker for Relay {
type Context = Context;
type Message = Any;

async fn initialize(&mut self, ctx: &mut Self::Context) -> Result<()> {
async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
let payload = self
.payload
.take()
Expand All @@ -77,7 +76,7 @@ impl Worker for Relay {

async fn handle_message(
&mut self,
ctx: &mut Self::Context,
ctx: &mut Context,
msg: Routed<Self::Message>,
) -> Result<()> {
let mut local_message = msg.into_local_message();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use alloc::string::String;
use ockam_core::compat::boxed::Box;
use ockam_core::compat::sync::Arc;
use ockam_core::{
Address, DenyAll, Encodable, Mailbox, Mailboxes, Result, Routed, SecureChannelLocalInfo, Worker,
Address, DenyAll, Encodable, Mailbox, Mailboxes, Result, Routed, SecureChannelLocalInfo,
};
use ockam_node::WorkerBuilder;
use ockam_node::{Worker, WorkerBuilder};

/// Alias worker to register remote workers under local names.
///
Expand Down Expand Up @@ -62,12 +62,11 @@ impl RelayService {

#[crate::worker]
impl Worker for RelayService {
type Context = Context;
type Message = String;

async fn handle_message(
&mut self,
ctx: &mut Self::Context,
ctx: &mut Context,
message: Routed<Self::Message>,
) -> Result<()> {
let secure_channel_local_info =
Expand Down
6 changes: 3 additions & 3 deletions implementations/rust/ockam/ockam/src/remote/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use ockam_core::compat::{
boxed::Box,
string::{String, ToString},
};
use ockam_core::{Any, Decodable, Result, Routed, Worker};
use ockam_core::{Any, Decodable, Result, Routed};
use ockam_node::Worker;
use tracing::{debug, info};

#[crate::worker]
impl Worker for RemoteRelay {
type Context = Context;
type Message = Any;

async fn initialize(&mut self, ctx: &mut Self::Context) -> Result<()> {
async fn initialize(&mut self, ctx: &mut Context) -> Result<()> {
debug!(registration_route = %self.registration_route, "RemoteRelay initializing...");

ctx.send_from_address(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use ockam_core::api::{Method, RequestHeader, Response};
use ockam_core::compat::boxed::Box;
use ockam_core::compat::sync::Arc;
use ockam_core::compat::vec::Vec;
use ockam_core::{Result, Routed, SecureChannelLocalInfo, Worker};
use ockam_node::Context;
use ockam_core::{Result, Routed, SecureChannelLocalInfo};
use ockam_node::{Context, Worker};

/// This struct runs as a Worker to issue credentials based on a request/response protocol
pub struct CredentialIssuerWorker {
Expand Down Expand Up @@ -48,7 +48,6 @@ impl CredentialIssuerWorker {

#[ockam_core::worker]
impl Worker for CredentialIssuerWorker {
type Context = Context;
type Message = Vec<u8>;

async fn handle_message(&mut self, c: &mut Context, m: Routed<Self::Message>) -> Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use tracing::trace;
use ockam::identity::{Identifier, IdentitiesAttributes};
use ockam_core::api::{Method, RequestHeader, Response};
use ockam_core::compat::sync::Arc;
use ockam_core::{Result, Routed, SecureChannelLocalInfo, Worker};
use ockam_node::Context;
use ockam_core::{Result, Routed, SecureChannelLocalInfo};
use ockam_node::{Context, Worker};

use crate::authenticator::direct::types::AddMember;
use crate::authenticator::direct::DirectAuthenticator;
Expand Down Expand Up @@ -37,7 +37,6 @@ impl DirectAuthenticatorWorker {
#[ockam_core::worker]
impl Worker for DirectAuthenticatorWorker {
type Message = Vec<u8>;
type Context = Context;

async fn handle_message(&mut self, c: &mut Context, m: Routed<Self::Message>) -> Result<()> {
let secure_channel_info = match SecureChannelLocalInfo::find_info(m.local_message()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use minicbor::Decoder;
use ockam::identity::Identifier;
use ockam_core::api::{Method, RequestHeader, Response};
use ockam_core::compat::sync::Arc;
use ockam_core::{Result, Routed, SecureChannelLocalInfo, Worker};
use ockam_node::Context;
use ockam_core::{Result, Routed, SecureChannelLocalInfo};
use ockam_node::{Context, Worker};
use tracing::trace;

pub struct EnrollmentTokenAcceptorWorker {
Expand All @@ -27,7 +27,6 @@ impl EnrollmentTokenAcceptorWorker {

#[ockam_core::worker]
impl Worker for EnrollmentTokenAcceptorWorker {
type Context = Context;
type Message = Vec<u8>;

async fn handle_message(&mut self, c: &mut Context, m: Routed<Self::Message>) -> Result<()> {
Expand Down
Loading
Loading