diff --git a/crates/reservation-core/src/recovery.rs b/crates/reservation-core/src/recovery.rs index 954c852..d4029dd 100644 --- a/crates/reservation-core/src/recovery.rs +++ b/crates/reservation-core/src/recovery.rs @@ -265,16 +265,18 @@ fn validate_replay_order( #[cfg(test)] mod tests { use std::fs; + use std::fs::OpenOptions; + use std::io::Write; use std::path::PathBuf; use std::time::{SystemTime, UNIX_EPOCH}; use crate::{ - command::{ClientRequest, Command}, + command::{ClientRequest, Command, CommandContext}, command_codec::encode_client_request, config::Config, - ids::{ClientId, Lsn, OperationId, PoolId, Slot}, + ids::{ClientId, HoldId, Lsn, OperationId, PoolId, Slot}, snapshot_file::SnapshotFile, - state_machine::{PoolRecord, ReservationDb}, + state_machine::{HoldState, PoolRecord, ReservationDb}, wal::{Frame, RecordType}, wal_file::WalFile, }; @@ -371,4 +373,213 @@ mod tests { let _ = fs::remove_file(snapshot_path); let _ = fs::remove_file(wal_path); } + + #[test] + fn recovery_matches_live_path_when_next_request_expires_overdue_hold() { + let snapshot_path = temp_path("snapshot-expiry-live-match", "snapshot"); + let wal_path = temp_path("wal-expiry-live-match", "wal"); + let snapshot_file = SnapshotFile::new(&snapshot_path, 4096); + let mut wal_file = WalFile::open(&wal_path, 1024).unwrap(); + + let prefix = [ + Frame { + lsn: Lsn(1), + request_slot: Slot(1), + record_type: RecordType::ClientCommand, + payload: encode_client_request(ClientRequest { + operation_id: OperationId(1), + client_id: ClientId(1), + command: Command::CreatePool { + pool_id: PoolId(11), + total_capacity: 5, + }, + }), + }, + Frame { + lsn: Lsn(2), + request_slot: Slot(2), + record_type: RecordType::ClientCommand, + payload: encode_client_request(ClientRequest { + operation_id: OperationId(2), + client_id: ClientId(1), + command: Command::PlaceHold { + pool_id: PoolId(11), + hold_id: HoldId(21), + quantity: 5, + deadline_slot: Slot(5), + }, + }), + }, + ]; + for frame in prefix { + wal_file.append_frame(&frame).unwrap(); + } + wal_file.sync().unwrap(); + + let mut live = ReservationDb::new(config()).unwrap(); + let _ = live.apply_client( + CommandContext { + lsn: Lsn(1), + request_slot: Slot(1), + }, + ClientRequest { + operation_id: OperationId(1), + client_id: ClientId(1), + command: Command::CreatePool { + pool_id: PoolId(11), + total_capacity: 5, + }, + }, + ); + let _ = live.apply_client( + CommandContext { + lsn: Lsn(2), + request_slot: Slot(2), + }, + ClientRequest { + operation_id: OperationId(2), + client_id: ClientId(1), + command: Command::PlaceHold { + pool_id: PoolId(11), + hold_id: HoldId(21), + quantity: 5, + deadline_slot: Slot(5), + }, + }, + ); + + let mut recovered = recover_reservation(config(), &snapshot_file, &mut wal_file).unwrap(); + let request = ClientRequest { + operation_id: OperationId(3), + client_id: ClientId(1), + command: Command::CreatePool { + pool_id: PoolId(12), + total_capacity: 1, + }, + }; + let context = CommandContext { + lsn: Lsn(3), + request_slot: Slot(20), + }; + + let live_outcome = live.apply_client(context, request); + let recovered_outcome = recovered.db.apply_client(context, request); + + assert_eq!(recovered_outcome, live_outcome); + assert_eq!(recovered.db.snapshot(), live.snapshot()); + assert_eq!( + recovered + .db + .snapshot() + .holds + .iter() + .find(|record| record.hold_id == HoldId(21)) + .unwrap() + .state, + HoldState::Expired + ); + + let _ = fs::remove_file(snapshot_path); + let _ = fs::remove_file(wal_path); + } + + #[test] + fn recovery_truncates_torn_expire_frame_without_fabricating_expired_state() { + let snapshot_path = temp_path("snapshot-torn-expire", "snapshot"); + let wal_path = temp_path("wal-torn-expire", "wal"); + let snapshot_file = SnapshotFile::new(&snapshot_path, 4096); + let mut wal_file = WalFile::open(&wal_path, 1024).unwrap(); + + let create_pool = ClientRequest { + operation_id: OperationId(1), + client_id: ClientId(1), + command: Command::CreatePool { + pool_id: PoolId(11), + total_capacity: 5, + }, + }; + let place_hold = ClientRequest { + operation_id: OperationId(2), + client_id: ClientId(1), + command: Command::PlaceHold { + pool_id: PoolId(11), + hold_id: HoldId(21), + quantity: 5, + deadline_slot: Slot(5), + }, + }; + wal_file + .append_frame(&Frame { + lsn: Lsn(1), + request_slot: Slot(1), + record_type: RecordType::ClientCommand, + payload: encode_client_request(create_pool), + }) + .unwrap(); + wal_file + .append_frame(&Frame { + lsn: Lsn(2), + request_slot: Slot(2), + record_type: RecordType::ClientCommand, + payload: encode_client_request(place_hold), + }) + .unwrap(); + wal_file.sync().unwrap(); + + let torn_expire = Frame { + lsn: Lsn(3), + request_slot: Slot(6), + record_type: RecordType::InternalCommand, + payload: vec![3, 21], + } + .encode(); + let mut raw = OpenOptions::new().append(true).open(&wal_path).unwrap(); + raw.write_all(&torn_expire[..torn_expire.len() - 2]) + .unwrap(); + raw.sync_all().unwrap(); + + let mut recovered = recover_reservation(config(), &snapshot_file, &mut wal_file).unwrap(); + assert_eq!( + recovered + .db + .snapshot() + .holds + .iter() + .find(|record| record.hold_id == HoldId(21)) + .unwrap() + .state, + HoldState::Held + ); + + let outcome = recovered.db.apply_client( + CommandContext { + lsn: Lsn(3), + request_slot: Slot(6), + }, + ClientRequest { + operation_id: OperationId(3), + client_id: ClientId(1), + command: Command::CreatePool { + pool_id: PoolId(12), + total_capacity: 1, + }, + }, + ); + + assert_eq!(outcome.result_code, crate::result::ResultCode::Ok); + assert_eq!( + recovered + .db + .snapshot() + .holds + .iter() + .find(|record| record.hold_id == HoldId(21)) + .unwrap() + .state, + HoldState::Expired + ); + + let _ = fs::remove_file(snapshot_path); + let _ = fs::remove_file(wal_path); + } } diff --git a/crates/reservation-core/src/snapshot.rs b/crates/reservation-core/src/snapshot.rs index 48ef64a..0292dad 100644 --- a/crates/reservation-core/src/snapshot.rs +++ b/crates/reservation-core/src/snapshot.rs @@ -226,11 +226,13 @@ impl ReservationDb { let mut hold_retire_entries = Vec::new(); for (ordinal, record) in holds.into_iter().enumerate() { - hold_retire_entries.push(( - record.hold_id, - record.deadline_slot, - u64::try_from(ordinal).expect("hold ordinal must fit u64"), - )); + if record.state == HoldState::Held { + hold_retire_entries.push(( + record.hold_id, + record.deadline_slot, + u64::try_from(ordinal).expect("hold ordinal must fit u64"), + )); + } match db.restore_hold(record) { Ok(()) => {} Err(FixedMapError::DuplicateKey) => { diff --git a/crates/reservation-core/src/state_machine.rs b/crates/reservation-core/src/state_machine.rs index 4808d1c..3220396 100644 --- a/crates/reservation-core/src/state_machine.rs +++ b/crates/reservation-core/src/state_machine.rs @@ -346,6 +346,7 @@ impl ReservationDb { deadline_slot, state: HoldState::Held, }); + self.push_hold_expiry(hold_id, deadline_slot); CommandOutcome::with_pool_and_hold(ResultCode::Ok, pool_id, hold_id) } @@ -617,6 +618,18 @@ impl ReservationDb { } } + fn push_hold_expiry(&mut self, hold_id: HoldId, deadline_slot: Slot) { + match self.hold_retire_queue.push(RetireEntry { + key: hold_id, + retire_after_slot: deadline_slot, + }) { + Ok(()) => {} + Err(RetireQueueError::Full) => { + panic!("hold retire queue must stay within hold capacity") + } + } + } + fn begin_apply(&mut self, context: CommandContext) { if let Some(previous_lsn) = self.last_applied_lsn { assert!( @@ -636,6 +649,36 @@ impl ReservationDb { } fn retire_state(&mut self, request_slot: Slot) { + while let Some(entry) = self.hold_retire_queue.front() { + if entry.retire_after_slot.get() >= request_slot.get() { + break; + } + + let hold_id = entry.key; + let popped = self.hold_retire_queue.pop_front(); + assert!(popped.is_some(), "peeked hold retirement entry must pop"); + + let Some(mut hold) = self.holds.get(hold_id).copied() else { + warn!( + "hold retirement queue referenced missing hold_id={}", + hold_id.get() + ); + continue; + }; + + if hold.state != HoldState::Held { + continue; + } + + let pool = self + .pools + .get_mut(hold.pool_id) + .expect("hold pool must remain present during automatic expiry"); + pool.held_capacity -= hold.quantity; + hold.state = HoldState::Expired; + self.replace_hold(hold); + } + while let Some(entry) = self.operation_retire_queue.front() { if entry.retire_after_slot.get() > request_slot.get() { break; @@ -1115,6 +1158,104 @@ mod tests { assert_eq!(db.snapshot().holds[0].state, HoldState::Expired); } + #[test] + fn overdue_hold_auto_expires_before_later_request() { + let mut db = ReservationDb::new(config()).unwrap(); + db.apply_client( + context(1, 1), + request( + 1, + Command::CreatePool { + pool_id: PoolId(11), + total_capacity: 5, + }, + ), + ); + db.apply_client( + context(2, 2), + request( + 2, + Command::PlaceHold { + pool_id: PoolId(11), + hold_id: HoldId(21), + quantity: 5, + deadline_slot: Slot(5), + }, + ), + ); + + let outcome = db.apply_client( + context(3, 20), + request( + 3, + Command::PlaceHold { + pool_id: PoolId(11), + hold_id: HoldId(22), + quantity: 5, + deadline_slot: Slot(30), + }, + ), + ); + + assert_eq!(outcome.result_code, ResultCode::Ok); + assert_eq!(outcome.pool_id, Some(PoolId(11))); + assert_eq!(outcome.hold_id, Some(HoldId(22))); + assert_eq!(db.snapshot().pools[0].held_capacity, 5); + assert_eq!( + db.snapshot() + .holds + .iter() + .find(|record| record.hold_id == HoldId(21)) + .unwrap() + .state, + HoldState::Expired + ); + } + + #[test] + fn duplicate_place_hold_retry_after_deadline_returns_cached_outcome() { + let mut db = ReservationDb::new(config()).unwrap(); + db.apply_client( + context(1, 1), + request( + 1, + Command::CreatePool { + pool_id: PoolId(11), + total_capacity: 5, + }, + ), + ); + let first = db.apply_client( + context(2, 2), + request( + 2, + Command::PlaceHold { + pool_id: PoolId(11), + hold_id: HoldId(21), + quantity: 5, + deadline_slot: Slot(5), + }, + ), + ); + + let retry = db.apply_client( + context(3, 6), + request( + 2, + Command::PlaceHold { + pool_id: PoolId(11), + hold_id: HoldId(21), + quantity: 5, + deadline_slot: Slot(5), + }, + ), + ); + + assert_eq!(retry, first); + assert_eq!(db.snapshot().pools[0].held_capacity, 0); + assert_eq!(db.snapshot().holds[0].state, HoldState::Expired); + } + #[test] fn expire_hold_transitions_to_expired_once_due() { let mut db = ReservationDb::new(config()).unwrap();