Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[workspace]
members = [
"msg",
"libmsg",
"msg-socket",
"msg-wire",
"msg-transport",
Expand All @@ -10,9 +10,9 @@ members = [
resolver = "2"

[workspace.package]
version = "0.1.5"
version = "0.1.6"
edition = "2024"
rust-version = "1.86"
rust-version = "1.89"
license = "MIT"
description = "A flexible and lightweight messaging library for distributed systems"
authors = ["Chainbound Developers <dev@chainbound.io>"]
Expand All @@ -39,7 +39,6 @@ msg-wire = { path = "./msg-wire" }
msg-socket = { path = "./msg-socket" }
msg-transport = { path = "./msg-transport" }
msg-common = { path = "./msg-common" }
# linkem = { path = "./linkem" }

# async
async-trait = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion msg/Cargo.toml → libmsg/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "msg"
name = "libmsg"

description.workspace = true
version.workspace = true
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion msg/benches/pubsub.rs → libmsg/benches/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use criterion::{
measurement::WallTime,
};
use futures::StreamExt;
use msg::{Address, ipc::Ipc};
use pprof::criterion::{Output, PProfProfiler};
use rand::Rng;
use std::{
Expand All @@ -13,6 +12,7 @@ use std::{
};
use tokio::runtime::Runtime;

use libmsg::{Address, ipc::Ipc};
use msg_socket::{PubOptions, PubSocket, SubOptions, SubSocket};
use msg_transport::{Transport, quic::Quic, tcp::Tcp};

Expand Down
4 changes: 2 additions & 2 deletions msg/benches/reqrep.rs → libmsg/benches/reqrep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ use criterion::{
use futures::StreamExt;
use pprof::criterion::Output;
use rand::Rng;
use tokio::runtime::Runtime;

use msg::{
use libmsg::{
Address, Profile, RepOptions, Transport,
ipc::Ipc,
tcp_tls::{TcpTls, config},
};
use msg_socket::{RepSocket, ReqOptions, ReqSocket};
use msg_transport::tcp::Tcp;
use tokio::runtime::Runtime;

const N_REQS: usize = 10_000;
const PAR_FACTOR: usize = 2048;
Expand Down
2 changes: 1 addition & 1 deletion msg/examples/durable.rs → libmsg/examples/durable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::sync::oneshot;
use tokio_stream::StreamExt;
use tracing::{Instrument, error, info, info_span, instrument, warn};

use msg::{RepSocket, ReqOptions, ReqSocket, hooks, tcp::Tcp};
use libmsg::{RepSocket, ReqOptions, ReqSocket, hooks, tcp::Tcp};

#[instrument(name = "RepSocket")]
async fn start_rep() {
Expand Down
2 changes: 1 addition & 1 deletion msg/examples/ipc.rs → libmsg/examples/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::env::temp_dir;
use bytes::Bytes;
use tokio_stream::StreamExt;

use msg::{RepSocket, ReqSocket, ipc::Ipc};
use libmsg::{RepSocket, ReqSocket, ipc::Ipc};

#[tokio::main]
async fn main() {
Expand Down
2 changes: 1 addition & 1 deletion msg/examples/pubsub.rs → libmsg/examples/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Duration;
use tokio::time::timeout;
use tracing::{Instrument, info, info_span, warn};

use msg::{PubOptions, PubSocket, SubOptions, SubSocket, tcp::Tcp};
use libmsg::{PubOptions, PubSocket, SubOptions, SubSocket, tcp::Tcp};

#[tokio::main]
async fn main() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use tokio::time::timeout;
use tokio_stream::StreamExt;
use tracing::{Instrument, info, info_span, warn};

use msg::{PubSocket, SubSocket, hooks, tcp::Tcp};
use libmsg::{PubSocket, SubSocket, hooks, tcp::Tcp};

#[tokio::main]
async fn main() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tokio::time::timeout;
use tokio_stream::StreamExt;
use tracing::{Instrument, info, info_span, warn};

use msg::{PubSocket, SubSocket, compression::GzipCompressor, tcp::Tcp};
use libmsg::{PubSocket, SubSocket, compression::GzipCompressor, tcp::Tcp};

#[tokio::main]
async fn main() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use msg_transport::{Transport, quic::Quic};
use std::time::{Duration, Instant};
use tracing::info;

use msg::{Address, PubOptions, PubSocket, SubOptions, SubSocket, tcp::Tcp};
use libmsg::{Address, PubOptions, PubSocket, SubOptions, SubSocket, tcp::Tcp};

#[tokio::main]
async fn main() {
Expand Down
2 changes: 1 addition & 1 deletion msg/examples/reqrep.rs → libmsg/examples/reqrep.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use bytes::Bytes;
use tokio_stream::StreamExt;

use msg::{RepSocket, ReqOptions, ReqSocket, tcp::Tcp};
use libmsg::{RepSocket, ReqOptions, ReqSocket, tcp::Tcp};

#[tokio::main]
async fn main() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use bytes::Bytes;
use tokio_stream::StreamExt;

use msg::{RepSocket, ReqSocket, hooks, tcp::Tcp};
use libmsg::{RepSocket, ReqSocket, hooks, tcp::Tcp};

#[tokio::main]
async fn main() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use bytes::Bytes;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio_stream::StreamExt;

use msg::{
use libmsg::{
RepSocket, ReqSocket,
hooks::{ConnectionHook, Error, HookResult},
tcp::Tcp,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use msg_socket::{RepOptions, ReqOptions};
use msg_wire::compression::GzipCompressor;
use tokio_stream::StreamExt;

use msg::{RepSocket, ReqSocket, tcp::Tcp};
use libmsg::{RepSocket, ReqSocket, tcp::Tcp};

#[tokio::main]
async fn main() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! mutual TLS (mTLS).

use bytes::Bytes;
use msg::{
use libmsg::{
RepSocket, ReqSocket,
tcp_tls::{self, TcpTls},
};
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions linkem/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[package]
name = "linkem"
version.workspace = true
version = "0.2.0"
description = "A realistic network emulation library"
edition.workspace = true
rust-version.workspace = true
license.workspace = true
description.workspace = true
authors.workspace = true
homepage.workspace = true
repository.workspace = true
Expand Down
8 changes: 4 additions & 4 deletions msg-socket/src/connection/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ impl Stream for ExponentialBackoff {
this.retry_count += 1;

// Close the stream
if let Some(max_retries) = this.max_retries {
if this.retry_count >= max_retries {
return Poll::Ready(None);
}
if let Some(max_retries) = this.max_retries &&
this.retry_count >= max_retries
{
return Poll::Ready(None);
}

this.reset_timeout();
Expand Down
10 changes: 5 additions & 5 deletions msg-socket/src/pub/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ where
// Finally, poll the transport for new incoming connection futures and push them to the
// incoming connection tasks.
if let Poll::Ready(accept) = Pin::new(&mut this.transport).poll_accept(cx) {
if let Some(max) = this.options.max_clients {
if this.state.stats.specific.active_clients() >= max {
warn!("Max connections reached ({}), rejecting incoming connection", max);
continue;
}
if let Some(max) = this.options.max_clients &&
this.state.stats.specific.active_clients() >= max
{
warn!("Max connections reached ({}), rejecting incoming connection", max);
continue;
}

// Increment the active clients counter. If the hook fails,
Expand Down
18 changes: 9 additions & 9 deletions msg-socket/src/pub/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,15 @@ impl<Io: AsyncRead + AsyncWrite + Unpin> Future for SubscriberSession<Io> {
}
}

if let Some(ref mut linger_timer) = this.linger_timer {
if !this.conn.write_buffer().is_empty() && linger_timer.poll_tick(cx).is_ready() {
if let Poll::Ready(Err(e)) = this.conn.poll_flush_unpin(cx) {
tracing::error!(err = ?e, "Failed to flush connection");
let _ = this.conn.poll_close_unpin(cx);
// End this stream as we can't send any more messages
return Poll::Ready(());
}
}
if let Some(ref mut linger_timer) = this.linger_timer &&
!this.conn.write_buffer().is_empty() &&
linger_timer.poll_tick(cx).is_ready() &&
let Poll::Ready(Err(e)) = this.conn.poll_flush_unpin(cx)
{
tracing::error!(err = ?e, "Failed to flush connection");
let _ = this.conn.poll_close_unpin(cx);
// End this stream as we can't send any more messages
return Poll::Ready(());
}

// Handle incoming messages from the socket
Expand Down
10 changes: 5 additions & 5 deletions msg-socket/src/pub/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,11 @@ where
// Compression is only done if the message is larger than the
// configured minimum payload size.
let len_before = msg.payload().len();
if len_before > self.options.min_compress_size {
if let Some(ref compressor) = self.compressor {
msg.compress(compressor.as_ref())?;
trace!("Compressed message from {} to {} bytes", len_before, msg.payload().len());
}
if len_before > self.options.min_compress_size &&
let Some(ref compressor) = self.compressor
{
msg.compress(compressor.as_ref())?;
trace!("Compressed message from {} to {} bytes", len_before, msg.payload().len());
}

// Broadcast the message directly to all active sessions.
Expand Down
10 changes: 5 additions & 5 deletions msg-socket/src/pub/trie.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ impl PrefixTrie {
}

let token = tokens.remove(0);
if let Entry::Occupied(mut entry) = current.children.entry(token.to_string()) {
if Self::inner_remove(entry.get_mut(), tokens) {
entry.remove_entry();
return current.children.is_empty() && !current.topic_end;
}
if let Entry::Occupied(mut entry) = current.children.entry(token.to_string()) &&
Self::inner_remove(entry.get_mut(), tokens)
{
entry.remove_entry();
return current.children.is_empty() && !current.topic_end;
}

false
Expand Down
Loading
Loading