From 349db49726d881109e6aa958cb03bdaecf422ff7 Mon Sep 17 00:00:00 2001 From: johnny-emp Date: Fri, 6 Feb 2026 16:08:27 -0500 Subject: [PATCH 1/2] Improve performance paths, harden event log/migrations, and fix clippy --- crates/azoth-core/src/config/canonical.rs | 30 +++++++ crates/azoth-file-log/src/store.rs | 72 +++++++++++++++- crates/azoth-lmdb/Cargo.toml | 1 + crates/azoth-lmdb/src/preflight_cache.rs | 43 +++++----- crates/azoth-lmdb/src/store.rs | 2 + .../examples/basic_scheduler.rs | 1 + .../examples/checkpoint_service.rs | 1 + crates/azoth-scheduler/examples/cron_tasks.rs | 1 + crates/azoth-scheduler/tests/simple_test.rs | 1 + crates/azoth-sqlite/src/read_pool.rs | 24 +++++- crates/azoth-sqlite/src/store.rs | 15 ++++ crates/azoth-sqlite/src/txn.rs | 2 +- crates/azoth-vector/tests/integration_test.rs | 13 +-- crates/azoth/Cargo.toml | 3 +- crates/azoth/benches/basic_benchmark.rs | 2 +- crates/azoth/examples/dlq_replay.rs | 3 + .../azoth/examples/event_processor_builder.rs | 5 +- crates/azoth/src/event_handler.rs | 82 ++++++++++--------- crates/azoth/src/migration.rs | 64 ++++++++++++--- crates/azoth/src/onchain_registry.rs | 49 ++++++----- crates/azoth/src/typed_values.rs | 7 +- crates/azoth/tests/concurrency_safety_test.rs | 3 +- crates/azoth/tests/preflight_cache_test.rs | 4 +- crates/azoth/tests/stress_test.rs | 6 +- 24 files changed, 315 insertions(+), 119 deletions(-) diff --git a/crates/azoth-core/src/config/canonical.rs b/crates/azoth-core/src/config/canonical.rs index 73089ac..f85a9f1 100644 --- a/crates/azoth-core/src/config/canonical.rs +++ b/crates/azoth-core/src/config/canonical.rs @@ -133,6 +133,14 @@ pub struct CanonicalConfig { /// `LockTimeout` error instead of blocking indefinitely. #[serde(default = "default_lock_timeout")] pub lock_timeout_ms: u64, + + /// Maximum size of a single event payload in bytes (default: 4MB) + #[serde(default = "default_event_max_size")] + pub event_max_size_bytes: usize, + + /// Maximum total size for a single event batch append in bytes (default: 64MB) + #[serde(default = "default_event_batch_max_bytes")] + pub event_batch_max_bytes: usize, } #[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)] @@ -178,6 +186,14 @@ fn default_lock_timeout() -> u64 { 5000 } +fn default_event_max_size() -> usize { + 4 * 1024 * 1024 +} + +fn default_event_batch_max_bytes() -> usize { + 64 * 1024 * 1024 +} + impl CanonicalConfig { pub fn new(path: PathBuf) -> Self { Self { @@ -193,6 +209,8 @@ impl CanonicalConfig { preflight_cache_ttl_secs: default_preflight_cache_ttl(), read_pool: ReadPoolConfig::default(), lock_timeout_ms: default_lock_timeout(), + event_max_size_bytes: default_event_max_size(), + event_batch_max_bytes: default_event_batch_max_bytes(), } } @@ -246,4 +264,16 @@ impl CanonicalConfig { self.lock_timeout_ms = timeout_ms; self } + + /// Set maximum event payload size in bytes. + pub fn with_event_max_size(mut self, size_bytes: usize) -> Self { + self.event_max_size_bytes = size_bytes; + self + } + + /// Set maximum event batch size in bytes. + pub fn with_event_batch_max_bytes(mut self, size_bytes: usize) -> Self { + self.event_batch_max_bytes = size_bytes; + self + } } diff --git a/crates/azoth-file-log/src/store.rs b/crates/azoth-file-log/src/store.rs index c6c5fb1..f098601 100644 --- a/crates/azoth-file-log/src/store.rs +++ b/crates/azoth-file-log/src/store.rs @@ -26,15 +26,23 @@ pub struct FileEventLogConfig { /// Maximum size for batch write buffer (bytes) pub batch_buffer_size: usize, + + /// Maximum size of a single event payload (bytes) + pub max_event_size: usize, + + /// Maximum total size for a single append batch (bytes) + pub max_batch_bytes: usize, } impl Default for FileEventLogConfig { fn default() -> Self { Self { base_dir: PathBuf::from("./data/event-log"), - max_file_size: 512 * 1024 * 1024, // 512MB - write_buffer_size: 256 * 1024, // 256KB (increased from 64KB) - batch_buffer_size: 1024 * 1024, // 1MB for batch writes + max_file_size: 512 * 1024 * 1024, // 512MB + write_buffer_size: 256 * 1024, // 256KB (increased from 64KB) + batch_buffer_size: 1024 * 1024, // 1MB for batch writes + max_event_size: 4 * 1024 * 1024, // 4MB single-event limit + max_batch_bytes: 64 * 1024 * 1024, // 64MB batch limit } } } @@ -123,6 +131,21 @@ impl FileEventLog { /// /// Format: [event_id: u64][size: u32][data: bytes] fn write_event_entry(&self, event_id: EventId, event_bytes: &[u8]) -> Result<()> { + if event_bytes.len() > self.config.max_event_size { + return Err(AzothError::InvalidState(format!( + "Event size {} exceeds max_event_size {}", + event_bytes.len(), + self.config.max_event_size + ))); + } + + if event_bytes.len() > u32::MAX as usize { + return Err(AzothError::InvalidState(format!( + "Event size {} exceeds u32 encoding limit", + event_bytes.len() + ))); + } + let mut writer = self.writer.lock().unwrap(); // Write event_id (8 bytes, big-endian) @@ -231,6 +254,29 @@ impl EventLog for FileEventLog { .map(|e| 8 + 4 + e.len()) // event_id + size + data .sum(); + if total_size > self.config.max_batch_bytes { + return Err(AzothError::InvalidState(format!( + "Batch size {} exceeds max_batch_bytes {}", + total_size, self.config.max_batch_bytes + ))); + } + + for event in events { + if event.len() > self.config.max_event_size { + return Err(AzothError::InvalidState(format!( + "Event size {} exceeds max_event_size {}", + event.len(), + self.config.max_event_size + ))); + } + if event.len() > u32::MAX as usize { + return Err(AzothError::InvalidState(format!( + "Event size {} exceeds u32 encoding limit", + event.len() + ))); + } + } + // Check if batch exceeds reasonable size if total_size > self.config.batch_buffer_size { // For very large batches, fall back to individual writes @@ -309,6 +355,7 @@ impl EventLog for FileEventLog { start, end_id, meta.current_file_num, + self.config.max_event_size, )?)) } @@ -411,10 +458,17 @@ struct FileEventLogIter { current_file: Option, next_event_id: EventId, end_event_id: EventId, + max_event_size: usize, } impl FileEventLogIter { - fn new(base_dir: PathBuf, start: EventId, end: EventId, max_file_num: u64) -> Result { + fn new( + base_dir: PathBuf, + start: EventId, + end: EventId, + max_file_num: u64, + max_event_size: usize, + ) -> Result { let mut iter = Self { base_dir, current_file_num: 0, @@ -422,6 +476,7 @@ impl FileEventLogIter { current_file: None, next_event_id: start, end_event_id: end, + max_event_size, }; // Open first file @@ -475,6 +530,13 @@ impl FileEventLogIter { let event_id = u64::from_be_bytes(header[0..8].try_into().unwrap()); let size = u32::from_be_bytes(header[8..12].try_into().unwrap()); + if size as usize > self.max_event_size { + return Err(AzothError::InvalidState(format!( + "Event {} size {} exceeds max_event_size {}", + event_id, size, self.max_event_size + ))); + } + // Read event data let mut data = vec![0u8; size as usize]; file.read_exact(&mut data)?; @@ -516,6 +578,8 @@ mod tests { max_file_size: 1024, // Small for testing rotation write_buffer_size: 128, batch_buffer_size: 4096, + max_event_size: 1024 * 1024, + max_batch_bytes: 16 * 1024 * 1024, }; let log = FileEventLog::open(config).unwrap(); (log, temp_dir) diff --git a/crates/azoth-lmdb/Cargo.toml b/crates/azoth-lmdb/Cargo.toml index 3cfc8dd..ec8613a 100644 --- a/crates/azoth-lmdb/Cargo.toml +++ b/crates/azoth-lmdb/Cargo.toml @@ -23,6 +23,7 @@ tokio = { workspace = true, features = ["time", "sync"] } serde_json = { workspace = true } chrono = { workspace = true } dashmap = { workspace = true } +parking_lot = { workspace = true } [dev-dependencies] tempfile = { workspace = true } diff --git a/crates/azoth-lmdb/src/preflight_cache.rs b/crates/azoth-lmdb/src/preflight_cache.rs index f6004cc..47d9ebb 100644 --- a/crates/azoth-lmdb/src/preflight_cache.rs +++ b/crates/azoth-lmdb/src/preflight_cache.rs @@ -7,6 +7,8 @@ //! - Invalidation on transaction commit for modified keys use dashmap::DashMap; +use parking_lot::Mutex; +use std::collections::VecDeque; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -42,14 +44,14 @@ impl CacheEntry { /// Thread-safe in-memory cache for preflight reads. /// /// Uses DashMap for lock-free concurrent access and tracks insertion order -/// for LRU eviction. +/// with an O(1) bounded eviction queue. pub struct PreflightCache { cache: Arc, CacheEntry>>, capacity: usize, ttl: Duration, enabled: bool, - /// Track insertion order for LRU eviction (key, inserted_at) - lru_tracker: Arc, Instant>>, + /// FIFO queue used to bound cache size without O(n) scans. + eviction_queue: Arc>>>, } impl PreflightCache { @@ -65,7 +67,7 @@ impl PreflightCache { capacity, ttl: Duration::from_secs(ttl_secs), enabled, - lru_tracker: Arc::new(DashMap::with_capacity(capacity)), + eviction_queue: Arc::new(Mutex::new(VecDeque::with_capacity(capacity))), } } @@ -89,7 +91,6 @@ impl PreflightCache { if entry.is_expired(self.ttl) { drop(entry); // Release read lock self.cache.remove(key); - self.lru_tracker.remove(key); return None; } @@ -105,14 +106,12 @@ impl PreflightCache { return; } - // Check if we need to evict if self.cache.len() >= self.capacity && !self.cache.contains_key(&key) { - self.evict_lru(); + self.evict_one(); } - let now = Instant::now(); self.cache.insert(key.clone(), CacheEntry::new(value)); - self.lru_tracker.insert(key, now); + self.eviction_queue.lock().push_back(key); } /// Invalidate (remove) specific keys from the cache. @@ -125,26 +124,22 @@ impl PreflightCache { for key in keys { self.cache.remove(key); - self.lru_tracker.remove(key); } } - /// Evict the least recently used entry from the cache. - fn evict_lru(&self) { - if let Some(oldest_key) = self.find_oldest_key() { - self.cache.remove(&oldest_key); - self.lru_tracker.remove(&oldest_key); + /// Evict one key from the cache in amortized O(1) time. + /// + /// The queue may contain stale duplicate keys; those are skipped until + /// a currently present key is removed. + fn evict_one(&self) { + let mut queue = self.eviction_queue.lock(); + while let Some(key) = queue.pop_front() { + if self.cache.remove(&key).is_some() { + break; + } } } - /// Find the key with the oldest insertion time. - fn find_oldest_key(&self) -> Option> { - self.lru_tracker - .iter() - .min_by_key(|entry| *entry.value()) - .map(|entry| entry.key().clone()) - } - /// Clear all entries from the cache. pub fn clear(&self) { if !self.enabled { @@ -152,7 +147,7 @@ impl PreflightCache { } self.cache.clear(); - self.lru_tracker.clear(); + self.eviction_queue.lock().clear(); } /// Get cache statistics. diff --git a/crates/azoth-lmdb/src/store.rs b/crates/azoth-lmdb/src/store.rs index af01d9b..ea2d2db 100644 --- a/crates/azoth-lmdb/src/store.rs +++ b/crates/azoth-lmdb/src/store.rs @@ -162,6 +162,8 @@ impl CanonicalStore for LmdbCanonicalStore { // Initialize file-based event log let event_log_config = FileEventLogConfig { base_dir: cfg.path.join("event-log"), + max_event_size: cfg.event_max_size_bytes, + max_batch_bytes: cfg.event_batch_max_bytes, ..Default::default() }; let event_log = Arc::new(FileEventLog::open(event_log_config)?); diff --git a/crates/azoth-scheduler/examples/basic_scheduler.rs b/crates/azoth-scheduler/examples/basic_scheduler.rs index 4d03f35..f3b72d9 100644 --- a/crates/azoth-scheduler/examples/basic_scheduler.rs +++ b/crates/azoth-scheduler/examples/basic_scheduler.rs @@ -74,6 +74,7 @@ async fn main() -> Result<()> { // Setup database and projection let db = Arc::new(AzothDb::open("./data/scheduler-example")?); + #[allow(clippy::arc_with_non_send_sync)] let conn = Arc::new(Connection::open("./data/scheduler-example/projection.db")?); // Create scheduler diff --git a/crates/azoth-scheduler/examples/checkpoint_service.rs b/crates/azoth-scheduler/examples/checkpoint_service.rs index 2095bec..3813f82 100644 --- a/crates/azoth-scheduler/examples/checkpoint_service.rs +++ b/crates/azoth-scheduler/examples/checkpoint_service.rs @@ -128,6 +128,7 @@ async fn main() -> anyhow::Result<()> { let shutdown_config = config.clone(); // Create projection database for scheduler + #[allow(clippy::arc_with_non_send_sync)] let projection_conn = Arc::new(Connection::open("./scheduler.db")?); // Create scheduler and register handler based on storage backend diff --git a/crates/azoth-scheduler/examples/cron_tasks.rs b/crates/azoth-scheduler/examples/cron_tasks.rs index bb5df2a..430960a 100644 --- a/crates/azoth-scheduler/examples/cron_tasks.rs +++ b/crates/azoth-scheduler/examples/cron_tasks.rs @@ -101,6 +101,7 @@ async fn main() -> Result<()> { // Setup database and projection let db = Arc::new(AzothDb::open("./data/cron-example")?); + #[allow(clippy::arc_with_non_send_sync)] let conn = Arc::new(Connection::open("./data/cron-example/projection.db")?); // Create scheduler diff --git a/crates/azoth-scheduler/tests/simple_test.rs b/crates/azoth-scheduler/tests/simple_test.rs index 083ce33..e1be50e 100644 --- a/crates/azoth-scheduler/tests/simple_test.rs +++ b/crates/azoth-scheduler/tests/simple_test.rs @@ -29,6 +29,7 @@ fn setup() -> (Arc, Arc, TempDir, TempDir) { let db_dir = tempfile::tempdir().unwrap(); let db = Arc::new(AzothDb::open(data_dir.path()).unwrap()); + #[allow(clippy::arc_with_non_send_sync)] let conn = Arc::new(Connection::open(db_dir.path().join("projection.db")).unwrap()); (db, conn, data_dir, db_dir) diff --git a/crates/azoth-sqlite/src/read_pool.rs b/crates/azoth-sqlite/src/read_pool.rs index ef49150..0a8d6af 100644 --- a/crates/azoth-sqlite/src/read_pool.rs +++ b/crates/azoth-sqlite/src/read_pool.rs @@ -10,7 +10,7 @@ use azoth_core::{ use rusqlite::{Connection, OpenFlags}; use std::path::{Path, PathBuf}; use std::sync::Mutex; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::sync::{Semaphore, SemaphorePermit}; /// A pooled read-only connection for SQLite @@ -176,6 +176,28 @@ impl SqliteReadPool { pub fn pool_size(&self) -> usize { self.connections.len() } + + /// Acquire a pooled read-only connection (blocking) + /// + /// Blocks up to `acquire_timeout` waiting for an available connection. + pub fn acquire_blocking(&self) -> Result> { + let deadline = Instant::now() + self.acquire_timeout; + + loop { + if let Ok(Some(conn)) = self.try_acquire() { + return Ok(conn); + } + + if Instant::now() >= deadline { + return Err(AzothError::Timeout(format!( + "Read pool acquire timeout after {:?}", + self.acquire_timeout + ))); + } + + std::thread::sleep(Duration::from_millis(1)); + } + } } #[cfg(test)] diff --git a/crates/azoth-sqlite/src/store.rs b/crates/azoth-sqlite/src/store.rs index 9945d55..f8bc913 100644 --- a/crates/azoth-sqlite/src/store.rs +++ b/crates/azoth-sqlite/src/store.rs @@ -126,6 +126,16 @@ impl SqliteProjectionStore { F: FnOnce(&Connection) -> Result + Send + 'static, R: Send + 'static, { + if let Some(pool) = &self.read_pool { + let pool = Arc::clone(pool); + return tokio::task::spawn_blocking(move || { + let conn = pool.acquire_blocking()?; + f(conn.connection()) + }) + .await + .map_err(|e| AzothError::Projection(format!("Query task failed: {}", e)))?; + } + let conn = self.read_conn.clone(); tokio::task::spawn_blocking(move || { let conn_guard = conn.lock().unwrap(); @@ -151,6 +161,11 @@ impl SqliteProjectionStore { where F: FnOnce(&Connection) -> Result, { + if let Some(pool) = &self.read_pool { + let conn = pool.acquire_blocking()?; + return f(conn.connection()); + } + let conn_guard = self.read_conn.lock().unwrap(); f(&conn_guard) } diff --git a/crates/azoth-sqlite/src/txn.rs b/crates/azoth-sqlite/src/txn.rs index 3201268..eaa0f4c 100644 --- a/crates/azoth-sqlite/src/txn.rs +++ b/crates/azoth-sqlite/src/txn.rs @@ -14,7 +14,7 @@ pub struct SimpleProjectionTxn<'a> { impl<'a> SimpleProjectionTxn<'a> { pub fn new(conn: MutexGuard<'a, Connection>) -> Result { - conn.execute("BEGIN EXCLUSIVE TRANSACTION", []) + conn.execute("BEGIN IMMEDIATE TRANSACTION", []) .map_err(|e| AzothError::Projection(e.to_string()))?; Ok(Self { conn, in_txn: true }) diff --git a/crates/azoth-vector/tests/integration_test.rs b/crates/azoth-vector/tests/integration_test.rs index dd3996d..06fc1a3 100644 --- a/crates/azoth-vector/tests/integration_test.rs +++ b/crates/azoth-vector/tests/integration_test.rs @@ -6,13 +6,8 @@ //! //! Download manually: -use azoth_core::error::AzothError; -use azoth_core::{ProjectionConfig, ProjectionStore}; -use azoth_sqlite::SqliteProjectionStore; -use azoth_vector::{DistanceMetric, Vector, VectorConfig, VectorExtension, VectorSearch}; +#[cfg(feature = "has-vector-extension")] use std::path::PathBuf; -use std::sync::Arc; -use tempfile::tempdir; /// Resolve path to the sqlite-vector extension for loading. /// 1. Use SQLITE_VECTOR_EXTENSION_PATH env if set. @@ -138,6 +133,12 @@ fn find_vector_binary_in_dir(dir: &std::path::Path) -> Option { #[cfg(feature = "has-vector-extension")] mod with_extension { use super::*; + use azoth_core::error::AzothError; + use azoth_core::{ProjectionConfig, ProjectionStore}; + use azoth_sqlite::SqliteProjectionStore; + use azoth_vector::{DistanceMetric, Vector, VectorConfig, VectorExtension, VectorSearch}; + use std::sync::Arc; + use tempfile::tempdir; fn setup_db() -> Arc { let dir = tempdir().unwrap(); diff --git a/crates/azoth/Cargo.toml b/crates/azoth/Cargo.toml index 7207549..ebd5f61 100644 --- a/crates/azoth/Cargo.toml +++ b/crates/azoth/Cargo.toml @@ -47,6 +47,7 @@ sha2 = { workspace = true } # Ethereum/blockchain support (optional, for onchain registry) alloy-primitives = { version = "0.8", optional = true } alloy-provider = { version = "0.6", optional = true, features = ["reqwest"] } +alloy-network = { version = "0.6", optional = true } alloy-rpc-types = { version = "0.6", optional = true } alloy-signer = { version = "0.6", optional = true } alloy-signer-local = { version = "0.6", optional = true } @@ -55,7 +56,7 @@ alloy-transport-http = { version = "0.6", optional = true } [features] default = [] -onchain = ["alloy-primitives", "alloy-provider", "alloy-rpc-types", "alloy-signer", "alloy-signer-local", "alloy-sol-types", "alloy-transport-http"] +onchain = ["alloy-primitives", "alloy-provider", "alloy-network", "alloy-rpc-types", "alloy-signer", "alloy-signer-local", "alloy-sol-types", "alloy-transport-http"] [dev-dependencies] tempfile = { workspace = true } diff --git a/crates/azoth/benches/basic_benchmark.rs b/crates/azoth/benches/basic_benchmark.rs index 94d448e..f008ef0 100644 --- a/crates/azoth/benches/basic_benchmark.rs +++ b/crates/azoth/benches/basic_benchmark.rs @@ -115,7 +115,7 @@ fn main() { let start = Instant::now(); let mut iter = db.canonical().iter_events(0, None).unwrap(); let mut count = 0; - while let Some(_) = iter.next().unwrap() { + while iter.next().unwrap().is_some() { count += 1; } let duration = start.elapsed(); diff --git a/crates/azoth/examples/dlq_replay.rs b/crates/azoth/examples/dlq_replay.rs index 589b671..64e596d 100644 --- a/crates/azoth/examples/dlq_replay.rs +++ b/crates/azoth/examples/dlq_replay.rs @@ -70,12 +70,14 @@ async fn main() -> Result<()> { // Open database let db = Arc::new(AzothDb::open("./tmp/dlq_example")?); + #[allow(clippy::arc_with_non_send_sync)] let conn = Arc::new( rusqlite::Connection::open("./tmp/dlq_example/projection.db") .map_err(|e| AzothError::Projection(e.to_string()))?, ); // Create DLQ + #[allow(clippy::arc_with_non_send_sync)] let dlq = Arc::new(DeadLetterQueue::new(conn.clone())?); // Register handler that will fail 2 times before succeeding @@ -139,6 +141,7 @@ async fn main() -> Result<()> { stop_on_consecutive_failures: Some(5), }; + #[allow(clippy::arc_with_non_send_sync)] let replayer = Arc::new(DlqReplayer::new(dlq.clone(), registry.clone(), config)); tracing::info!("Starting DLQ replayer..."); diff --git a/crates/azoth/examples/event_processor_builder.rs b/crates/azoth/examples/event_processor_builder.rs index a9d76be..b29ef15 100644 --- a/crates/azoth/examples/event_processor_builder.rs +++ b/crates/azoth/examples/event_processor_builder.rs @@ -100,6 +100,7 @@ async fn main() -> Result<()> { // Setup SQL table let db_path = temp_dir.path().join("accounts.db"); + #[allow(clippy::arc_with_non_send_sync)] let conn = Arc::new(Connection::open(&db_path).map_err(|e| AzothError::Projection(e.to_string()))?); @@ -140,7 +141,7 @@ async fn main() -> Result<()> { if i % 2 == 0 { // Deposit - Transaction::new(&*db).execute(|ctx| { + Transaction::new(&db).execute(|ctx| { ctx.log( "deposit", &DepositPayload { @@ -153,7 +154,7 @@ async fn main() -> Result<()> { println!(" 📝 Wrote deposit event for account {}", account_id); } else { // Withdraw - Transaction::new(&*db).execute(|ctx| { + Transaction::new(&db).execute(|ctx| { ctx.log( "withdraw", &WithdrawPayload { diff --git a/crates/azoth/src/event_handler.rs b/crates/azoth/src/event_handler.rs index 0ad2c10..4b08e91 100644 --- a/crates/azoth/src/event_handler.rs +++ b/crates/azoth/src/event_handler.rs @@ -231,64 +231,66 @@ impl EventHandlerRegistry { where I: IntoIterator, { - // Group events by type - let mut event_groups: HashMap> = HashMap::new(); + // Preserve global event order by batching only contiguous runs + // of the same event type. + let mut current_type: Option = None; + let mut current_batch: Vec = Vec::new(); + let mut current_handler: Option<&dyn EventHandler> = None; let mut last_flush = Instant::now(); let mut total_processed = 0; + let flush_batch = |handler: &dyn EventHandler, + batch: &mut Vec, + total: &mut usize| + -> Result<()> { + if batch.is_empty() { + return Ok(()); + } + handler.handle_batch(conn, batch)?; + *total += batch.len(); + batch.clear(); + Ok(()) + }; + for (event_id, event_bytes) in events { - // Parse event type let event_str = std::str::from_utf8(event_bytes) .map_err(|e| AzothError::EventDecode(format!("Invalid UTF-8: {}", e)))?; - let (event_type, payload) = event_str.split_once(':').ok_or_else(|| { AzothError::EventDecode("Event must be in format 'type:payload'".into()) })?; - // Add to appropriate group - let batch = event_groups.entry(event_type.to_string()).or_default(); + let should_rotate = current_type.as_deref() != Some(event_type) + || current_batch.len() >= self.batch_config.max_batch_size + || last_flush.elapsed() >= self.batch_config.max_batch_duration; + + if should_rotate { + if let Some(handler) = current_handler { + flush_batch(handler, &mut current_batch, &mut total_processed)?; + } + + current_handler = Some(self.get(event_type).ok_or_else(|| { + AzothError::EventDecode(format!("No handler for event type '{}'", event_type)) + })?); + current_type = Some(event_type.to_string()); + last_flush = Instant::now(); + } - batch.push(BatchEvent { + current_batch.push(BatchEvent { event_id, payload: payload.as_bytes().to_vec(), }); - - // Check if we should flush any batches - let should_flush_size = batch.len() >= self.batch_config.max_batch_size; - let should_flush_time = last_flush.elapsed() >= self.batch_config.max_batch_duration; - - if should_flush_size || should_flush_time { - // Flush all batches that have reached their limits - for (event_type, events) in event_groups.iter_mut() { - if !events.is_empty() - && (events.len() >= self.batch_config.max_batch_size || should_flush_time) - { - let handler = self.get(event_type).ok_or_else(|| { - AzothError::EventDecode(format!( - "No handler for event type '{}'", - event_type - )) - })?; - - handler.handle_batch(conn, events)?; - total_processed += events.len(); - events.clear(); - } + if current_batch.len() >= self.batch_config.max_batch_size + || last_flush.elapsed() >= self.batch_config.max_batch_duration + { + if let Some(handler) = current_handler { + flush_batch(handler, &mut current_batch, &mut total_processed)?; + last_flush = Instant::now(); } - last_flush = Instant::now(); } } - // Flush any remaining events - for (event_type, events) in event_groups.iter() { - if !events.is_empty() { - let handler = self.get(event_type).ok_or_else(|| { - AzothError::EventDecode(format!("No handler for event type '{}'", event_type)) - })?; - - handler.handle_batch(conn, events)?; - total_processed += events.len(); - } + if let Some(handler) = current_handler { + flush_batch(handler, &mut current_batch, &mut total_processed)?; } Ok(total_processed) diff --git a/crates/azoth/src/migration.rs b/crates/azoth/src/migration.rs index 80cc85a..d56f933 100644 --- a/crates/azoth/src/migration.rs +++ b/crates/azoth/src/migration.rs @@ -229,22 +229,40 @@ impl MigrationManager { migration.name() ); - // Execute the migration SQL - { - let conn = projection.conn().lock().unwrap(); + // Run migration, history write, and schema-version bump atomically. + let conn = projection.conn().lock().unwrap(); + conn.execute_batch("BEGIN IMMEDIATE TRANSACTION") + .map_err(|e| AzothError::Projection(e.to_string()))?; + + let apply_result: Result<()> = (|| { migration.up(&conn)?; - // Record in migration history conn.execute( "INSERT OR REPLACE INTO migration_history (version, name, applied_at) VALUES (?1, ?2, datetime('now'))", rusqlite::params![migration.version(), migration.name()], ) .map_err(|e| AzothError::Projection(e.to_string()))?; + + conn.execute( + "UPDATE projection_meta + SET schema_version = ?1, updated_at = datetime('now') + WHERE id = 0", + [migration.version() as i64], + ) + .map_err(|e| AzothError::Projection(e.to_string()))?; + + migration.verify(&conn)?; + Ok(()) + })(); + + if let Err(e) = apply_result { + let _ = conn.execute_batch("ROLLBACK"); + return Err(e); } - // Update schema version - projection.migrate(migration.version())?; + conn.execute_batch("COMMIT") + .map_err(|e| AzothError::Projection(e.to_string()))?; tracing::info!("Migration v{} complete", migration.version()); Ok(()) @@ -280,14 +298,38 @@ impl MigrationManager { migration.name() ); - // Execute the down() migration - { - let conn = projection.conn().lock().unwrap(); + // Execute rollback and metadata updates atomically. + let conn = projection.conn().lock().unwrap(); + conn.execute_batch("BEGIN IMMEDIATE TRANSACTION") + .map_err(|e| AzothError::Projection(e.to_string()))?; + + let rollback_result: Result<()> = (|| { migration.down(&conn)?; + + conn.execute( + "DELETE FROM migration_history WHERE version = ?1", + [current_version as i64], + ) + .map_err(|e| AzothError::Projection(e.to_string()))?; + + conn.execute( + "UPDATE projection_meta + SET schema_version = ?1, updated_at = datetime('now') + WHERE id = 0", + [(current_version - 1) as i64], + ) + .map_err(|e| AzothError::Projection(e.to_string()))?; + + Ok(()) + })(); + + if let Err(e) = rollback_result { + let _ = conn.execute_batch("ROLLBACK"); + return Err(e); } - // Update schema version - projection.migrate(current_version - 1)?; + conn.execute_batch("COMMIT") + .map_err(|e| AzothError::Projection(e.to_string()))?; Ok(()) } diff --git a/crates/azoth/src/onchain_registry.rs b/crates/azoth/src/onchain_registry.rs index 85e4f4e..4121aa9 100644 --- a/crates/azoth/src/onchain_registry.rs +++ b/crates/azoth/src/onchain_registry.rs @@ -46,14 +46,13 @@ use crate::checkpoint::{CheckpointMetadata, CheckpointStorage}; use crate::{AzothError, Result}; +use alloy_network::EthereumWallet; use alloy_primitives::{Address, FixedBytes}; -use alloy_provider::{Provider, ProviderBuilder, RootProvider}; +use alloy_provider::{Provider, ProviderBuilder}; use alloy_rpc_types::TransactionRequest; use alloy_signer_local::PrivateKeySigner; use alloy_sol_types::{sol, SolCall}; -use alloy_transport_http::Http; use async_trait::async_trait; -use reqwest::Client; use std::path::Path; use std::str::FromStr; @@ -142,7 +141,6 @@ pub struct OnChainRegistry { config: OnChainConfig, backup_storage: Box, signer: PrivateKeySigner, - provider: RootProvider>, } impl OnChainRegistry { @@ -153,19 +151,10 @@ impl OnChainRegistry { let signer = PrivateKeySigner::from_str(private_key) .map_err(|e| AzothError::Config(format!("Invalid private key: {}", e)))?; - // Create provider - let provider = ProviderBuilder::new().on_http( - config - .rpc_url - .parse() - .map_err(|e| AzothError::Config(format!("Invalid RPC URL: {}", e)))?, - ); - Ok(Self { config, backup_storage: storage, signer, - provider, }) } @@ -220,8 +209,18 @@ impl CheckpointStorage for OnChainRegistry { tx.chain_id = Some(self.config.chain_id); // Sign and send transaction - let tx_hash = self - .provider + let wallet = EthereumWallet::from(self.signer.clone()); + let url = self + .config + .rpc_url + .parse() + .map_err(|e| AzothError::Config(format!("Invalid RPC URL: {}", e)))?; + let provider = ProviderBuilder::new() + .with_recommended_fillers() + .wallet(wallet) + .on_http(url); + + let tx_hash = provider .send_transaction(tx) .await .map_err(|e| AzothError::Backup(format!("Failed to send transaction: {}", e)))? @@ -265,8 +264,14 @@ impl CheckpointStorage for OnChainRegistry { .to(self.config.contract_address) .input(calldata.into()); - let result = self - .provider + let url = self + .config + .rpc_url + .parse() + .map_err(|e| AzothError::Config(format!("Invalid RPC URL: {}", e)))?; + let provider = ProviderBuilder::new().on_http(url); + + let result = provider .call(&tx) .await .map_err(|e| AzothError::Backup(format!("Failed to query backup IDs: {}", e)))?; @@ -291,8 +296,14 @@ impl CheckpointStorage for OnChainRegistry { .to(self.config.contract_address) .input(calldata.into()); - let result = self - .provider + let url = self + .config + .rpc_url + .parse() + .map_err(|e| AzothError::Config(format!("Invalid RPC URL: {}", e)))?; + let provider = ProviderBuilder::new().on_http(url); + + let result = provider .call(&tx) .await .map_err(|e| AzothError::Backup(format!("Failed to query backup: {}", e)))?; diff --git a/crates/azoth/src/typed_values.rs b/crates/azoth/src/typed_values.rs index cda9704..a112f6b 100644 --- a/crates/azoth/src/typed_values.rs +++ b/crates/azoth/src/typed_values.rs @@ -905,7 +905,12 @@ mod tests { // Should fail when deserializing to wrong type let result: Result = value.to_json(); - assert!(result.is_err()); + if let Ok(post) = result { + panic!( + "Expected type mismatch error, got Post {{ id: {}, title: {} }}", + post.id, post.title + ); + } } #[test] diff --git a/crates/azoth/tests/concurrency_safety_test.rs b/crates/azoth/tests/concurrency_safety_test.rs index 3fd2be9..cb87396 100644 --- a/crates/azoth/tests/concurrency_safety_test.rs +++ b/crates/azoth/tests/concurrency_safety_test.rs @@ -336,8 +336,7 @@ fn test_lmdb_write_serialization() { println!(" Write order: {:?}", order); // Verify no overlap in write transactions (they must be serialized) - for i in 0..ends.len() - 1 { - let (tid1, end1) = ends[i]; + for &(tid1, end1) in ends.iter().take(ends.len().saturating_sub(1)) { // Find the next transaction in execution order if let Some((tid2, start2)) = starts.iter().find(|(tid, _)| { diff --git a/crates/azoth/tests/preflight_cache_test.rs b/crates/azoth/tests/preflight_cache_test.rs index f7fd209..a8562dc 100644 --- a/crates/azoth/tests/preflight_cache_test.rs +++ b/crates/azoth/tests/preflight_cache_test.rs @@ -36,7 +36,7 @@ fn test_preflight_cache_hot_keys() -> Result<()> { // Read the same key multiple times during preflight // First read should be a cache miss, subsequent reads should be cache hits - for i in 0..100 { + for _i in 0..100 { Transaction::new(&db) .keys(vec![b"balance".to_vec()]) .preflight(|ctx| { @@ -121,7 +121,7 @@ fn test_preflight_cache_concurrent() -> Result<()> { let mut handles = vec![]; let barrier = Arc::new(Barrier::new(10)); - for thread_id in 0..10 { + for _thread_id in 0..10 { let db_clone = Arc::clone(&db); let barrier_clone = Arc::clone(&barrier); diff --git a/crates/azoth/tests/stress_test.rs b/crates/azoth/tests/stress_test.rs index 0fb7209..2f7b9ff 100644 --- a/crates/azoth/tests/stress_test.rs +++ b/crates/azoth/tests/stress_test.rs @@ -311,10 +311,8 @@ fn test_lock_fairness() { .execute(|ctx| { // Increment counter let current = match ctx.get_opt(b"counter")? { - Some(val) => match val { - TypedValue::U64(v) => v, - _ => 0, - }, + Some(TypedValue::U64(v)) => v, + Some(_) => 0, None => 0, }; ctx.set(b"counter", &TypedValue::U64(current + 1))?; From 2f49babf055c611c66db4fe44c5d462b04509cc5 Mon Sep 17 00:00:00 2001 From: johnny-emp Date: Fri, 6 Feb 2026 16:09:58 -0500 Subject: [PATCH 2/2] format --- crates/azoth/tests/concurrency_safety_test.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/azoth/tests/concurrency_safety_test.rs b/crates/azoth/tests/concurrency_safety_test.rs index cb87396..c5b09fa 100644 --- a/crates/azoth/tests/concurrency_safety_test.rs +++ b/crates/azoth/tests/concurrency_safety_test.rs @@ -337,7 +337,6 @@ fn test_lmdb_write_serialization() { // Verify no overlap in write transactions (they must be serialized) for &(tid1, end1) in ends.iter().take(ends.len().saturating_sub(1)) { - // Find the next transaction in execution order if let Some((tid2, start2)) = starts.iter().find(|(tid, _)| { let pos1 = order.iter().position(|&t| t == tid1).unwrap();