Skip to content

Commit

Permalink
feat(rust): rewrite ockam_node
Browse files Browse the repository at this point in the history
  • Loading branch information
davide-baldo authored and SanjoDeundiak committed Jan 12, 2025
1 parent 0c4d9cb commit 80e114f
Show file tree
Hide file tree
Showing 324 changed files with 3,647 additions and 5,489 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 10 additions & 9 deletions examples/rust/file_transfer/examples/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Worker for FileReception {
Ok(n) => {
self.written_size += n;
if self.written_size == self.size {
ctx.stop().await?;
ctx.shutdown_node().await?;
}
}
Err(e) => {
Expand All @@ -71,7 +71,7 @@ impl Worker for FileReception {
);
}
}
FileData::Quit => ctx.stop().await?,
FileData::Quit => ctx.shutdown_node().await?,
}

Ok(())
Expand All @@ -81,7 +81,7 @@ impl Worker for FileReception {
#[ockam::node]
async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create an Identity to represent Receiver.
let receiver = node.create_identity().await?;
Expand All @@ -90,13 +90,14 @@ async fn main(ctx: Context) -> Result<()> {
let secure_channel_listener_options =
SecureChannelListenerOptions::new().as_consumer(&tcp_options.flow_control_id());

node.flow_controls()
.add_consumer("receiver", &secure_channel_listener_options.spawner_flow_control_id());
node.flow_controls().add_consumer(
&"receiver".into(),
&secure_channel_listener_options.spawner_flow_control_id(),
);

// Create a secure channel listener for Receiver that will wait for requests to
// initiate an Authenticated Key Exchange.
node.create_secure_channel_listener(&receiver, "listener", secure_channel_listener_options)
.await?;
node.create_secure_channel_listener(&receiver, "listener", secure_channel_listener_options)?;

// The computer that is running this program is likely within a private network and
// not accessible over the internet.
Expand All @@ -116,8 +117,8 @@ async fn main(ctx: Context) -> Result<()> {
println!("{}", relay.remote_address());

// Start a worker, of type FileReception, at address "receiver".
node.start_worker("receiver", FileReception::default()).await?;
node.start_worker("receiver", FileReception::default())?;

// We won't call ctx.stop() here, this program will quit when the file will be entirely received
// We won't call ctx.shutdown_node() here, this program will quit when the file will be entirely received
Ok(())
}
4 changes: 2 additions & 2 deletions examples/rust/file_transfer/examples/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main(ctx: Context) -> Result<()> {
let opt = Sender::parse();

let node = node(ctx).await?;
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create an Identity to represent Sender.
let sender = node.create_identity().await?;
Expand Down Expand Up @@ -94,6 +94,6 @@ async fn main(ctx: Context) -> Result<()> {
}
}

// We won't call ctx.stop() here, this program will run until you stop it with Ctrl-C
// We won't call ctx.shutdown_node() here, this program will run until you stop it with Ctrl-C
Ok(())
}
2 changes: 1 addition & 1 deletion examples/rust/get_started/examples/01-node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Stop the node as soon as it starts.
node.stop().await
node.shutdown().await
}
4 changes: 2 additions & 2 deletions examples/rust/get_started/examples/02-worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Start a worker, of type Echoer, at address "echoer"
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Send a message to the worker at address "echoer".
node.send("echoer", "Hello Ockam!".to_string()).await?;
Expand All @@ -19,5 +19,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply.into_body()?); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
10 changes: 5 additions & 5 deletions examples/rust/get_started/examples/03-routing-many-hops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Start an Echoer worker at address "echoer"
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Start 3 hop workers at addresses "h1", "h2" and "h3".
node.start_worker("h1", Hop).await?;
node.start_worker("h2", Hop).await?;
node.start_worker("h3", Hop).await?;
node.start_worker("h1", Hop)?;
node.start_worker("h2", Hop)?;
node.start_worker("h3", Hop)?;

// Send a message to the echoer worker via the "h1", "h2", and "h3" workers
let r = route!["h1", "h2", "h3", "echoer"];
Expand All @@ -25,5 +25,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply.into_body()?); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
6 changes: 3 additions & 3 deletions examples/rust/get_started/examples/03-routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Start a worker, of type Echoer, at address "echoer"
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Start a worker, of type Hop, at address "h1"
node.start_worker("h1", Hop).await?;
node.start_worker("h1", Hop)?;

// Send a message to the worker at address "echoer",
// via the worker at address "h1"
Expand All @@ -23,5 +23,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply.into_body()?); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Initialize the TCP Transport.
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create a TCP connection to a different node.
let connection_to_responder = tcp.connect("localhost:4000", TcpConnectionOptions::new()).await?;
Expand All @@ -22,5 +22,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create an echoer worker
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Create a TCP listener and wait for incoming connections.
let listener = tcp.listen("127.0.0.1:4000", TcpListenerOptions::new()).await?;

// Allow access to the Echoer via TCP connections from the TCP listener
node.flow_controls().add_consumer("echoer", listener.flow_control_id());
node.flow_controls()
.add_consumer(&"echoer".into(), listener.flow_control_id());

// Don't call node.stop() here so this node runs forever.
// Don't call node.shutdown() here so this node runs forever.
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async fn main(ctx: Context) -> Result<()> {
let mut node = node(ctx).await?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create a TCP connection to the middle node.
let connection_to_middle_node = tcp.connect("localhost:3000", TcpConnectionOptions::new()).await?;
Expand All @@ -21,5 +21,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,21 @@ async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create a TCP connection to the responder node.
let connection_to_responder = tcp.connect("127.0.0.1:4000", TcpConnectionOptions::new()).await?;

// Create and start a Relay worker
node.start_worker("forward_to_responder", Relay::new(connection_to_responder))
.await?;
node.start_worker("forward_to_responder", Relay::new(connection_to_responder))?;

// Create a TCP listener and wait for incoming connections.
let listener = tcp.listen("127.0.0.1:3000", TcpListenerOptions::new()).await?;

// Allow access to the Relay via TCP connections from the TCP listener
node.flow_controls()
.add_consumer("forward_to_responder", listener.flow_control_id());
.add_consumer(&"forward_to_responder".into(), listener.flow_control_id());

// Don't call node.stop() here so this node runs forever.
// Don't call node.shutdown() here so this node runs forever.
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create an echoer worker
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Create a TCP listener and wait for incoming connections.
let listener = tcp.listen("127.0.0.1:4000", TcpListenerOptions::new()).await?;

// Allow access to the Echoer via TCP connections from the TCP listener
node.flow_controls().add_consumer("echoer", listener.flow_control_id());
node.flow_controls()
.add_consumer(&"echoer".into(), listener.flow_control_id());

// Don't call node.stop() here so this node runs forever.
// Don't call node.shutdown() here so this node runs forever.
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ async fn main(ctx: Context) -> Result<()> {
.await?;

// Create an echoer worker
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

node.flow_controls().add_consumer("echoer", bind.flow_control_id());
node.flow_controls()
.add_consumer(&"echoer".into(), bind.flow_control_id());

// Don't call node.stop() here so this node runs forever.
// Don't call node.shutdown() here so this node runs forever.
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply.into_body()?); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ async fn main(ctx: Context) -> Result<()> {
uds.listen("/tmp/ockam-example-echoer").await?;

// Create an echoer worker
node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

// Don't call node.stop() here so this node runs forever.
// Don't call node.shutdown() here so this node runs forever.
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn main(ctx: Context) -> Result<()> {
let alice = node.create_identity().await?;

// Create a TCP connection to the middle node.
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;
let connection_to_middle_node = tcp.connect("localhost:3000", TcpConnectionOptions::new()).await?;

// Connect to a secure channel listener and perform a handshake.
Expand All @@ -31,5 +31,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,20 @@ async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;

// Initialize the TCP Transport
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

// Create a TCP connection to Bob.
let connection_to_bob = tcp.connect("127.0.0.1:4000", TcpConnectionOptions::new()).await?;

// Start a Relay to forward messages to Bob using the TCP connection.
node.start_worker("forward_to_bob", Relay::new(route![connection_to_bob]))
.await?;
node.start_worker("forward_to_bob", Relay::new(route![connection_to_bob]))?;

// Create a TCP listener and wait for incoming connections.
let listener = tcp.listen("127.0.0.1:3000", TcpListenerOptions::new()).await?;

node.flow_controls()
.add_consumer("forward_to_bob", listener.flow_control_id());
.add_consumer(&"forward_to_bob".into(), listener.flow_control_id());

// Don't call node.stop() here so this node runs forever.
// Don't call node.shutdown() here so this node runs forever.
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ async fn main(ctx: Context) -> Result<()> {
let node = node(ctx).await?;

// Initialize the TCP Transport.
let tcp = node.create_tcp_transport().await?;
let tcp = node.create_tcp_transport()?;

node.start_worker("echoer", Echoer).await?;
node.start_worker("echoer", Echoer)?;

let bob = node.create_identity().await?;

Expand All @@ -23,18 +23,16 @@ async fn main(ctx: Context) -> Result<()> {

// Create a secure channel listener for Bob that will wait for requests to
// initiate an Authenticated Key Exchange.
let secure_channel_listener = node
.create_secure_channel_listener(
&bob,
"bob_listener",
SecureChannelListenerOptions::new().as_consumer(listener.flow_control_id()),
)
.await?;
let secure_channel_listener = node.create_secure_channel_listener(
&bob,
"bob_listener",
SecureChannelListenerOptions::new().as_consumer(listener.flow_control_id()),
)?;

// Allow access to the Echoer via Secure Channels
node.flow_controls()
.add_consumer("echoer", secure_channel_listener.flow_control_id());
.add_consumer(&"echoer".into(), secure_channel_listener.flow_control_id());

// Don't call node.stop() here so this node runs forever.
// Don't call node.shutdown() here so this node runs forever.
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,5 @@ async fn main(ctx: Context) -> Result<()> {
println!("App Received: {}", reply); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
node.stop().await
node.shutdown().await
}
Loading

0 comments on commit 80e114f

Please sign in to comment.