An ultra-low latency, disk-backed persistent message queue written in native Rust, inspired by Chronicle Queue.
khata-rs is designed for latency-sensitive applications that need durable message storage with read/write latency in the tens of nanoseconds. It uses memory-mapped I/O and lock-free data structures to keep latency low while persisting every message to disk, and it follows the Single Producer Multiple Consumer (SPMC) model.
- Ultra-low latency - 3-50 nanoseconds per operation using memory-mapped I/O
- Lock-free writes - No locks on the hot path for writers
- Zero-copy reads - Readers get direct references to memory-mapped data
- Time-based rolling - Automatic file rotation (daily, hourly, or custom intervals)
- Data integrity - Optional SIMD-accelerated CRC-16 checksums
Add to your Cargo.toml:
```toml
[dependencies]
khata-rs = "0.1"
```

```rust
use khata_rs::Queue;

fn main() -> khata_rs::Result<()> {
    // Create or open a queue
    let queue = Queue::new("/tmp/my-queue").build()?;

    // Write messages
    {
        let mut writer = queue.writer()?;
        let index = writer.write(b"Hello, World!")?;
        println!("Written at index: {index:#x}");
    }

    // Read messages
    {
        let mut reader = queue.reader()?;
        reader.rewind()?;
        while let Some(data) = reader.read()? {
            println!("Read: {}", String::from_utf8_lossy(data));
        }
    }

    Ok(())
}
```

Use the builder pattern to customize queue behavior:
```rust
use khata_rs::{Queue, RollCycle::FastHourly};

let queue = Queue::new("/tmp/queue")
    .roll_cycle(FastHourly)          // New file every hour
    .block_size(256 * 1024 * 1024)   // 256 MB memory blocks
    .checksums(true)                 // Enable CRC-16 verification
    .index_spacing(256)              // Index every 256th message
    .index_count(4096)               // Index array size
    .read_only(false)                // Read-write mode
    .build()?;
```

| Cycle | Duration | Max Messages | Use Case |
|---|---|---|---|
| `FastDaily` | 24 hours | ~4.3 billion | General purpose (default) |
| `FastHourly` | 1 hour | ~4.3 billion | Higher granularity |
| `HalfHourly` | 30 min | ~4.3 billion | Moderate frequency |
| `TenMinutely` | 10 min | ~4.3 billion | High frequency |
| `FiveMinutely` | 5 min | ~1 billion | Very high frequency |
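Time-based rolling amounts to mapping a timestamp onto a cycle number. The sketch below shows one plausible way to do that, assuming cycles are counted from the Unix epoch; the `Cycle` enum and `cycle_number` function are hypothetical names for illustration and are not part of the khata-rs API.

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Hypothetical stand-in for the roll cycles in the table above.
#[derive(Clone, Copy, Debug)]
enum Cycle {
    Daily,
    Hourly,
    HalfHourly,
    TenMinutely,
    FiveMinutely,
}

impl Cycle {
    /// Length of one cycle, taken from the "Duration" column.
    fn length(self) -> Duration {
        let secs = match self {
            Cycle::Daily => 24 * 60 * 60,
            Cycle::Hourly => 60 * 60,
            Cycle::HalfHourly => 30 * 60,
            Cycle::TenMinutely => 10 * 60,
            Cycle::FiveMinutely => 5 * 60,
        };
        Duration::from_secs(secs)
    }

    /// Number of whole cycles elapsed since the epoch (an assumed convention).
    /// A writer would roll to a new file whenever this value changes.
    fn cycle_number(self, since_epoch: Duration) -> u64 {
        since_epoch.as_secs() / self.length().as_secs()
    }
}

fn main() {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before 1970");
    for cycle in [
        Cycle::Daily,
        Cycle::Hourly,
        Cycle::HalfHourly,
        Cycle::TenMinutely,
        Cycle::FiveMinutely,
    ] {
        println!("{cycle:?}: cycle #{}", cycle.cycle_number(now));
    }
}
```

Shorter cycles produce more, smaller files, which is the trade-off the "Use Case" column hints at.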
`Queue` is the main entry point. It is thread-safe (`Send + Sync`).

```rust
let queue = Queue::new(path).build()?;
let writer = queue.writer()?;  // One per thread
let reader = queue.reader()?;  // Multiple allowed
```

`Writer` appends messages with automatic cycle rolling.
```rust
let mut writer = queue.writer()?;

// Write and get index
let index = writer.write(b"message")?;

// Explicit flush (also happens on drop)
writer.flush()?;
```

`Reader` consumes messages with flexible positioning.
```rust
let mut reader = queue.reader()?;

// Position at start
reader.rewind()?;

// Sequential read
while let Some(data) = reader.read()? {
    // Process data
}

// Random access (`index` is a value returned earlier by `writer.write`)
reader.seek(index)?;

// Zero-copy read
reader.read_with(|data| {
    // Process without copying
})?;

// Bidirectional
reader.set_direction(Direction::Backward);
```

khata-rs is designed for scenarios requiring both persistence and low latency:
| Operation | Latency |
|---|---|
| Append | ~3-50 ns |
| Sequential read | Sub-microsecond |
| Random access | O(1) for indexed messages |
Run benchmarks:
```bash
cargo bench
# or
cargo run --release --bin benchmark
```

```text
Queue
├── Writer (single-producer per thread)
│   └── Store (memory-mapped cycle file)
│       └── Two-level lock-free index
└── Reader (multiple consumers)
    └── Position cache for O(1) sequential reads
```
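To make the two-level index more concrete, here is a minimal sketch of the position arithmetic such an index typically needs. It assumes that only every `index_spacing`-th message is indexed and that both levels are arrays of `index_count` entries; the `IndexGeometry` and `Slot` types are hypothetical, and the actual khata-rs layout may differ. The lock-free publication of entries is not shown.

```rust
/// Hypothetical parameters mirroring `index_spacing` and `index_count`.
struct IndexGeometry {
    spacing: u64,
    count: u64,
}

/// Where a lookup lands: a slot in the first-level array, a slot in the
/// second-level array it points to, and how many messages to scan forward.
#[derive(Debug, PartialEq)]
struct Slot {
    first: u64,
    second: u64,
    scan: u64,
}

impl IndexGeometry {
    /// Highest number of messages addressable under these assumptions.
    fn capacity(&self) -> u64 {
        self.count * self.count * self.spacing
    }

    /// Locate the indexed entry at or before `sequence`.
    fn locate(&self, sequence: u64) -> Option<Slot> {
        let entry = sequence / self.spacing; // indexed entry preceding the message
        let first = entry / self.count;
        if first >= self.count {
            return None; // beyond what two levels can address
        }
        Some(Slot {
            first,
            second: entry % self.count,
            scan: sequence % self.spacing,
        })
    }
}

fn main() {
    // Values taken from the configuration example.
    let geometry = IndexGeometry { spacing: 256, count: 4096 };
    println!("capacity: {}", geometry.capacity());
    println!("{:?}", geometry.locate(1_000_000));
}
```

With a spacing of 256 and a count of 4096, this arithmetic gives a capacity of 2^32 messages, which is in line with the ~4.3 billion figure quoted for most roll cycles.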
Messages are stored with an 8-byte header (4-byte length + 2-byte CRC + 2-byte padding) followed by the payload. Each message receives a unique 64-bit index encoding the cycle and sequence number.
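The layout described above can be sketched as follows. The field order comes from the description; the little-endian byte order, the 32/32 bit split between cycle and sequence number, and all function names are assumptions made for illustration, not the crate's confirmed format. The ~1 billion limit listed for `FiveMinutely` suggests the real split may vary by roll cycle.

```rust
/// Header size stated in the text: 4-byte length + 2-byte CRC + 2-byte padding.
const HEADER_LEN: usize = 8;

/// Assumed number of low bits holding the sequence number.
const SEQUENCE_BITS: u32 = 32;

/// Build a header for a payload of `payload_len` bytes (little-endian assumed).
fn encode_header(payload_len: u32, crc: u16) -> [u8; HEADER_LEN] {
    let mut header = [0u8; HEADER_LEN];
    header[0..4].copy_from_slice(&payload_len.to_le_bytes());
    header[4..6].copy_from_slice(&crc.to_le_bytes());
    // Bytes 6..8 remain zero as padding.
    header
}

/// Read the payload length and CRC back out of a header.
fn decode_header(header: &[u8; HEADER_LEN]) -> (u32, u16) {
    let len = u32::from_le_bytes([header[0], header[1], header[2], header[3]]);
    let crc = u16::from_le_bytes([header[4], header[5]]);
    (len, crc)
}

/// Combine a cycle number and a sequence number into one 64-bit index.
fn pack_index(cycle: u32, sequence: u32) -> u64 {
    (u64::from(cycle) << SEQUENCE_BITS) | u64::from(sequence)
}

/// Split a 64-bit index back into (cycle, sequence).
fn unpack_index(index: u64) -> (u32, u32) {
    ((index >> SEQUENCE_BITS) as u32, index as u32)
}

fn main() {
    let header = encode_header(13, 0xBEEF);
    assert_eq!(decode_header(&header), (13, 0xBEEF));

    let index = pack_index(20_000, 7);
    println!("index: {index:#x}");
    assert_eq!(unpack_index(index), (20_000, 7));
}
```

Keeping the cycle in the high bits means indices sort in write order across files, so a reader can tell from the index alone which cycle file to open.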
License: Apache-2.0