Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions crates/azoth-core/src/config/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ pub struct CanonicalConfig {
/// `LockTimeout` error instead of blocking indefinitely.
#[serde(default = "default_lock_timeout")]
pub lock_timeout_ms: u64,

/// Maximum size of a single event payload in bytes (default: 4MB)
#[serde(default = "default_event_max_size")]
pub event_max_size_bytes: usize,

/// Maximum total size for a single event batch append in bytes (default: 64MB)
#[serde(default = "default_event_batch_max_bytes")]
pub event_batch_max_bytes: usize,
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
Expand Down Expand Up @@ -178,6 +186,14 @@ fn default_lock_timeout() -> u64 {
5000
}

/// Serde default for `event_max_size_bytes`: 4 MiB per event payload.
fn default_event_max_size() -> usize {
    // 4 << 20 == 4 * 1024 * 1024 == 4 MiB
    4 << 20
}

/// Serde default for `event_batch_max_bytes`: 64 MiB per append batch.
fn default_event_batch_max_bytes() -> usize {
    // 64 << 20 == 64 * 1024 * 1024 == 64 MiB
    64 << 20
}

impl CanonicalConfig {
pub fn new(path: PathBuf) -> Self {
Self {
Expand All @@ -193,6 +209,8 @@ impl CanonicalConfig {
preflight_cache_ttl_secs: default_preflight_cache_ttl(),
read_pool: ReadPoolConfig::default(),
lock_timeout_ms: default_lock_timeout(),
event_max_size_bytes: default_event_max_size(),
event_batch_max_bytes: default_event_batch_max_bytes(),
}
}

Expand Down Expand Up @@ -246,4 +264,16 @@ impl CanonicalConfig {
self.lock_timeout_ms = timeout_ms;
self
}

/// Set maximum event payload size in bytes.
pub fn with_event_max_size(mut self, size_bytes: usize) -> Self {
self.event_max_size_bytes = size_bytes;
self
}

/// Set maximum event batch size in bytes.
pub fn with_event_batch_max_bytes(mut self, size_bytes: usize) -> Self {
self.event_batch_max_bytes = size_bytes;
self
}
}
72 changes: 68 additions & 4 deletions crates/azoth-file-log/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,23 @@ pub struct FileEventLogConfig {

/// Maximum size for batch write buffer (bytes)
pub batch_buffer_size: usize,

/// Maximum size of a single event payload (bytes)
pub max_event_size: usize,

/// Maximum total size for a single append batch (bytes)
pub max_batch_bytes: usize,
}

impl Default for FileEventLogConfig {
    /// Post-merge defaults; the diff residue duplicating the first three
    /// fields (old and new comment alignments both present) is collapsed
    /// into the single final set of field initializers.
    fn default() -> Self {
        Self {
            base_dir: PathBuf::from("./data/event-log"),
            max_file_size: 512 * 1024 * 1024,  // 512MB
            write_buffer_size: 256 * 1024,     // 256KB (increased from 64KB)
            batch_buffer_size: 1024 * 1024,    // 1MB for batch writes
            max_event_size: 4 * 1024 * 1024,   // 4MB single-event limit
            max_batch_bytes: 64 * 1024 * 1024, // 64MB batch limit
        }
    }
}
Expand Down Expand Up @@ -123,6 +131,21 @@ impl FileEventLog {
///
/// Format: [event_id: u64][size: u32][data: bytes]
fn write_event_entry(&self, event_id: EventId, event_bytes: &[u8]) -> Result<()> {
if event_bytes.len() > self.config.max_event_size {
return Err(AzothError::InvalidState(format!(
"Event size {} exceeds max_event_size {}",
event_bytes.len(),
self.config.max_event_size
)));
}

if event_bytes.len() > u32::MAX as usize {
return Err(AzothError::InvalidState(format!(
"Event size {} exceeds u32 encoding limit",
event_bytes.len()
)));
}

let mut writer = self.writer.lock().unwrap();

// Write event_id (8 bytes, big-endian)
Expand Down Expand Up @@ -231,6 +254,29 @@ impl EventLog for FileEventLog {
.map(|e| 8 + 4 + e.len()) // event_id + size + data
.sum();

if total_size > self.config.max_batch_bytes {
return Err(AzothError::InvalidState(format!(
"Batch size {} exceeds max_batch_bytes {}",
total_size, self.config.max_batch_bytes
)));
}

for event in events {
if event.len() > self.config.max_event_size {
return Err(AzothError::InvalidState(format!(
"Event size {} exceeds max_event_size {}",
event.len(),
self.config.max_event_size
)));
}
if event.len() > u32::MAX as usize {
return Err(AzothError::InvalidState(format!(
"Event size {} exceeds u32 encoding limit",
event.len()
)));
}
}

// Check if batch exceeds reasonable size
if total_size > self.config.batch_buffer_size {
// For very large batches, fall back to individual writes
Expand Down Expand Up @@ -309,6 +355,7 @@ impl EventLog for FileEventLog {
start,
end_id,
meta.current_file_num,
self.config.max_event_size,
)?))
}

Expand Down Expand Up @@ -411,17 +458,25 @@ struct FileEventLogIter {
current_file: Option<File>,
next_event_id: EventId,
end_event_id: EventId,
max_event_size: usize,
}

impl FileEventLogIter {
fn new(base_dir: PathBuf, start: EventId, end: EventId, max_file_num: u64) -> Result<Self> {
fn new(
base_dir: PathBuf,
start: EventId,
end: EventId,
max_file_num: u64,
max_event_size: usize,
) -> Result<Self> {
let mut iter = Self {
base_dir,
current_file_num: 0,
max_file_num,
current_file: None,
next_event_id: start,
end_event_id: end,
max_event_size,
};

// Open first file
Expand Down Expand Up @@ -475,6 +530,13 @@ impl FileEventLogIter {
let event_id = u64::from_be_bytes(header[0..8].try_into().unwrap());
let size = u32::from_be_bytes(header[8..12].try_into().unwrap());

if size as usize > self.max_event_size {
return Err(AzothError::InvalidState(format!(
"Event {} size {} exceeds max_event_size {}",
event_id, size, self.max_event_size
)));
}

// Read event data
let mut data = vec![0u8; size as usize];
file.read_exact(&mut data)?;
Expand Down Expand Up @@ -516,6 +578,8 @@ mod tests {
max_file_size: 1024, // Small for testing rotation
write_buffer_size: 128,
batch_buffer_size: 4096,
max_event_size: 1024 * 1024,
max_batch_bytes: 16 * 1024 * 1024,
};
let log = FileEventLog::open(config).unwrap();
(log, temp_dir)
Expand Down
1 change: 1 addition & 0 deletions crates/azoth-lmdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ tokio = { workspace = true, features = ["time", "sync"] }
serde_json = { workspace = true }
chrono = { workspace = true }
dashmap = { workspace = true }
parking_lot = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
Expand Down
43 changes: 19 additions & 24 deletions crates/azoth-lmdb/src/preflight_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
//! - Invalidation on transaction commit for modified keys

use dashmap::DashMap;
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

Expand Down Expand Up @@ -42,14 +44,14 @@ impl CacheEntry {
/// Thread-safe in-memory cache for preflight reads.
///
/// Uses DashMap for lock-free concurrent access and tracks insertion order
/// with an O(1) bounded eviction queue.
pub struct PreflightCache {
    /// Key/value entries; `CacheEntry` carries the insertion timestamp used
    /// for TTL expiry.
    cache: Arc<DashMap<Vec<u8>, CacheEntry>>,
    /// Soft upper bound on `cache.len()`; enforced on insert via eviction.
    capacity: usize,
    /// Entry time-to-live; expired entries are dropped lazily on read.
    ttl: Duration,
    /// When false, all cache operations are no-ops.
    enabled: bool,
    /// FIFO queue used to bound cache size without O(n) scans.
    eviction_queue: Arc<Mutex<VecDeque<Vec<u8>>>>,
}

impl PreflightCache {
Expand All @@ -65,7 +67,7 @@ impl PreflightCache {
capacity,
ttl: Duration::from_secs(ttl_secs),
enabled,
lru_tracker: Arc::new(DashMap::with_capacity(capacity)),
eviction_queue: Arc::new(Mutex::new(VecDeque::with_capacity(capacity))),
}
}

Expand All @@ -89,7 +91,6 @@ impl PreflightCache {
if entry.is_expired(self.ttl) {
drop(entry); // Release read lock
self.cache.remove(key);
self.lru_tracker.remove(key);
return None;
}

Expand All @@ -105,14 +106,12 @@ impl PreflightCache {
return;
}

// Check if we need to evict
if self.cache.len() >= self.capacity && !self.cache.contains_key(&key) {
self.evict_lru();
self.evict_one();
}

let now = Instant::now();
self.cache.insert(key.clone(), CacheEntry::new(value));
self.lru_tracker.insert(key, now);
self.eviction_queue.lock().push_back(key);
}

/// Invalidate (remove) specific keys from the cache.
Expand All @@ -125,34 +124,30 @@ impl PreflightCache {

for key in keys {
self.cache.remove(key);
self.lru_tracker.remove(key);
}
}

/// Evict the least recently used entry from the cache.
fn evict_lru(&self) {
if let Some(oldest_key) = self.find_oldest_key() {
self.cache.remove(&oldest_key);
self.lru_tracker.remove(&oldest_key);
/// Evict one key from the cache in amortized O(1) time.
///
/// The queue may contain stale duplicate keys; those are skipped until
/// a currently present key is removed.
fn evict_one(&self) {
let mut queue = self.eviction_queue.lock();
while let Some(key) = queue.pop_front() {
if self.cache.remove(&key).is_some() {
break;
}
}
}

/// Find the key with the oldest insertion time.
fn find_oldest_key(&self) -> Option<Vec<u8>> {
self.lru_tracker
.iter()
.min_by_key(|entry| *entry.value())
.map(|entry| entry.key().clone())
}

/// Clear all entries from the cache.
///
/// Also empties the eviction queue so stale keys do not accumulate; no-op
/// when the cache is disabled.
pub fn clear(&self) {
    if !self.enabled {
        return;
    }

    self.cache.clear();
    self.eviction_queue.lock().clear();
}

/// Get cache statistics.
Expand Down
2 changes: 2 additions & 0 deletions crates/azoth-lmdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ impl CanonicalStore for LmdbCanonicalStore {
// Initialize file-based event log
let event_log_config = FileEventLogConfig {
base_dir: cfg.path.join("event-log"),
max_event_size: cfg.event_max_size_bytes,
max_batch_bytes: cfg.event_batch_max_bytes,
..Default::default()
};
let event_log = Arc::new(FileEventLog::open(event_log_config)?);
Expand Down
1 change: 1 addition & 0 deletions crates/azoth-scheduler/examples/basic_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ async fn main() -> Result<()> {

// Setup database and projection
let db = Arc::new(AzothDb::open("./data/scheduler-example")?);
#[allow(clippy::arc_with_non_send_sync)]
let conn = Arc::new(Connection::open("./data/scheduler-example/projection.db")?);

// Create scheduler
Expand Down
1 change: 1 addition & 0 deletions crates/azoth-scheduler/examples/checkpoint_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ async fn main() -> anyhow::Result<()> {
let shutdown_config = config.clone();

// Create projection database for scheduler
#[allow(clippy::arc_with_non_send_sync)]
let projection_conn = Arc::new(Connection::open("./scheduler.db")?);

// Create scheduler and register handler based on storage backend
Expand Down
1 change: 1 addition & 0 deletions crates/azoth-scheduler/examples/cron_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ async fn main() -> Result<()> {

// Setup database and projection
let db = Arc::new(AzothDb::open("./data/cron-example")?);
#[allow(clippy::arc_with_non_send_sync)]
let conn = Arc::new(Connection::open("./data/cron-example/projection.db")?);

// Create scheduler
Expand Down
1 change: 1 addition & 0 deletions crates/azoth-scheduler/tests/simple_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ fn setup() -> (Arc<AzothDb>, Arc<Connection>, TempDir, TempDir) {
let db_dir = tempfile::tempdir().unwrap();

let db = Arc::new(AzothDb::open(data_dir.path()).unwrap());
#[allow(clippy::arc_with_non_send_sync)]
let conn = Arc::new(Connection::open(db_dir.path().join("projection.db")).unwrap());

(db, conn, data_dir, db_dir)
Expand Down
24 changes: 23 additions & 1 deletion crates/azoth-sqlite/src/read_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use azoth_core::{
use rusqlite::{Connection, OpenFlags};
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::sync::{Semaphore, SemaphorePermit};

/// A pooled read-only connection for SQLite
Expand Down Expand Up @@ -176,6 +176,28 @@ impl SqliteReadPool {
pub fn pool_size(&self) -> usize {
self.connections.len()
}

/// Acquire a pooled read-only connection (blocking).
///
/// Blocks up to `acquire_timeout` waiting for an available connection,
/// polling `try_acquire` roughly once per millisecond.
///
/// # Errors
///
/// Returns any error raised by `try_acquire` immediately instead of
/// masking it as a timeout, or a `Timeout` error once `acquire_timeout`
/// has elapsed without a free connection.
pub fn acquire_blocking(&self) -> Result<PooledSqliteConnection<'_>> {
    let deadline = Instant::now() + self.acquire_timeout;

    loop {
        // Only `Ok(None)` ("pool currently exhausted") falls through to
        // the retry path; real failures are propagated to the caller.
        match self.try_acquire() {
            Ok(Some(conn)) => return Ok(conn),
            Ok(None) => {}
            Err(e) => return Err(e),
        }

        let now = Instant::now();
        if now >= deadline {
            return Err(AzothError::Timeout(format!(
                "Read pool acquire timeout after {:?}",
                self.acquire_timeout
            )));
        }

        // Sleep at most 1ms, but never past the deadline.
        std::thread::sleep((deadline - now).min(Duration::from_millis(1)));
    }
}
}

#[cfg(test)]
Expand Down
Loading