diff --git a/Cargo.lock b/Cargo.lock index b4a8d291..8104a9d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -815,9 +815,30 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" +[[package]] +name = "libmsg" +version = "0.1.6" +dependencies = [ + "bytes", + "criterion", + "futures", + "msg-socket", + "msg-transport", + "msg-wire", + "openssl", + "pprof", + "rand", + "thiserror 2.0.18", + "tikv-jemallocator", + "tokio", + "tokio-stream", + "tracing", + "tracing-subscriber", +] + [[package]] name = "linkem" -version = "0.1.5" +version = "0.2.0" dependencies = [ "futures", "msg-socket", @@ -917,30 +938,9 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "msg" -version = "0.1.5" -dependencies = [ - "bytes", - "criterion", - "futures", - "msg-socket", - "msg-transport", - "msg-wire", - "openssl", - "pprof", - "rand", - "thiserror 2.0.18", - "tikv-jemallocator", - "tokio", - "tokio-stream", - "tracing", - "tracing-subscriber", -] - [[package]] name = "msg-common" -version = "0.1.5" +version = "0.1.6" dependencies = [ "derive_more", "futures", @@ -951,7 +951,7 @@ dependencies = [ [[package]] name = "msg-socket" -version = "0.1.5" +version = "0.1.6" dependencies = [ "arc-swap", "bytes", @@ -974,7 +974,7 @@ dependencies = [ [[package]] name = "msg-transport" -version = "0.1.5" +version = "0.1.6" dependencies = [ "arc-swap", "async-trait", @@ -994,7 +994,7 @@ dependencies = [ [[package]] name = "msg-wire" -version = "0.1.5" +version = "0.1.6" dependencies = [ "bytes", "flate2", diff --git a/Cargo.toml b/Cargo.toml index e5df5350..152fce6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] members = [ - "msg", + "libmsg", "msg-socket", "msg-wire", "msg-transport", @@ -10,9 +10,9 @@ members = [ resolver = "2" [workspace.package] -version = "0.1.5" +version = "0.1.6" edition = "2024" -rust-version = "1.86" +rust-version = "1.89" license = "MIT" description = "A flexible and lightweight messaging library for distributed systems" authors = ["Chainbound Developers "] @@ -39,7 +39,6 @@ msg-wire = { path = "./msg-wire" } msg-socket = { path = "./msg-socket" } msg-transport = { path = "./msg-transport" } msg-common = { path = "./msg-common" } -# linkem = { path = "./linkem" } # async async-trait = "0.1" diff --git a/msg/Cargo.toml b/libmsg/Cargo.toml similarity index 98% rename from msg/Cargo.toml rename to libmsg/Cargo.toml index 547028fc..25cd22e4 100644 --- a/msg/Cargo.toml +++ b/libmsg/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "msg" +name = "libmsg" description.workspace = true version.workspace = true diff --git a/msg/benches/README.md b/libmsg/benches/README.md similarity index 100% rename from msg/benches/README.md rename to libmsg/benches/README.md diff --git a/msg/benches/pubsub.rs b/libmsg/benches/pubsub.rs similarity index 99% rename from msg/benches/pubsub.rs rename to libmsg/benches/pubsub.rs index 2fee6fca..3d2d55fa 100644 --- a/msg/benches/pubsub.rs +++ b/libmsg/benches/pubsub.rs @@ -4,7 +4,6 @@ use criterion::{ measurement::WallTime, }; use futures::StreamExt; -use msg::{Address, ipc::Ipc}; use pprof::criterion::{Output, PProfProfiler}; use rand::Rng; use std::{ @@ -13,6 +12,7 @@ use std::{ }; use tokio::runtime::Runtime; +use libmsg::{Address, ipc::Ipc}; use msg_socket::{PubOptions, PubSocket, SubOptions, SubSocket}; use msg_transport::{Transport, quic::Quic, tcp::Tcp}; diff --git a/msg/benches/reqrep.rs b/libmsg/benches/reqrep.rs similarity index 99% rename from msg/benches/reqrep.rs rename to libmsg/benches/reqrep.rs index e69f116a..93922484 100644 --- a/msg/benches/reqrep.rs +++ b/libmsg/benches/reqrep.rs @@ -8,15 +8,15 @@ use criterion::{ use futures::StreamExt; use pprof::criterion::Output; use rand::Rng; +use tokio::runtime::Runtime; -use msg::{ +use libmsg::{ Address, Profile, RepOptions, Transport, ipc::Ipc, tcp_tls::{TcpTls, config}, }; use msg_socket::{RepSocket, ReqOptions, ReqSocket}; use msg_transport::tcp::Tcp; -use tokio::runtime::Runtime; const N_REQS: usize = 10_000; const PAR_FACTOR: usize = 2048; diff --git a/msg/examples/durable.rs b/libmsg/examples/durable.rs similarity index 98% rename from msg/examples/durable.rs rename to libmsg/examples/durable.rs index 23b840e1..ddfdf659 100644 --- a/msg/examples/durable.rs +++ b/libmsg/examples/durable.rs @@ -10,7 +10,7 @@ use tokio::sync::oneshot; use tokio_stream::StreamExt; use tracing::{Instrument, error, info, info_span, instrument, warn}; -use msg::{RepSocket, ReqOptions, ReqSocket, hooks, tcp::Tcp}; +use libmsg::{RepSocket, ReqOptions, ReqSocket, hooks, tcp::Tcp}; #[instrument(name = "RepSocket")] async fn start_rep() { diff --git a/msg/examples/ipc.rs b/libmsg/examples/ipc.rs similarity index 96% rename from msg/examples/ipc.rs rename to libmsg/examples/ipc.rs index fc04e6d2..18f7b0fa 100644 --- a/msg/examples/ipc.rs +++ b/libmsg/examples/ipc.rs @@ -3,7 +3,7 @@ use std::env::temp_dir; use bytes::Bytes; use tokio_stream::StreamExt; -use msg::{RepSocket, ReqSocket, ipc::Ipc}; +use libmsg::{RepSocket, ReqSocket, ipc::Ipc}; #[tokio::main] async fn main() { diff --git a/msg/examples/pubsub.rs b/libmsg/examples/pubsub.rs similarity index 97% rename from msg/examples/pubsub.rs rename to libmsg/examples/pubsub.rs index a83050fb..4703c567 100644 --- a/msg/examples/pubsub.rs +++ b/libmsg/examples/pubsub.rs @@ -4,7 +4,7 @@ use std::time::Duration; use tokio::time::timeout; use tracing::{Instrument, info, info_span, warn}; -use msg::{PubOptions, PubSocket, SubOptions, SubSocket, tcp::Tcp}; +use libmsg::{PubOptions, PubSocket, SubOptions, SubSocket, tcp::Tcp}; #[tokio::main] async fn main() { diff --git a/msg/examples/pubsub_auth.rs b/libmsg/examples/pubsub_auth.rs similarity index 98% rename from msg/examples/pubsub_auth.rs rename to libmsg/examples/pubsub_auth.rs index 551e89c5..6f0dc37d 100644 --- a/msg/examples/pubsub_auth.rs +++ b/libmsg/examples/pubsub_auth.rs @@ -9,7 +9,7 @@ use tokio::time::timeout; use tokio_stream::StreamExt; use tracing::{Instrument, info, info_span, warn}; -use msg::{PubSocket, SubSocket, hooks, tcp::Tcp}; +use libmsg::{PubSocket, SubSocket, hooks, tcp::Tcp}; #[tokio::main] async fn main() { diff --git a/msg/examples/pubsub_compression.rs b/libmsg/examples/pubsub_compression.rs similarity index 97% rename from msg/examples/pubsub_compression.rs rename to libmsg/examples/pubsub_compression.rs index 98c26dc1..e688a296 100644 --- a/msg/examples/pubsub_compression.rs +++ b/libmsg/examples/pubsub_compression.rs @@ -4,7 +4,7 @@ use tokio::time::timeout; use tokio_stream::StreamExt; use tracing::{Instrument, info, info_span, warn}; -use msg::{PubSocket, SubSocket, compression::GzipCompressor, tcp::Tcp}; +use libmsg::{PubSocket, SubSocket, compression::GzipCompressor, tcp::Tcp}; #[tokio::main] async fn main() { diff --git a/msg/examples/quic_vs_tcp.rs b/libmsg/examples/quic_vs_tcp.rs similarity index 97% rename from msg/examples/quic_vs_tcp.rs rename to libmsg/examples/quic_vs_tcp.rs index 3b8e0a44..4feb3989 100644 --- a/msg/examples/quic_vs_tcp.rs +++ b/libmsg/examples/quic_vs_tcp.rs @@ -4,7 +4,7 @@ use msg_transport::{Transport, quic::Quic}; use std::time::{Duration, Instant}; use tracing::info; -use msg::{Address, PubOptions, PubSocket, SubOptions, SubSocket, tcp::Tcp}; +use libmsg::{Address, PubOptions, PubSocket, SubOptions, SubSocket, tcp::Tcp}; #[tokio::main] async fn main() { diff --git a/msg/examples/reqrep.rs b/libmsg/examples/reqrep.rs similarity index 95% rename from msg/examples/reqrep.rs rename to libmsg/examples/reqrep.rs index c73bf8f6..963cd185 100644 --- a/msg/examples/reqrep.rs +++ b/libmsg/examples/reqrep.rs @@ -1,7 +1,7 @@ use bytes::Bytes; use tokio_stream::StreamExt; -use msg::{RepSocket, ReqOptions, ReqSocket, tcp::Tcp}; +use libmsg::{RepSocket, ReqOptions, ReqSocket, tcp::Tcp}; #[tokio::main] async fn main() { diff --git a/msg/examples/reqrep_auth.rs b/libmsg/examples/reqrep_auth.rs similarity index 96% rename from msg/examples/reqrep_auth.rs rename to libmsg/examples/reqrep_auth.rs index a8f4fbad..6e3cdc53 100644 --- a/msg/examples/reqrep_auth.rs +++ b/libmsg/examples/reqrep_auth.rs @@ -6,7 +6,7 @@ use bytes::Bytes; use tokio_stream::StreamExt; -use msg::{RepSocket, ReqSocket, hooks, tcp::Tcp}; +use libmsg::{RepSocket, ReqSocket, hooks, tcp::Tcp}; #[tokio::main] async fn main() { diff --git a/msg/examples/reqrep_challenge.rs b/libmsg/examples/reqrep_challenge.rs similarity index 99% rename from msg/examples/reqrep_challenge.rs rename to libmsg/examples/reqrep_challenge.rs index db8203c2..b3683b80 100644 --- a/msg/examples/reqrep_challenge.rs +++ b/libmsg/examples/reqrep_challenge.rs @@ -9,7 +9,7 @@ use bytes::Bytes; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio_stream::StreamExt; -use msg::{ +use libmsg::{ RepSocket, ReqSocket, hooks::{ConnectionHook, Error, HookResult}, tcp::Tcp, diff --git a/msg/examples/reqrep_compression.rs b/libmsg/examples/reqrep_compression.rs similarity index 97% rename from msg/examples/reqrep_compression.rs rename to libmsg/examples/reqrep_compression.rs index f77b9cd4..8b3cb40a 100644 --- a/msg/examples/reqrep_compression.rs +++ b/libmsg/examples/reqrep_compression.rs @@ -3,7 +3,7 @@ use msg_socket::{RepOptions, ReqOptions}; use msg_wire::compression::GzipCompressor; use tokio_stream::StreamExt; -use msg::{RepSocket, ReqSocket, tcp::Tcp}; +use libmsg::{RepSocket, ReqSocket, tcp::Tcp}; #[tokio::main] async fn main() { diff --git a/msg/examples/reqrep_mtls.rs b/libmsg/examples/reqrep_mtls.rs similarity index 99% rename from msg/examples/reqrep_mtls.rs rename to libmsg/examples/reqrep_mtls.rs index 4a96d1ad..7763d954 100644 --- a/msg/examples/reqrep_mtls.rs +++ b/libmsg/examples/reqrep_mtls.rs @@ -2,7 +2,7 @@ //! mutual TLS (mTLS). use bytes::Bytes; -use msg::{ +use libmsg::{ RepSocket, ReqSocket, tcp_tls::{self, TcpTls}, }; diff --git a/msg/src/lib.rs b/libmsg/src/lib.rs similarity index 100% rename from msg/src/lib.rs rename to libmsg/src/lib.rs diff --git a/linkem/Cargo.toml b/linkem/Cargo.toml index d7154312..1cffca8d 100644 --- a/linkem/Cargo.toml +++ b/linkem/Cargo.toml @@ -1,10 +1,10 @@ [package] name = "linkem" -version.workspace = true +version = "0.2.0" +description = "A realistic network emulation library" edition.workspace = true rust-version.workspace = true license.workspace = true -description.workspace = true authors.workspace = true homepage.workspace = true repository.workspace = true diff --git a/msg-socket/src/connection/backoff.rs b/msg-socket/src/connection/backoff.rs index fbca13e3..1aa05410 100644 --- a/msg-socket/src/connection/backoff.rs +++ b/msg-socket/src/connection/backoff.rs @@ -68,10 +68,10 @@ impl Stream for ExponentialBackoff { this.retry_count += 1; // Close the stream - if let Some(max_retries) = this.max_retries { - if this.retry_count >= max_retries { - return Poll::Ready(None); - } + if let Some(max_retries) = this.max_retries && + this.retry_count >= max_retries + { + return Poll::Ready(None); } this.reset_timeout(); diff --git a/msg-socket/src/pub/driver.rs b/msg-socket/src/pub/driver.rs index 931ba85f..6d4a9cac 100644 --- a/msg-socket/src/pub/driver.rs +++ b/msg-socket/src/pub/driver.rs @@ -113,11 +113,11 @@ where // Finally, poll the transport for new incoming connection futures and push them to the // incoming connection tasks. if let Poll::Ready(accept) = Pin::new(&mut this.transport).poll_accept(cx) { - if let Some(max) = this.options.max_clients { - if this.state.stats.specific.active_clients() >= max { - warn!("Max connections reached ({}), rejecting incoming connection", max); - continue; - } + if let Some(max) = this.options.max_clients && + this.state.stats.specific.active_clients() >= max + { + warn!("Max connections reached ({}), rejecting incoming connection", max); + continue; } // Increment the active clients counter. If the hook fails, diff --git a/msg-socket/src/pub/session.rs b/msg-socket/src/pub/session.rs index d3711a6d..64d230aa 100644 --- a/msg-socket/src/pub/session.rs +++ b/msg-socket/src/pub/session.rs @@ -175,15 +175,15 @@ impl Future for SubscriberSession { } } - if let Some(ref mut linger_timer) = this.linger_timer { - if !this.conn.write_buffer().is_empty() && linger_timer.poll_tick(cx).is_ready() { - if let Poll::Ready(Err(e)) = this.conn.poll_flush_unpin(cx) { - tracing::error!(err = ?e, "Failed to flush connection"); - let _ = this.conn.poll_close_unpin(cx); - // End this stream as we can't send any more messages - return Poll::Ready(()); - } - } + if let Some(ref mut linger_timer) = this.linger_timer && + !this.conn.write_buffer().is_empty() && + linger_timer.poll_tick(cx).is_ready() && + let Poll::Ready(Err(e)) = this.conn.poll_flush_unpin(cx) + { + tracing::error!(err = ?e, "Failed to flush connection"); + let _ = this.conn.poll_close_unpin(cx); + // End this stream as we can't send any more messages + return Poll::Ready(()); } // Handle incoming messages from the socket diff --git a/msg-socket/src/pub/socket.rs b/msg-socket/src/pub/socket.rs index ca7331f7..5982943e 100644 --- a/msg-socket/src/pub/socket.rs +++ b/msg-socket/src/pub/socket.rs @@ -170,11 +170,11 @@ where // Compression is only done if the message is larger than the // configured minimum payload size. let len_before = msg.payload().len(); - if len_before > self.options.min_compress_size { - if let Some(ref compressor) = self.compressor { - msg.compress(compressor.as_ref())?; - trace!("Compressed message from {} to {} bytes", len_before, msg.payload().len()); - } + if len_before > self.options.min_compress_size && + let Some(ref compressor) = self.compressor + { + msg.compress(compressor.as_ref())?; + trace!("Compressed message from {} to {} bytes", len_before, msg.payload().len()); } // Broadcast the message directly to all active sessions. diff --git a/msg-socket/src/pub/trie.rs b/msg-socket/src/pub/trie.rs index ae9c9bec..3c6fd570 100644 --- a/msg-socket/src/pub/trie.rs +++ b/msg-socket/src/pub/trie.rs @@ -63,11 +63,11 @@ impl PrefixTrie { } let token = tokens.remove(0); - if let Entry::Occupied(mut entry) = current.children.entry(token.to_string()) { - if Self::inner_remove(entry.get_mut(), tokens) { - entry.remove_entry(); - return current.children.is_empty() && !current.topic_end; - } + if let Entry::Occupied(mut entry) = current.children.entry(token.to_string()) && + Self::inner_remove(entry.get_mut(), tokens) + { + entry.remove_entry(); + return current.children.is_empty() && !current.topic_end; } false diff --git a/msg-socket/src/rep/driver.rs b/msg-socket/src/rep/driver.rs index ca73d53b..66ece2de 100644 --- a/msg-socket/src/rep/driver.rs +++ b/msg-socket/src/rep/driver.rs @@ -344,11 +344,11 @@ impl PeerState { debug!(has_pending = ?pending_msg.is_some(), write_buffer_size = ?buffer_size, "found data to send"); - if let Some(msg) = pending_msg { - if let Err(e) = self.conn.start_send_unpin(msg.inner) { - error!(?e, "failed to send final message to socket, closing"); - return Poll::Ready(()); - } + if let Some(msg) = pending_msg && + let Err(e) = self.conn.start_send_unpin(msg.inner) + { + error!(?e, "failed to send final message to socket, closing"); + return Poll::Ready(()); } if let Err(e) = ready!(self.conn.poll_flush_unpin(cx)) { @@ -399,51 +399,50 @@ impl Stream for PeerState } } - if let Some(ref mut linger_timer) = this.linger_timer { - if !this.conn.write_buffer().is_empty() && linger_timer.poll_tick(cx).is_ready() { - if let Poll::Ready(Err(e)) = this.conn.poll_flush_unpin(cx) { - error!(err = ?e, peer = ?this.addr, "failed to flush connection, closing..."); - return Poll::Ready(None); - } - } + if let Some(ref mut linger_timer) = this.linger_timer && + !this.conn.write_buffer().is_empty() && + linger_timer.poll_tick(cx).is_ready() && + let Poll::Ready(Err(e)) = this.conn.poll_flush_unpin(cx) + { + error!(err = ?e, peer = ?this.addr, "failed to flush connection, closing..."); + return Poll::Ready(None); } // Check for completed requests, and set pending_egress (only if empty). - if this.pending_egress.is_none() { - if let Poll::Ready(Some(result)) = this.pending_requests.poll_next_unpin(cx).enter() - { - match result.inner { - Err(_) => tracing::error!("response channel closed unexpectedly"), - Ok(Response { msg_id, mut response }) => { - let mut compression_type = 0; - let len_before = response.len(); - if let Some(ref compressor) = this.compressor { - match compressor.compress(&response) { - Ok(compressed) => { - response = compressed; - compression_type = compressor.compression_type() as u8; - } - Err(e) => { - error!(?e, "failed to compress message"); - continue; - } + if this.pending_egress.is_none() && + let Poll::Ready(Some(result)) = this.pending_requests.poll_next_unpin(cx).enter() + { + match result.inner { + Err(_) => tracing::error!("response channel closed unexpectedly"), + Ok(Response { msg_id, mut response }) => { + let mut compression_type = 0; + let len_before = response.len(); + if let Some(ref compressor) = this.compressor { + match compressor.compress(&response) { + Ok(compressed) => { + response = compressed; + compression_type = compressor.compression_type() as u8; + } + Err(e) => { + error!(?e, "failed to compress message"); + continue; } - - debug!( - msg_id, - len_before, - len_after = response.len(), - "compressed message" - ) } - debug!(msg_id, "received response to send"); + debug!( + msg_id, + len_before, + len_after = response.len(), + "compressed message" + ) + } - let msg = reqrep::Message::new(msg_id, compression_type, response); - this.pending_egress = Some(msg.with_span(result.span)); + debug!(msg_id, "received response to send"); - continue; - } + let msg = reqrep::Message::new(msg_id, compression_type, response); + this.pending_egress = Some(msg.with_span(result.span)); + + continue; } } } diff --git a/msg-socket/src/req/conn_manager.rs b/msg-socket/src/req/conn_manager.rs index 6edeb994..73ad6b1c 100644 --- a/msg-socket/src/req/conn_manager.rs +++ b/msg-socket/src/req/conn_manager.rs @@ -152,24 +152,24 @@ where ) -> Poll>> { loop { // Poll the active connection task, if any - if let Some(ref mut conn_task) = self.conn_task { - if let Poll::Ready(result) = conn_task.poll_unpin(cx).enter() { - // As soon as the connection task finishes, set it to `None`. - // - If it was successful, set the connection to active - // - If it failed, it will be re-tried until the backoff limit is reached. - self.conn_task = None; - - match result.inner { - Ok(io) => { - tracing::info!("connected"); - - let metered = MeteredIo::new(io, self.transport_stats.clone()); - let framed = Framed::new(metered, reqrep::Codec::new()); - self.conn_ctl = ConnectionState::Active { channel: framed }; - } - Err(e) => { - tracing::error!(?e, "failed to connect"); - } + if let Some(ref mut conn_task) = self.conn_task && + let Poll::Ready(result) = conn_task.poll_unpin(cx).enter() + { + // As soon as the connection task finishes, set it to `None`. + // - If it was successful, set the connection to active + // - If it failed, it will be re-tried until the backoff limit is reached. + self.conn_task = None; + + match result.inner { + Ok(io) => { + tracing::info!("connected"); + + let metered = MeteredIo::new(io, self.transport_stats.clone()); + let framed = Framed::new(metered, reqrep::Codec::new()); + self.conn_ctl = ConnectionState::Active { channel: framed }; + } + Err(e) => { + tracing::error!(?e, "failed to connect"); } } } diff --git a/msg-socket/src/req/driver.rs b/msg-socket/src/req/driver.rs index 259fffc1..e76973c1 100644 --- a/msg-socket/src/req/driver.rs +++ b/msg-socket/src/req/driver.rs @@ -112,20 +112,20 @@ where // Compress the message if it's larger than the minimum size let size_before = message.payload().len(); - if size_before > self.options.min_compress_size { - if let Some(ref compressor) = self.compressor { - let start = Instant::now(); - if let Err(e) = message.compress(compressor.as_ref()) { - tracing::error!(?e, "failed to compress message"); - } - - tracing::debug!( - size_before, - size_after = message.payload().len(), - elapsed = ?start.elapsed(), - "compressed message", - ); + if size_before > self.options.min_compress_size && + let Some(ref compressor) = self.compressor + { + let start = Instant::now(); + if let Err(e) = message.compress(compressor.as_ref()) { + tracing::error!(?e, "failed to compress message"); } + + tracing::debug!( + size_before, + size_after = message.payload().len(), + elapsed = ?start.elapsed(), + "compressed message", + ); } let msg = message.inner.into_wire(self.id_counter); @@ -248,13 +248,13 @@ where } // Flush if we have some data and `linger_timer` is ready - if let Some(ref mut linger_timer) = this.linger_timer { - if !channel.write_buffer().is_empty() && linger_timer.poll_tick(cx).is_ready() { - if let Poll::Ready(Err(e)) = channel.poll_flush_unpin(cx) { - tracing::error!(err = ?e, "Failed to flush connection"); - this.conn_manager.reset_connection(); - } - } + if let Some(ref mut linger_timer) = this.linger_timer && + !channel.write_buffer().is_empty() && + linger_timer.poll_tick(cx).is_ready() && + let Poll::Ready(Err(e)) = channel.poll_flush_unpin(cx) + { + tracing::error!(err = ?e, "Failed to flush connection"); + this.conn_manager.reset_connection(); } // Check for request timeouts