From 0f2dd88921601627bfdddd102c3a4e29ddde3688 Mon Sep 17 00:00:00 2001 From: Jonas Bostoen Date: Tue, 17 Feb 2026 08:38:28 +0100 Subject: [PATCH 1/6] refactor: bump v, libmsg, linkem separate versioning --- Cargo.lock | 42 +++++++++---------- Cargo.toml | 7 ++-- {msg => libmsg}/Cargo.toml | 2 +- {msg => libmsg}/benches/README.md | 0 {msg => libmsg}/benches/pubsub.rs | 0 {msg => libmsg}/benches/reqrep.rs | 0 {msg => libmsg}/examples/durable.rs | 0 {msg => libmsg}/examples/ipc.rs | 0 {msg => libmsg}/examples/pubsub.rs | 0 {msg => libmsg}/examples/pubsub_auth.rs | 0 .../examples/pubsub_compression.rs | 0 {msg => libmsg}/examples/quic_vs_tcp.rs | 0 {msg => libmsg}/examples/reqrep.rs | 0 {msg => libmsg}/examples/reqrep_auth.rs | 0 {msg => libmsg}/examples/reqrep_challenge.rs | 0 .../examples/reqrep_compression.rs | 0 {msg => libmsg}/examples/reqrep_mtls.rs | 0 {msg => libmsg}/src/lib.rs | 0 linkem/Cargo.toml | 4 +- 19 files changed, 27 insertions(+), 28 deletions(-) rename {msg => libmsg}/Cargo.toml (98%) rename {msg => libmsg}/benches/README.md (100%) rename {msg => libmsg}/benches/pubsub.rs (100%) rename {msg => libmsg}/benches/reqrep.rs (100%) rename {msg => libmsg}/examples/durable.rs (100%) rename {msg => libmsg}/examples/ipc.rs (100%) rename {msg => libmsg}/examples/pubsub.rs (100%) rename {msg => libmsg}/examples/pubsub_auth.rs (100%) rename {msg => libmsg}/examples/pubsub_compression.rs (100%) rename {msg => libmsg}/examples/quic_vs_tcp.rs (100%) rename {msg => libmsg}/examples/reqrep.rs (100%) rename {msg => libmsg}/examples/reqrep_auth.rs (100%) rename {msg => libmsg}/examples/reqrep_challenge.rs (100%) rename {msg => libmsg}/examples/reqrep_compression.rs (100%) rename {msg => libmsg}/examples/reqrep_mtls.rs (100%) rename {msg => libmsg}/src/lib.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index b4a8d291..6f33e48a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -815,6 +815,27 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" +[[package]] +name = "libmsg" +version = "0.1.5" +dependencies = [ + "bytes", + "criterion", + "futures", + "msg-socket", + "msg-transport", + "msg-wire", + "openssl", + "pprof", + "rand", + "thiserror 2.0.18", + "tikv-jemallocator", + "tokio", + "tokio-stream", + "tracing", + "tracing-subscriber", +] + [[package]] name = "linkem" version = "0.1.5" @@ -917,27 +938,6 @@ dependencies = [ "windows-sys 0.61.2", ] -[[package]] -name = "msg" -version = "0.1.5" -dependencies = [ - "bytes", - "criterion", - "futures", - "msg-socket", - "msg-transport", - "msg-wire", - "openssl", - "pprof", - "rand", - "thiserror 2.0.18", - "tikv-jemallocator", - "tokio", - "tokio-stream", - "tracing", - "tracing-subscriber", -] - [[package]] name = "msg-common" version = "0.1.5" diff --git a/Cargo.toml b/Cargo.toml index e5df5350..152fce6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] members = [ - "msg", + "libmsg", "msg-socket", "msg-wire", "msg-transport", @@ -10,9 +10,9 @@ members = [ resolver = "2" [workspace.package] -version = "0.1.5" +version = "0.1.6" edition = "2024" -rust-version = "1.86" +rust-version = "1.89" license = "MIT" description = "A flexible and lightweight messaging library for distributed systems" authors = ["Chainbound Developers "] @@ -39,7 +39,6 @@ msg-wire = { path = "./msg-wire" } msg-socket = { path = "./msg-socket" } msg-transport = { path = "./msg-transport" } msg-common = { path = "./msg-common" } -# linkem = { path = "./linkem" } # async async-trait = "0.1" diff --git a/msg/Cargo.toml b/libmsg/Cargo.toml similarity index 98% rename from msg/Cargo.toml rename to libmsg/Cargo.toml index 547028fc..25cd22e4 100644 --- a/msg/Cargo.toml +++ b/libmsg/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "msg" +name = "libmsg" description.workspace = true version.workspace = true diff --git a/msg/benches/README.md b/libmsg/benches/README.md similarity index 100% rename from msg/benches/README.md rename to libmsg/benches/README.md diff --git a/msg/benches/pubsub.rs b/libmsg/benches/pubsub.rs similarity index 100% rename from msg/benches/pubsub.rs rename to libmsg/benches/pubsub.rs diff --git a/msg/benches/reqrep.rs b/libmsg/benches/reqrep.rs similarity index 100% rename from msg/benches/reqrep.rs rename to libmsg/benches/reqrep.rs diff --git a/msg/examples/durable.rs b/libmsg/examples/durable.rs similarity index 100% rename from msg/examples/durable.rs rename to libmsg/examples/durable.rs diff --git a/msg/examples/ipc.rs b/libmsg/examples/ipc.rs similarity index 100% rename from msg/examples/ipc.rs rename to libmsg/examples/ipc.rs diff --git a/msg/examples/pubsub.rs b/libmsg/examples/pubsub.rs similarity index 100% rename from msg/examples/pubsub.rs rename to libmsg/examples/pubsub.rs diff --git a/msg/examples/pubsub_auth.rs b/libmsg/examples/pubsub_auth.rs similarity index 100% rename from msg/examples/pubsub_auth.rs rename to libmsg/examples/pubsub_auth.rs diff --git a/msg/examples/pubsub_compression.rs b/libmsg/examples/pubsub_compression.rs similarity index 100% rename from msg/examples/pubsub_compression.rs rename to libmsg/examples/pubsub_compression.rs diff --git a/msg/examples/quic_vs_tcp.rs b/libmsg/examples/quic_vs_tcp.rs similarity index 100% rename from msg/examples/quic_vs_tcp.rs rename to libmsg/examples/quic_vs_tcp.rs diff --git a/msg/examples/reqrep.rs b/libmsg/examples/reqrep.rs similarity index 100% rename from msg/examples/reqrep.rs rename to libmsg/examples/reqrep.rs diff --git a/msg/examples/reqrep_auth.rs b/libmsg/examples/reqrep_auth.rs similarity index 100% rename from msg/examples/reqrep_auth.rs rename to libmsg/examples/reqrep_auth.rs diff --git a/msg/examples/reqrep_challenge.rs b/libmsg/examples/reqrep_challenge.rs similarity index 100% rename from msg/examples/reqrep_challenge.rs rename to libmsg/examples/reqrep_challenge.rs diff --git a/msg/examples/reqrep_compression.rs b/libmsg/examples/reqrep_compression.rs similarity index 100% rename from msg/examples/reqrep_compression.rs rename to libmsg/examples/reqrep_compression.rs diff --git a/msg/examples/reqrep_mtls.rs b/libmsg/examples/reqrep_mtls.rs similarity index 100% rename from msg/examples/reqrep_mtls.rs rename to libmsg/examples/reqrep_mtls.rs diff --git a/msg/src/lib.rs b/libmsg/src/lib.rs similarity index 100% rename from msg/src/lib.rs rename to libmsg/src/lib.rs diff --git a/linkem/Cargo.toml b/linkem/Cargo.toml index d7154312..1cffca8d 100644 --- a/linkem/Cargo.toml +++ b/linkem/Cargo.toml @@ -1,10 +1,10 @@ [package] name = "linkem" -version.workspace = true +version = "0.2.0" +description = "A realistic network emulation library" edition.workspace = true rust-version.workspace = true license.workspace = true -description.workspace = true authors.workspace = true homepage.workspace = true repository.workspace = true From 4c41f3e5dd602d2340f46ef9b712b364d5404021 Mon Sep 17 00:00:00 2001 From: Jonas Bostoen Date: Tue, 17 Feb 2026 08:40:10 +0100 Subject: [PATCH 2/6] fix(libmsg): examples --- Cargo.lock | 12 ++++++------ libmsg/examples/durable.rs | 2 +- libmsg/examples/ipc.rs | 2 +- libmsg/examples/pubsub.rs | 2 +- libmsg/examples/pubsub_auth.rs | 2 +- libmsg/examples/pubsub_compression.rs | 2 +- libmsg/examples/reqrep.rs | 2 +- libmsg/examples/reqrep_auth.rs | 2 +- libmsg/examples/reqrep_challenge.rs | 2 +- libmsg/examples/reqrep_compression.rs | 2 +- 10 files changed, 15 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6f33e48a..8104a9d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -817,7 +817,7 @@ checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" [[package]] name = "libmsg" -version = "0.1.5" +version = "0.1.6" dependencies = [ "bytes", "criterion", @@ -838,7 +838,7 @@ dependencies = [ [[package]] name = "linkem" -version = "0.1.5" +version = "0.2.0" dependencies = [ "futures", "msg-socket", @@ -940,7 +940,7 @@ dependencies = [ [[package]] name = "msg-common" -version = "0.1.5" +version = "0.1.6" dependencies = [ "derive_more", "futures", @@ -951,7 +951,7 @@ dependencies = [ [[package]] name = "msg-socket" -version = "0.1.5" +version = "0.1.6" dependencies = [ "arc-swap", "bytes", @@ -974,7 +974,7 @@ dependencies = [ [[package]] name = "msg-transport" -version = "0.1.5" +version = "0.1.6" dependencies = [ "arc-swap", "async-trait", @@ -994,7 +994,7 @@ dependencies = [ [[package]] name = "msg-wire" -version = "0.1.5" +version = "0.1.6" dependencies = [ "bytes", "flate2", diff --git a/libmsg/examples/durable.rs b/libmsg/examples/durable.rs index 23b840e1..ddfdf659 100644 --- a/libmsg/examples/durable.rs +++ b/libmsg/examples/durable.rs @@ -10,7 +10,7 @@ use tokio::sync::oneshot; use tokio_stream::StreamExt; use tracing::{Instrument, error, info, info_span, instrument, warn}; -use msg::{RepSocket, ReqOptions, ReqSocket, hooks, tcp::Tcp}; +use libmsg::{RepSocket, ReqOptions, ReqSocket, hooks, tcp::Tcp}; #[instrument(name = "RepSocket")] async fn start_rep() { diff --git a/libmsg/examples/ipc.rs b/libmsg/examples/ipc.rs index fc04e6d2..18f7b0fa 100644 --- a/libmsg/examples/ipc.rs +++ b/libmsg/examples/ipc.rs @@ -3,7 +3,7 @@ use std::env::temp_dir; use bytes::Bytes; use tokio_stream::StreamExt; -use msg::{RepSocket, ReqSocket, ipc::Ipc}; +use libmsg::{RepSocket, ReqSocket, ipc::Ipc}; #[tokio::main] async fn main() { diff --git a/libmsg/examples/pubsub.rs b/libmsg/examples/pubsub.rs index a83050fb..4703c567 100644 --- a/libmsg/examples/pubsub.rs +++ b/libmsg/examples/pubsub.rs @@ -4,7 +4,7 @@ use std::time::Duration; use tokio::time::timeout; use tracing::{Instrument, info, info_span, warn}; -use msg::{PubOptions, PubSocket, SubOptions, SubSocket, tcp::Tcp}; +use libmsg::{PubOptions, PubSocket, SubOptions, SubSocket, tcp::Tcp}; #[tokio::main] async fn main() { diff --git a/libmsg/examples/pubsub_auth.rs b/libmsg/examples/pubsub_auth.rs index 551e89c5..6f0dc37d 100644 --- a/libmsg/examples/pubsub_auth.rs +++ b/libmsg/examples/pubsub_auth.rs @@ -9,7 +9,7 @@ use tokio::time::timeout; use tokio_stream::StreamExt; use tracing::{Instrument, info, info_span, warn}; -use msg::{PubSocket, SubSocket, hooks, tcp::Tcp}; +use libmsg::{PubSocket, SubSocket, hooks, tcp::Tcp}; #[tokio::main] async fn main() { diff --git a/libmsg/examples/pubsub_compression.rs b/libmsg/examples/pubsub_compression.rs index 98c26dc1..e688a296 100644 --- a/libmsg/examples/pubsub_compression.rs +++ b/libmsg/examples/pubsub_compression.rs @@ -4,7 +4,7 @@ use tokio::time::timeout; use tokio_stream::StreamExt; use tracing::{Instrument, info, info_span, warn}; -use msg::{PubSocket, SubSocket, compression::GzipCompressor, tcp::Tcp}; +use libmsg::{PubSocket, SubSocket, compression::GzipCompressor, tcp::Tcp}; #[tokio::main] async fn main() { diff --git a/libmsg/examples/reqrep.rs b/libmsg/examples/reqrep.rs index c73bf8f6..963cd185 100644 --- a/libmsg/examples/reqrep.rs +++ b/libmsg/examples/reqrep.rs @@ -1,7 +1,7 @@ use bytes::Bytes; use tokio_stream::StreamExt; -use msg::{RepSocket, ReqOptions, ReqSocket, tcp::Tcp}; +use libmsg::{RepSocket, ReqOptions, ReqSocket, tcp::Tcp}; #[tokio::main] async fn main() { diff --git a/libmsg/examples/reqrep_auth.rs b/libmsg/examples/reqrep_auth.rs index a8f4fbad..6e3cdc53 100644 --- a/libmsg/examples/reqrep_auth.rs +++ b/libmsg/examples/reqrep_auth.rs @@ -6,7 +6,7 @@ use bytes::Bytes; use tokio_stream::StreamExt; -use msg::{RepSocket, ReqSocket, hooks, tcp::Tcp}; +use libmsg::{RepSocket, ReqSocket, hooks, tcp::Tcp}; #[tokio::main] async fn main() { diff --git a/libmsg/examples/reqrep_challenge.rs b/libmsg/examples/reqrep_challenge.rs index db8203c2..b3683b80 100644 --- a/libmsg/examples/reqrep_challenge.rs +++ b/libmsg/examples/reqrep_challenge.rs @@ -9,7 +9,7 @@ use bytes::Bytes; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio_stream::StreamExt; -use msg::{ +use libmsg::{ RepSocket, ReqSocket, hooks::{ConnectionHook, Error, HookResult}, tcp::Tcp, diff --git a/libmsg/examples/reqrep_compression.rs b/libmsg/examples/reqrep_compression.rs index f77b9cd4..8b3cb40a 100644 --- a/libmsg/examples/reqrep_compression.rs +++ b/libmsg/examples/reqrep_compression.rs @@ -3,7 +3,7 @@ use msg_socket::{RepOptions, ReqOptions}; use msg_wire::compression::GzipCompressor; use tokio_stream::StreamExt; -use msg::{RepSocket, ReqSocket, tcp::Tcp}; +use libmsg::{RepSocket, ReqSocket, tcp::Tcp}; #[tokio::main] async fn main() { From 6bd5793ffc6dadf133d76bc53c25b59c0b4e5bae Mon Sep 17 00:00:00 2001 From: Jonas Bostoen Date: Tue, 17 Feb 2026 08:43:25 +0100 Subject: [PATCH 3/6] fix(libmsg): examples --- libmsg/examples/quic_vs_tcp.rs | 2 +- libmsg/examples/reqrep_mtls.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libmsg/examples/quic_vs_tcp.rs b/libmsg/examples/quic_vs_tcp.rs index 3b8e0a44..4feb3989 100644 --- a/libmsg/examples/quic_vs_tcp.rs +++ b/libmsg/examples/quic_vs_tcp.rs @@ -4,7 +4,7 @@ use msg_transport::{Transport, quic::Quic}; use std::time::{Duration, Instant}; use tracing::info; -use msg::{Address, PubOptions, PubSocket, SubOptions, SubSocket, tcp::Tcp}; +use libmsg::{Address, PubOptions, PubSocket, SubOptions, SubSocket, tcp::Tcp}; #[tokio::main] async fn main() { diff --git a/libmsg/examples/reqrep_mtls.rs b/libmsg/examples/reqrep_mtls.rs index 4a96d1ad..7763d954 100644 --- a/libmsg/examples/reqrep_mtls.rs +++ b/libmsg/examples/reqrep_mtls.rs @@ -2,7 +2,7 @@ //! mutual TLS (mTLS). use bytes::Bytes; -use msg::{ +use libmsg::{ RepSocket, ReqSocket, tcp_tls::{self, TcpTls}, }; From d007057ed63a925888df93bc8b458aa8b4c86331 Mon Sep 17 00:00:00 2001 From: Jonas Bostoen Date: Tue, 17 Feb 2026 08:44:15 +0100 Subject: [PATCH 4/6] chore: clippy --- msg-socket/src/connection/backoff.rs | 5 ++--- msg-socket/src/pub/driver.rs | 5 ++--- msg-socket/src/pub/session.rs | 8 +++----- msg-socket/src/pub/socket.rs | 5 ++--- msg-socket/src/pub/trie.rs | 5 ++--- msg-socket/src/rep/driver.rs | 18 +++++++----------- msg-socket/src/req/conn_manager.rs | 5 ++--- msg-socket/src/req/driver.rs | 13 +++++-------- 8 files changed, 25 insertions(+), 39 deletions(-) diff --git a/msg-socket/src/connection/backoff.rs b/msg-socket/src/connection/backoff.rs index fbca13e3..9d6e0e9d 100644 --- a/msg-socket/src/connection/backoff.rs +++ b/msg-socket/src/connection/backoff.rs @@ -68,11 +68,10 @@ impl Stream for ExponentialBackoff { this.retry_count += 1; // Close the stream - if let Some(max_retries) = this.max_retries { - if this.retry_count >= max_retries { + if let Some(max_retries) = this.max_retries + && this.retry_count >= max_retries { return Poll::Ready(None); } - } this.reset_timeout(); diff --git a/msg-socket/src/pub/driver.rs b/msg-socket/src/pub/driver.rs index 931ba85f..752b8fdf 100644 --- a/msg-socket/src/pub/driver.rs +++ b/msg-socket/src/pub/driver.rs @@ -113,12 +113,11 @@ where // Finally, poll the transport for new incoming connection futures and push them to the // incoming connection tasks. if let Poll::Ready(accept) = Pin::new(&mut this.transport).poll_accept(cx) { - if let Some(max) = this.options.max_clients { - if this.state.stats.specific.active_clients() >= max { + if let Some(max) = this.options.max_clients + && this.state.stats.specific.active_clients() >= max { warn!("Max connections reached ({}), rejecting incoming connection", max); continue; } - } // Increment the active clients counter. If the hook fails, // this counter will be decremented. diff --git a/msg-socket/src/pub/session.rs b/msg-socket/src/pub/session.rs index d3711a6d..7244567b 100644 --- a/msg-socket/src/pub/session.rs +++ b/msg-socket/src/pub/session.rs @@ -175,16 +175,14 @@ impl Future for SubscriberSession { } } - if let Some(ref mut linger_timer) = this.linger_timer { - if !this.conn.write_buffer().is_empty() && linger_timer.poll_tick(cx).is_ready() { - if let Poll::Ready(Err(e)) = this.conn.poll_flush_unpin(cx) { + if let Some(ref mut linger_timer) = this.linger_timer + && !this.conn.write_buffer().is_empty() && linger_timer.poll_tick(cx).is_ready() + && let Poll::Ready(Err(e)) = this.conn.poll_flush_unpin(cx) { tracing::error!(err = ?e, "Failed to flush connection"); let _ = this.conn.poll_close_unpin(cx); // End this stream as we can't send any more messages return Poll::Ready(()); } - } - } // Handle incoming messages from the socket if let Poll::Ready(item) = this.conn.poll_next_unpin(cx) { diff --git a/msg-socket/src/pub/socket.rs b/msg-socket/src/pub/socket.rs index ca7331f7..f7fc7fc2 100644 --- a/msg-socket/src/pub/socket.rs +++ b/msg-socket/src/pub/socket.rs @@ -170,12 +170,11 @@ where // Compression is only done if the message is larger than the // configured minimum payload size. let len_before = msg.payload().len(); - if len_before > self.options.min_compress_size { - if let Some(ref compressor) = self.compressor { + if len_before > self.options.min_compress_size + && let Some(ref compressor) = self.compressor { msg.compress(compressor.as_ref())?; trace!("Compressed message from {} to {} bytes", len_before, msg.payload().len()); } - } // Broadcast the message directly to all active sessions. if self.to_sessions_bcast.as_ref().ok_or(PubError::SocketClosed)?.send(msg).is_err() { diff --git a/msg-socket/src/pub/trie.rs b/msg-socket/src/pub/trie.rs index ae9c9bec..7c05cd9e 100644 --- a/msg-socket/src/pub/trie.rs +++ b/msg-socket/src/pub/trie.rs @@ -63,12 +63,11 @@ impl PrefixTrie { } let token = tokens.remove(0); - if let Entry::Occupied(mut entry) = current.children.entry(token.to_string()) { - if Self::inner_remove(entry.get_mut(), tokens) { + if let Entry::Occupied(mut entry) = current.children.entry(token.to_string()) + && Self::inner_remove(entry.get_mut(), tokens) { entry.remove_entry(); return current.children.is_empty() && !current.topic_end; } - } false } diff --git a/msg-socket/src/rep/driver.rs b/msg-socket/src/rep/driver.rs index ca73d53b..4d3b1104 100644 --- a/msg-socket/src/rep/driver.rs +++ b/msg-socket/src/rep/driver.rs @@ -344,12 +344,11 @@ impl PeerState { debug!(has_pending = ?pending_msg.is_some(), write_buffer_size = ?buffer_size, "found data to send"); - if let Some(msg) = pending_msg { - if let Err(e) = self.conn.start_send_unpin(msg.inner) { + if let Some(msg) = pending_msg + && let Err(e) = self.conn.start_send_unpin(msg.inner) { error!(?e, "failed to send final message to socket, closing"); return Poll::Ready(()); } - } if let Err(e) = ready!(self.conn.poll_flush_unpin(cx)) { error!(?e, "failed to flush on shutdown, giving up"); @@ -399,18 +398,16 @@ impl Stream for PeerState } } - if let Some(ref mut linger_timer) = this.linger_timer { - if !this.conn.write_buffer().is_empty() && linger_timer.poll_tick(cx).is_ready() { - if let Poll::Ready(Err(e)) = this.conn.poll_flush_unpin(cx) { + if let Some(ref mut linger_timer) = this.linger_timer + && !this.conn.write_buffer().is_empty() && linger_timer.poll_tick(cx).is_ready() + && let Poll::Ready(Err(e)) = this.conn.poll_flush_unpin(cx) { error!(err = ?e, peer = ?this.addr, "failed to flush connection, closing..."); return Poll::Ready(None); } - } - } // Check for completed requests, and set pending_egress (only if empty). - if this.pending_egress.is_none() { - if let Poll::Ready(Some(result)) = this.pending_requests.poll_next_unpin(cx).enter() + if this.pending_egress.is_none() + && let Poll::Ready(Some(result)) = this.pending_requests.poll_next_unpin(cx).enter() { match result.inner { Err(_) => tracing::error!("response channel closed unexpectedly"), @@ -446,7 +443,6 @@ impl Stream for PeerState } } } - } // Accept incoming requests from the peer. // Only accept new requests if we're under the HWM for pending responses. diff --git a/msg-socket/src/req/conn_manager.rs b/msg-socket/src/req/conn_manager.rs index 6edeb994..58d7b1b3 100644 --- a/msg-socket/src/req/conn_manager.rs +++ b/msg-socket/src/req/conn_manager.rs @@ -152,8 +152,8 @@ where ) -> Poll>> { loop { // Poll the active connection task, if any - if let Some(ref mut conn_task) = self.conn_task { - if let Poll::Ready(result) = conn_task.poll_unpin(cx).enter() { + if let Some(ref mut conn_task) = self.conn_task + && let Poll::Ready(result) = conn_task.poll_unpin(cx).enter() { // As soon as the connection task finishes, set it to `None`. // - If it was successful, set the connection to active // - If it failed, it will be re-tried until the backoff limit is reached. @@ -172,7 +172,6 @@ where } } } - } // If the connection is inactive, try to connect to the server or poll the backoff // timer if we're already trying to connect. diff --git a/msg-socket/src/req/driver.rs b/msg-socket/src/req/driver.rs index 259fffc1..e4cef14f 100644 --- a/msg-socket/src/req/driver.rs +++ b/msg-socket/src/req/driver.rs @@ -112,8 +112,8 @@ where // Compress the message if it's larger than the minimum size let size_before = message.payload().len(); - if size_before > self.options.min_compress_size { - if let Some(ref compressor) = self.compressor { + if size_before > self.options.min_compress_size + && let Some(ref compressor) = self.compressor { let start = Instant::now(); if let Err(e) = message.compress(compressor.as_ref()) { tracing::error!(?e, "failed to compress message"); @@ -126,7 +126,6 @@ where "compressed message", ); } - } let msg = message.inner.into_wire(self.id_counter); let msg_id = msg.id(); @@ -248,14 +247,12 @@ where } // Flush if we have some data and `linger_timer` is ready - if let Some(ref mut linger_timer) = this.linger_timer { - if !channel.write_buffer().is_empty() && linger_timer.poll_tick(cx).is_ready() { - if let Poll::Ready(Err(e)) = channel.poll_flush_unpin(cx) { + if let Some(ref mut linger_timer) = this.linger_timer + && !channel.write_buffer().is_empty() && linger_timer.poll_tick(cx).is_ready() + && let Poll::Ready(Err(e)) = channel.poll_flush_unpin(cx) { tracing::error!(err = ?e, "Failed to flush connection"); this.conn_manager.reset_connection(); } - } - } // Check for request timeouts while this.timeout_check_interval.poll_tick(cx).is_ready() { From f55215c6fdc05946d93e9281de9fbfea9a62a483 Mon Sep 17 00:00:00 2001 From: Jonas Bostoen Date: Tue, 17 Feb 2026 08:46:18 +0100 Subject: [PATCH 5/6] chore: fmt --- msg-socket/src/connection/backoff.rs | 9 +-- msg-socket/src/pub/driver.rs | 11 ++-- msg-socket/src/pub/session.rs | 18 +++--- msg-socket/src/pub/socket.rs | 11 ++-- msg-socket/src/pub/trie.rs | 11 ++-- msg-socket/src/rep/driver.rs | 85 ++++++++++++++-------------- msg-socket/src/req/conn_manager.rs | 37 ++++++------ msg-socket/src/req/driver.rs | 41 +++++++------- 8 files changed, 118 insertions(+), 105 deletions(-) diff --git a/msg-socket/src/connection/backoff.rs b/msg-socket/src/connection/backoff.rs index 9d6e0e9d..1aa05410 100644 --- a/msg-socket/src/connection/backoff.rs +++ b/msg-socket/src/connection/backoff.rs @@ -68,10 +68,11 @@ impl Stream for ExponentialBackoff { this.retry_count += 1; // Close the stream - if let Some(max_retries) = this.max_retries - && this.retry_count >= max_retries { - return Poll::Ready(None); - } + if let Some(max_retries) = this.max_retries && + this.retry_count >= max_retries + { + return Poll::Ready(None); + } this.reset_timeout(); diff --git a/msg-socket/src/pub/driver.rs b/msg-socket/src/pub/driver.rs index 752b8fdf..6d4a9cac 100644 --- a/msg-socket/src/pub/driver.rs +++ b/msg-socket/src/pub/driver.rs @@ -113,11 +113,12 @@ where // Finally, poll the transport for new incoming connection futures and push them to the // incoming connection tasks. if let Poll::Ready(accept) = Pin::new(&mut this.transport).poll_accept(cx) { - if let Some(max) = this.options.max_clients - && this.state.stats.specific.active_clients() >= max { - warn!("Max connections reached ({}), rejecting incoming connection", max); - continue; - } + if let Some(max) = this.options.max_clients && + this.state.stats.specific.active_clients() >= max + { + warn!("Max connections reached ({}), rejecting incoming connection", max); + continue; + } // Increment the active clients counter. If the hook fails, // this counter will be decremented. diff --git a/msg-socket/src/pub/session.rs b/msg-socket/src/pub/session.rs index 7244567b..64d230aa 100644 --- a/msg-socket/src/pub/session.rs +++ b/msg-socket/src/pub/session.rs @@ -175,14 +175,16 @@ impl Future for SubscriberSession { } } - if let Some(ref mut linger_timer) = this.linger_timer - && !this.conn.write_buffer().is_empty() && linger_timer.poll_tick(cx).is_ready() - && let Poll::Ready(Err(e)) = this.conn.poll_flush_unpin(cx) { - tracing::error!(err = ?e, "Failed to flush connection"); - let _ = this.conn.poll_close_unpin(cx); - // End this stream as we can't send any more messages - return Poll::Ready(()); - } + if let Some(ref mut linger_timer) = this.linger_timer && + !this.conn.write_buffer().is_empty() && + linger_timer.poll_tick(cx).is_ready() && + let Poll::Ready(Err(e)) = this.conn.poll_flush_unpin(cx) + { + tracing::error!(err = ?e, "Failed to flush connection"); + let _ = this.conn.poll_close_unpin(cx); + // End this stream as we can't send any more messages + return Poll::Ready(()); + } // Handle incoming messages from the socket if let Poll::Ready(item) = this.conn.poll_next_unpin(cx) { diff --git a/msg-socket/src/pub/socket.rs b/msg-socket/src/pub/socket.rs index f7fc7fc2..5982943e 100644 --- a/msg-socket/src/pub/socket.rs +++ b/msg-socket/src/pub/socket.rs @@ -170,11 +170,12 @@ where // Compression is only done if the message is larger than the // configured minimum payload size. let len_before = msg.payload().len(); - if len_before > self.options.min_compress_size - && let Some(ref compressor) = self.compressor { - msg.compress(compressor.as_ref())?; - trace!("Compressed message from {} to {} bytes", len_before, msg.payload().len()); - } + if len_before > self.options.min_compress_size && + let Some(ref compressor) = self.compressor + { + msg.compress(compressor.as_ref())?; + trace!("Compressed message from {} to {} bytes", len_before, msg.payload().len()); + } // Broadcast the message directly to all active sessions. if self.to_sessions_bcast.as_ref().ok_or(PubError::SocketClosed)?.send(msg).is_err() { diff --git a/msg-socket/src/pub/trie.rs b/msg-socket/src/pub/trie.rs index 7c05cd9e..3c6fd570 100644 --- a/msg-socket/src/pub/trie.rs +++ b/msg-socket/src/pub/trie.rs @@ -63,11 +63,12 @@ impl PrefixTrie { } let token = tokens.remove(0); - if let Entry::Occupied(mut entry) = current.children.entry(token.to_string()) - && Self::inner_remove(entry.get_mut(), tokens) { - entry.remove_entry(); - return current.children.is_empty() && !current.topic_end; - } + if let Entry::Occupied(mut entry) = current.children.entry(token.to_string()) && + Self::inner_remove(entry.get_mut(), tokens) + { + entry.remove_entry(); + return current.children.is_empty() && !current.topic_end; + } false } diff --git a/msg-socket/src/rep/driver.rs b/msg-socket/src/rep/driver.rs index 4d3b1104..66ece2de 100644 --- a/msg-socket/src/rep/driver.rs +++ b/msg-socket/src/rep/driver.rs @@ -344,11 +344,12 @@ impl PeerState { debug!(has_pending = ?pending_msg.is_some(), write_buffer_size = ?buffer_size, "found data to send"); - if let Some(msg) = pending_msg - && let Err(e) = self.conn.start_send_unpin(msg.inner) { - error!(?e, "failed to send final message to socket, closing"); - return Poll::Ready(()); - } + if let Some(msg) = pending_msg && + let Err(e) = self.conn.start_send_unpin(msg.inner) + { + error!(?e, "failed to send final message to socket, closing"); + return Poll::Ready(()); + } if let Err(e) = ready!(self.conn.poll_flush_unpin(cx)) { error!(?e, "failed to flush on shutdown, giving up"); @@ -398,51 +399,53 @@ impl Stream for PeerState } } - if let Some(ref mut linger_timer) = this.linger_timer - && !this.conn.write_buffer().is_empty() && linger_timer.poll_tick(cx).is_ready() - && let Poll::Ready(Err(e)) = this.conn.poll_flush_unpin(cx) { - error!(err = ?e, peer = ?this.addr, "failed to flush connection, closing..."); - return Poll::Ready(None); - } + if let Some(ref mut linger_timer) = this.linger_timer && + !this.conn.write_buffer().is_empty() && + linger_timer.poll_tick(cx).is_ready() && + let Poll::Ready(Err(e)) = this.conn.poll_flush_unpin(cx) + { + error!(err = ?e, peer = ?this.addr, "failed to flush connection, closing..."); + return Poll::Ready(None); + } // Check for completed requests, and set pending_egress (only if empty). - if this.pending_egress.is_none() - && let Poll::Ready(Some(result)) = this.pending_requests.poll_next_unpin(cx).enter() - { - match result.inner { - Err(_) => tracing::error!("response channel closed unexpectedly"), - Ok(Response { msg_id, mut response }) => { - let mut compression_type = 0; - let len_before = response.len(); - if let Some(ref compressor) = this.compressor { - match compressor.compress(&response) { - Ok(compressed) => { - response = compressed; - compression_type = compressor.compression_type() as u8; - } - Err(e) => { - error!(?e, "failed to compress message"); - continue; - } + if this.pending_egress.is_none() && + let Poll::Ready(Some(result)) = this.pending_requests.poll_next_unpin(cx).enter() + { + match result.inner { + Err(_) => tracing::error!("response channel closed unexpectedly"), + Ok(Response { msg_id, mut response }) => { + let mut compression_type = 0; + let len_before = response.len(); + if let Some(ref compressor) = this.compressor { + match compressor.compress(&response) { + Ok(compressed) => { + response = compressed; + compression_type = compressor.compression_type() as u8; + } + Err(e) => { + error!(?e, "failed to compress message"); + continue; } - - debug!( - msg_id, - len_before, - len_after = response.len(), - "compressed message" - ) } - debug!(msg_id, "received response to send"); + debug!( + msg_id, + len_before, + len_after = response.len(), + "compressed message" + ) + } - let msg = reqrep::Message::new(msg_id, compression_type, response); - this.pending_egress = Some(msg.with_span(result.span)); + debug!(msg_id, "received response to send"); - continue; - } + let msg = reqrep::Message::new(msg_id, compression_type, response); + this.pending_egress = Some(msg.with_span(result.span)); + + continue; } } + } // Accept incoming requests from the peer. // Only accept new requests if we're under the HWM for pending responses. diff --git a/msg-socket/src/req/conn_manager.rs b/msg-socket/src/req/conn_manager.rs index 58d7b1b3..73ad6b1c 100644 --- a/msg-socket/src/req/conn_manager.rs +++ b/msg-socket/src/req/conn_manager.rs @@ -152,26 +152,27 @@ where ) -> Poll>> { loop { // Poll the active connection task, if any - if let Some(ref mut conn_task) = self.conn_task - && let Poll::Ready(result) = conn_task.poll_unpin(cx).enter() { - // As soon as the connection task finishes, set it to `None`. - // - If it was successful, set the connection to active - // - If it failed, it will be re-tried until the backoff limit is reached. - self.conn_task = None; - - match result.inner { - Ok(io) => { - tracing::info!("connected"); - - let metered = MeteredIo::new(io, self.transport_stats.clone()); - let framed = Framed::new(metered, reqrep::Codec::new()); - self.conn_ctl = ConnectionState::Active { channel: framed }; - } - Err(e) => { - tracing::error!(?e, "failed to connect"); - } + if let Some(ref mut conn_task) = self.conn_task && + let Poll::Ready(result) = conn_task.poll_unpin(cx).enter() + { + // As soon as the connection task finishes, set it to `None`. + // - If it was successful, set the connection to active + // - If it failed, it will be re-tried until the backoff limit is reached. + self.conn_task = None; + + match result.inner { + Ok(io) => { + tracing::info!("connected"); + + let metered = MeteredIo::new(io, self.transport_stats.clone()); + let framed = Framed::new(metered, reqrep::Codec::new()); + self.conn_ctl = ConnectionState::Active { channel: framed }; + } + Err(e) => { + tracing::error!(?e, "failed to connect"); } } + } // If the connection is inactive, try to connect to the server or poll the backoff // timer if we're already trying to connect. diff --git a/msg-socket/src/req/driver.rs b/msg-socket/src/req/driver.rs index e4cef14f..e76973c1 100644 --- a/msg-socket/src/req/driver.rs +++ b/msg-socket/src/req/driver.rs @@ -112,21 +112,22 @@ where // Compress the message if it's larger than the minimum size let size_before = message.payload().len(); - if size_before > self.options.min_compress_size - && let Some(ref compressor) = self.compressor { - let start = Instant::now(); - if let Err(e) = message.compress(compressor.as_ref()) { - tracing::error!(?e, "failed to compress message"); - } - - tracing::debug!( - size_before, - size_after = message.payload().len(), - elapsed = ?start.elapsed(), - "compressed message", - ); + if size_before > self.options.min_compress_size && + let Some(ref compressor) = self.compressor + { + let start = Instant::now(); + if let Err(e) = message.compress(compressor.as_ref()) { + tracing::error!(?e, "failed to compress message"); } + tracing::debug!( + size_before, + size_after = message.payload().len(), + elapsed = ?start.elapsed(), + "compressed message", + ); + } + let msg = message.inner.into_wire(self.id_counter); let msg_id = msg.id(); self.id_counter = self.id_counter.wrapping_add(1); @@ -247,12 +248,14 @@ where } // Flush if we have some data and `linger_timer` is ready - if let Some(ref mut linger_timer) = this.linger_timer - && !channel.write_buffer().is_empty() && linger_timer.poll_tick(cx).is_ready() - && let Poll::Ready(Err(e)) = channel.poll_flush_unpin(cx) { - tracing::error!(err = ?e, "Failed to flush connection"); - this.conn_manager.reset_connection(); - } + if let Some(ref mut linger_timer) = this.linger_timer && + !channel.write_buffer().is_empty() && + linger_timer.poll_tick(cx).is_ready() && + let Poll::Ready(Err(e)) = channel.poll_flush_unpin(cx) + { + tracing::error!(err = ?e, "Failed to flush connection"); + this.conn_manager.reset_connection(); + } // Check for request timeouts while this.timeout_check_interval.poll_tick(cx).is_ready() { From 41081d838f1c71bbe7a03ea524974a65b64e7fe6 Mon Sep 17 00:00:00 2001 From: Jonas Bostoen Date: Tue, 17 Feb 2026 08:48:28 +0100 Subject: [PATCH 6/6] fix(libmsg): benchers --- libmsg/benches/pubsub.rs | 2 +- libmsg/benches/reqrep.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/libmsg/benches/pubsub.rs b/libmsg/benches/pubsub.rs index 2fee6fca..3d2d55fa 100644 --- a/libmsg/benches/pubsub.rs +++ b/libmsg/benches/pubsub.rs @@ -4,7 +4,6 @@ use criterion::{ measurement::WallTime, }; use futures::StreamExt; -use msg::{Address, ipc::Ipc}; use pprof::criterion::{Output, PProfProfiler}; use rand::Rng; use std::{ @@ -13,6 +12,7 @@ use std::{ }; use tokio::runtime::Runtime; +use libmsg::{Address, ipc::Ipc}; use msg_socket::{PubOptions, PubSocket, SubOptions, SubSocket}; use msg_transport::{Transport, quic::Quic, tcp::Tcp}; diff --git a/libmsg/benches/reqrep.rs b/libmsg/benches/reqrep.rs index e69f116a..93922484 100644 --- a/libmsg/benches/reqrep.rs +++ b/libmsg/benches/reqrep.rs @@ -8,15 +8,15 @@ use criterion::{ use futures::StreamExt; use pprof::criterion::Output; use rand::Rng; +use tokio::runtime::Runtime; -use msg::{ +use libmsg::{ Address, Profile, RepOptions, Transport, ipc::Ipc, tcp_tls::{TcpTls, config}, }; use msg_socket::{RepSocket, ReqOptions, ReqSocket}; use msg_transport::tcp::Tcp; -use tokio::runtime::Runtime; const N_REQS: usize = 10_000; const PAR_FACTOR: usize = 2048;