From 480b4cb66740ce806e8b5b36528bc3b6bc57ff0b Mon Sep 17 00:00:00 2001 From: Francesco Cislaghi Date: Thu, 26 Mar 2026 19:37:44 +0100 Subject: [PATCH 1/3] feat(reservation-core): add extend hold lifecycle --- crates/reservation-core/src/command.rs | 5 + crates/reservation-core/src/command_codec.rs | 22 +- crates/reservation-core/src/recovery.rs | 136 +++++++++++ crates/reservation-core/src/snapshot.rs | 21 +- crates/reservation-core/src/state_machine.rs | 228 +++++++++++++++++++ 5 files changed, 402 insertions(+), 10 deletions(-) diff --git a/crates/reservation-core/src/command.rs b/crates/reservation-core/src/command.rs index 4912575..6e611d5 100644 --- a/crates/reservation-core/src/command.rs +++ b/crates/reservation-core/src/command.rs @@ -5,6 +5,7 @@ pub(crate) const TAG_PLACE_HOLD: u8 = 2; pub(crate) const TAG_CONFIRM_HOLD: u8 = 3; pub(crate) const TAG_RELEASE_HOLD: u8 = 4; pub(crate) const TAG_EXPIRE_HOLD: u8 = 5; +pub(crate) const TAG_EXTEND_HOLD: u8 = 6; #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct CommandContext { @@ -43,6 +44,10 @@ pub enum Command { ReleaseHold { hold_id: HoldId, }, + ExtendHold { + hold_id: HoldId, + deadline_slot: Slot, + }, ExpireHold { hold_id: HoldId, }, diff --git a/crates/reservation-core/src/command_codec.rs b/crates/reservation-core/src/command_codec.rs index aa3ecf5..4591ced 100644 --- a/crates/reservation-core/src/command_codec.rs +++ b/crates/reservation-core/src/command_codec.rs @@ -1,6 +1,6 @@ use crate::command::{ - ClientRequest, Command, TAG_CONFIRM_HOLD, TAG_CREATE_POOL, TAG_EXPIRE_HOLD, TAG_PLACE_HOLD, - TAG_RELEASE_HOLD, + ClientRequest, Command, TAG_CONFIRM_HOLD, TAG_CREATE_POOL, TAG_EXPIRE_HOLD, TAG_EXTEND_HOLD, + TAG_PLACE_HOLD, TAG_RELEASE_HOLD, }; use crate::ids::{ClientId, HoldId, OperationId, PoolId, Slot}; @@ -79,6 +79,14 @@ fn encode_command(bytes: &mut Vec, command: &Command) { bytes.push(TAG_RELEASE_HOLD); bytes.extend_from_slice(&hold_id.get().to_le_bytes()); } + Command::ExtendHold { + hold_id, + deadline_slot, + } => { + bytes.push(TAG_EXTEND_HOLD); + bytes.extend_from_slice(&hold_id.get().to_le_bytes()); + bytes.extend_from_slice(&deadline_slot.get().to_le_bytes()); + } Command::ExpireHold { hold_id } => { bytes.push(TAG_EXPIRE_HOLD); bytes.extend_from_slice(&hold_id.get().to_le_bytes()); @@ -104,6 +112,10 @@ fn decode_command(cursor: &mut Cursor<'_>) -> Result TAG_RELEASE_HOLD => Ok(Command::ReleaseHold { hold_id: HoldId(cursor.read_u128()?), }), + TAG_EXTEND_HOLD => Ok(Command::ExtendHold { + hold_id: HoldId(cursor.read_u128()?), + deadline_slot: Slot(cursor.read_u64()?), + }), TAG_EXPIRE_HOLD => Ok(Command::ExpireHold { hold_id: HoldId(cursor.read_u128()?), }), @@ -184,9 +196,9 @@ mod tests { #[test] fn internal_command_round_trips() { - let command = Command::CreatePool { - pool_id: PoolId(7), - total_capacity: 9, + let command = Command::ExtendHold { + hold_id: HoldId(7), + deadline_slot: Slot(9), }; let decoded = decode_internal_command(&encode_internal_command(command)).unwrap(); diff --git a/crates/reservation-core/src/recovery.rs b/crates/reservation-core/src/recovery.rs index d4029dd..712ee60 100644 --- a/crates/reservation-core/src/recovery.rs +++ b/crates/reservation-core/src/recovery.rs @@ -303,6 +303,41 @@ mod tests { } } + fn create_pool_request() -> ClientRequest { + ClientRequest { + operation_id: OperationId(1), + client_id: ClientId(1), + command: Command::CreatePool { + pool_id: PoolId(11), + total_capacity: 5, + }, + } + } + + fn place_hold_request(deadline_slot: Slot) -> ClientRequest { + ClientRequest { + operation_id: OperationId(2), + client_id: ClientId(1), + command: Command::PlaceHold { + pool_id: PoolId(11), + hold_id: HoldId(21), + quantity: 5, + deadline_slot, + }, + } + } + + fn extend_hold_request(deadline_slot: Slot) -> ClientRequest { + ClientRequest { + operation_id: OperationId(3), + client_id: ClientId(1), + command: Command::ExtendHold { + hold_id: HoldId(21), + deadline_slot, + }, + } + } + #[test] fn recovery_replays_from_empty_snapshot() { let snapshot_path = temp_path("snapshot-empty", "snapshot"); @@ -483,6 +518,107 @@ mod tests { let _ = fs::remove_file(wal_path); } + #[test] + fn recovery_preserves_extended_deadline_before_later_request() { + let snapshot_path = temp_path("snapshot-extend-live-match", "snapshot"); + let wal_path = temp_path("wal-extend-live-match", "wal"); + let snapshot_file = SnapshotFile::new(&snapshot_path, 4096); + let mut wal_file = WalFile::open(&wal_path, 1024).unwrap(); + + let prefix = [ + Frame { + lsn: Lsn(1), + request_slot: Slot(1), + record_type: RecordType::ClientCommand, + payload: encode_client_request(create_pool_request()), + }, + Frame { + lsn: Lsn(2), + request_slot: Slot(2), + record_type: RecordType::ClientCommand, + payload: encode_client_request(place_hold_request(Slot(5))), + }, + Frame { + lsn: Lsn(3), + request_slot: Slot(3), + record_type: RecordType::ClientCommand, + payload: encode_client_request(extend_hold_request(Slot(10))), + }, + ]; + for frame in prefix { + wal_file.append_frame(&frame).unwrap(); + } + wal_file.sync().unwrap(); + + let mut live = ReservationDb::new(config()).unwrap(); + let _ = live.apply_client( + CommandContext { + lsn: Lsn(1), + request_slot: Slot(1), + }, + create_pool_request(), + ); + let _ = live.apply_client( + CommandContext { + lsn: Lsn(2), + request_slot: Slot(2), + }, + place_hold_request(Slot(5)), + ); + let _ = live.apply_client( + CommandContext { + lsn: Lsn(3), + request_slot: Slot(3), + }, + extend_hold_request(Slot(10)), + ); + + let mut recovered = recover_reservation(config(), &snapshot_file, &mut wal_file).unwrap(); + let request = ClientRequest { + operation_id: OperationId(4), + client_id: ClientId(1), + command: Command::CreatePool { + pool_id: PoolId(12), + total_capacity: 1, + }, + }; + let context = CommandContext { + lsn: Lsn(4), + request_slot: Slot(8), + }; + + let live_outcome = live.apply_client(context, request); + let recovered_outcome = recovered.db.apply_client(context, request); + + assert_eq!(recovered_outcome, live_outcome); + assert_eq!(recovered.db.snapshot(), live.snapshot()); + assert_eq!( + recovered + .db + .snapshot() + .holds + .iter() + .find(|record| record.hold_id == HoldId(21)) + .unwrap() + .state, + HoldState::Held + ); + assert_eq!( + recovered + .db + .snapshot() + .holds + .iter() + .find(|record| record.hold_id == HoldId(21)) + .unwrap() + .deadline_slot, + Slot(10) + ); + + let _ = fs::remove_file(snapshot_path); + let _ = fs::remove_file(wal_path); + } + #[test] fn recovery_truncates_torn_expire_frame_without_fabricating_expired_state() { let snapshot_path = temp_path("snapshot-torn-expire", "snapshot"); diff --git a/crates/reservation-core/src/snapshot.rs b/crates/reservation-core/src/snapshot.rs index 0292dad..369737b 100644 --- a/crates/reservation-core/src/snapshot.rs +++ b/crates/reservation-core/src/snapshot.rs @@ -1,6 +1,7 @@ use crate::Command; use crate::command::{ - TAG_CONFIRM_HOLD, TAG_CREATE_POOL, TAG_EXPIRE_HOLD, TAG_PLACE_HOLD, TAG_RELEASE_HOLD, + TAG_CONFIRM_HOLD, TAG_CREATE_POOL, TAG_EXPIRE_HOLD, TAG_EXTEND_HOLD, TAG_PLACE_HOLD, + TAG_RELEASE_HOLD, }; use crate::config::{Config, ConfigError}; use crate::fixed_map::FixedMapError; @@ -365,6 +366,14 @@ fn encode_command(bytes: &mut Vec, command: Command) { bytes.push(TAG_RELEASE_HOLD); bytes.extend_from_slice(&hold_id.get().to_le_bytes()); } + Command::ExtendHold { + hold_id, + deadline_slot, + } => { + bytes.push(TAG_EXTEND_HOLD); + bytes.extend_from_slice(&hold_id.get().to_le_bytes()); + bytes.extend_from_slice(&deadline_slot.get().to_le_bytes()); + } Command::ExpireHold { hold_id } => { bytes.push(TAG_EXPIRE_HOLD); bytes.extend_from_slice(&hold_id.get().to_le_bytes()); @@ -390,6 +399,10 @@ fn decode_command(bytes: &[u8], cursor: &mut usize) -> Result Ok(Command::ReleaseHold { hold_id: HoldId(decode_u128(bytes, cursor)?), }), + TAG_EXTEND_HOLD => Ok(Command::ExtendHold { + hold_id: HoldId(decode_u128(bytes, cursor)?), + deadline_slot: Slot(decode_u64(bytes, cursor)?), + }), TAG_EXPIRE_HOLD => Ok(Command::ExpireHold { hold_id: HoldId(decode_u128(bytes, cursor)?), }), @@ -540,11 +553,9 @@ mod tests { operations: vec![OperationRecord { client_id: ClientId(1), operation_id: OperationId(1), - command: Command::PlaceHold { - pool_id: PoolId(11), + command: Command::ExtendHold { hold_id: HoldId(21), - quantity: 2, - deadline_slot: Slot(5), + deadline_slot: Slot(7), }, result_code: ResultCode::Ok, result_pool_id: Some(PoolId(11)), diff --git a/crates/reservation-core/src/state_machine.rs b/crates/reservation-core/src/state_machine.rs index 97a901e..f941e3c 100644 --- a/crates/reservation-core/src/state_machine.rs +++ b/crates/reservation-core/src/state_machine.rs @@ -248,6 +248,10 @@ impl ReservationDb { self.apply_confirm_hold(context.request_slot, hold_id) } Command::ReleaseHold { hold_id } => self.apply_release_hold(hold_id), + Command::ExtendHold { + hold_id, + deadline_slot, + } => self.apply_extend_hold(context.request_slot, hold_id, deadline_slot), Command::ExpireHold { hold_id } => { self.apply_expire_hold(context.request_slot, hold_id) } @@ -421,6 +425,61 @@ impl ReservationDb { CommandOutcome::with_pool_and_hold(ResultCode::Ok, hold.pool_id, hold.hold_id) } + fn apply_extend_hold( + &mut self, + request_slot: Slot, + hold_id: HoldId, + deadline_slot: Slot, + ) -> CommandOutcome { + let Some(mut hold) = self.holds.get(hold_id).copied() else { + warn!( + "extend_hold rejected hold_not_found hold_id={}", + hold_id.get() + ); + return CommandOutcome::with_hold(ResultCode::HoldNotFound, hold_id); + }; + + match hold.state { + HoldState::Confirmed | HoldState::Released => { + return CommandOutcome::with_hold(ResultCode::InvalidState, hold_id); + } + HoldState::Expired => { + return CommandOutcome::with_hold(ResultCode::HoldExpired, hold_id); + } + HoldState::Held => {} + } + + if request_slot.get() >= hold.deadline_slot.get() { + let pool = self + .pools + .get_mut(hold.pool_id) + .expect("hold pool must remain present during expiration"); + pool.held_capacity -= hold.quantity; + hold.state = HoldState::Expired; + self.replace_hold(hold); + return CommandOutcome::with_pool_and_hold( + ResultCode::HoldExpired, + hold.pool_id, + hold.hold_id, + ); + } + + if deadline_slot.get() <= request_slot.get() + || deadline_slot.get() <= hold.deadline_slot.get() + { + return CommandOutcome::with_pool_and_hold( + ResultCode::InvalidState, + hold.pool_id, + hold.hold_id, + ); + } + + hold.deadline_slot = deadline_slot; + self.replace_hold(hold); + self.push_hold_expiry(hold_id, deadline_slot); + CommandOutcome::with_pool_and_hold(ResultCode::Ok, hold.pool_id, hold.hold_id) + } + fn apply_expire_hold(&mut self, request_slot: Slot, hold_id: HoldId) -> CommandOutcome { let Some(mut hold) = self.holds.get(hold_id).copied() else { warn!( @@ -670,6 +729,10 @@ impl ReservationDb { continue; } + if request_slot.get() <= hold.deadline_slot.get() { + continue; + } + let pool = self .pools .get_mut(hold.pool_id) @@ -1116,6 +1179,171 @@ mod tests { assert_eq!(db.snapshot().holds[0].state, HoldState::Released); } + #[test] + fn extend_hold_moves_deadline_forward_without_changing_capacity() { + let mut db = ReservationDb::new(config()).unwrap(); + db.apply_client( + context(1, 1), + request( + 1, + Command::CreatePool { + pool_id: PoolId(11), + total_capacity: 10, + }, + ), + ); + db.apply_client( + context(2, 2), + request( + 2, + Command::PlaceHold { + pool_id: PoolId(11), + hold_id: HoldId(21), + quantity: 3, + deadline_slot: Slot(5), + }, + ), + ); + + let outcome = db.apply_client( + context(3, 3), + request( + 3, + Command::ExtendHold { + hold_id: HoldId(21), + deadline_slot: Slot(9), + }, + ), + ); + + assert_eq!(outcome.result_code, ResultCode::Ok); + assert_eq!(db.snapshot().pools[0].held_capacity, 3); + assert_eq!(db.snapshot().pools[0].consumed_capacity, 0); + assert_eq!(db.snapshot().holds[0].deadline_slot, Slot(9)); + assert_eq!(db.snapshot().holds[0].state, HoldState::Held); + } + + #[test] + fn extend_hold_rejects_elapsed_or_non_increasing_deadline() { + let mut db = ReservationDb::new(config()).unwrap(); + db.apply_client( + context(1, 1), + request( + 1, + Command::CreatePool { + pool_id: PoolId(11), + total_capacity: 10, + }, + ), + ); + db.apply_client( + context(2, 2), + request( + 2, + Command::PlaceHold { + pool_id: PoolId(11), + hold_id: HoldId(21), + quantity: 3, + deadline_slot: Slot(8), + }, + ), + ); + + let stale = db.apply_client( + context(3, 3), + request( + 3, + Command::ExtendHold { + hold_id: HoldId(21), + deadline_slot: Slot(3), + }, + ), + ); + let non_increasing = db.apply_client( + context(4, 4), + request( + 4, + Command::ExtendHold { + hold_id: HoldId(21), + deadline_slot: Slot(8), + }, + ), + ); + + assert_eq!(stale.result_code, ResultCode::InvalidState); + assert_eq!(non_increasing.result_code, ResultCode::InvalidState); + assert_eq!(db.snapshot().holds[0].deadline_slot, Slot(8)); + assert_eq!(db.snapshot().holds[0].state, HoldState::Held); + } + + #[test] + fn extended_hold_does_not_auto_expire_at_old_deadline() { + let mut db = ReservationDb::new(config()).unwrap(); + db.apply_client( + context(1, 1), + request( + 1, + Command::CreatePool { + pool_id: PoolId(11), + total_capacity: 10, + }, + ), + ); + db.apply_client( + context(2, 2), + request( + 2, + Command::PlaceHold { + pool_id: PoolId(11), + hold_id: HoldId(21), + quantity: 3, + deadline_slot: Slot(5), + }, + ), + ); + db.apply_client( + context(3, 3), + request( + 3, + Command::ExtendHold { + hold_id: HoldId(21), + deadline_slot: Slot(10), + }, + ), + ); + + let outcome = db.apply_client( + context(4, 8), + request( + 4, + Command::CreatePool { + pool_id: PoolId(12), + total_capacity: 1, + }, + ), + ); + + assert_eq!(outcome.result_code, ResultCode::Ok); + assert_eq!( + db.snapshot() + .holds + .iter() + .find(|record| record.hold_id == HoldId(21)) + .unwrap() + .state, + HoldState::Held + ); + assert_eq!( + db.snapshot() + .holds + .iter() + .find(|record| record.hold_id == HoldId(21)) + .unwrap() + .deadline_slot, + Slot(10) + ); + } + #[test] fn confirm_after_deadline_expires_hold_and_releases_capacity() { let mut db = ReservationDb::new(config()).unwrap(); From ca5ed3ae518d8d2c141be4b074faa041368579a5 Mon Sep 17 00:00:00 2001 From: Francesco Cislaghi Date: Fri, 27 Mar 2026 08:44:42 +0100 Subject: [PATCH 2/3] fix(reservation-core): reschedule hold expiry on extend --- crates/reservation-core/src/state_machine.rs | 78 +++++++++++++++++++- 1 file changed, 77 insertions(+), 1 deletion(-) diff --git a/crates/reservation-core/src/state_machine.rs b/crates/reservation-core/src/state_machine.rs index f941e3c..f674227 100644 --- a/crates/reservation-core/src/state_machine.rs +++ b/crates/reservation-core/src/state_machine.rs @@ -476,7 +476,7 @@ impl ReservationDb { hold.deadline_slot = deadline_slot; self.replace_hold(hold); - self.push_hold_expiry(hold_id, deadline_slot); + self.rebuild_live_hold_retire_queue(); CommandOutcome::with_pool_and_hold(ResultCode::Ok, hold.pool_id, hold.hold_id) } @@ -548,6 +548,31 @@ impl ReservationDb { } } + fn rebuild_live_hold_retire_queue(&mut self) { + let max_holds = usize::try_from(self.config.max_holds).expect("validated max_holds"); + let mut entries: Vec<_> = self + .holds + .iter() + .filter(|record| record.state == HoldState::Held) + .map(|record| (record.hold_id, record.deadline_slot)) + .collect(); + entries + .sort_unstable_by_key(|(hold_id, deadline_slot)| (deadline_slot.get(), hold_id.get())); + + self.hold_retire_queue = RetireQueue::with_capacity(max_holds); + for (hold_id, deadline_slot) in entries { + match self.hold_retire_queue.push(RetireEntry { + key: hold_id, + retire_after_slot: deadline_slot, + }) { + Ok(()) => {} + Err(RetireQueueError::Full) => { + panic!("live hold retirement rebuild must respect configured capacity") + } + } + } + } + pub(crate) fn rebuild_operation_retire_queue( &mut self, entries: &mut Vec<(ClientOperationKey, Slot, u64)>, @@ -1344,6 +1369,57 @@ mod tests { ); } + #[test] + fn extend_hold_reschedules_without_overfilling_single_hold_queue() { + let mut config = config(); + config.max_holds = 1; + let mut db = ReservationDb::new(config).unwrap(); + db.apply_client( + context(1, 1), + request( + 1, + Command::CreatePool { + pool_id: PoolId(11), + total_capacity: 10, + }, + ), + ); + db.apply_client( + context(2, 2), + request( + 2, + Command::PlaceHold { + pool_id: PoolId(11), + hold_id: HoldId(21), + quantity: 3, + deadline_slot: Slot(5), + }, + ), + ); + + let outcome = db.apply_client( + context(3, 3), + request( + 3, + Command::ExtendHold { + hold_id: HoldId(21), + deadline_slot: Slot(10), + }, + ), + ); + + assert_eq!(outcome.result_code, ResultCode::Ok); + assert_eq!( + db.hold_retire_queue.front(), + Some(allocdb_retire_queue::RetireEntry { + key: HoldId(21), + retire_after_slot: Slot(10), + }) + ); + assert_eq!(db.hold_retire_queue.pop_front().unwrap().key, HoldId(21)); + assert_eq!(db.hold_retire_queue.pop_front(), None); + } + #[test] fn confirm_after_deadline_expires_hold_and_releases_capacity() { let mut db = ReservationDb::new(config()).unwrap(); From 9029bd2f389fecfeccb1122f782d1b9eb60dcaae Mon Sep 17 00:00:00 2001 From: Francesco Cislaghi Date: Fri, 27 Mar 2026 08:47:16 +0100 Subject: [PATCH 3/3] fix(reservation-core): align fixtures with extend hold --- crates/reservation-core/src/command_codec.rs | 29 ++++++++++++++++---- crates/reservation-core/src/snapshot.rs | 2 +- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/crates/reservation-core/src/command_codec.rs b/crates/reservation-core/src/command_codec.rs index 4591ced..407f76a 100644 --- a/crates/reservation-core/src/command_codec.rs +++ b/crates/reservation-core/src/command_codec.rs @@ -196,13 +196,30 @@ mod tests { #[test] fn internal_command_round_trips() { - let command = Command::ExtendHold { - hold_id: HoldId(7), - deadline_slot: Slot(9), - }; + let commands = [ + Command::CreatePool { + pool_id: PoolId(5), + total_capacity: 9, + }, + Command::PlaceHold { + pool_id: PoolId(5), + hold_id: HoldId(6), + quantity: 2, + deadline_slot: Slot(7), + }, + Command::ConfirmHold { hold_id: HoldId(6) }, + Command::ReleaseHold { hold_id: HoldId(6) }, + Command::ExtendHold { + hold_id: HoldId(6), + deadline_slot: Slot(8), + }, + Command::ExpireHold { hold_id: HoldId(6) }, + ]; - let decoded = decode_internal_command(&encode_internal_command(command)).unwrap(); - assert_eq!(decoded, command); + for command in commands { + let decoded = decode_internal_command(&encode_internal_command(command)).unwrap(); + assert_eq!(decoded, command); + } } #[test] diff --git a/crates/reservation-core/src/snapshot.rs b/crates/reservation-core/src/snapshot.rs index 369737b..72e27c6 100644 --- a/crates/reservation-core/src/snapshot.rs +++ b/crates/reservation-core/src/snapshot.rs @@ -547,7 +547,7 @@ mod tests { hold_id: HoldId(21), pool_id: PoolId(11), quantity: 2, - deadline_slot: Slot(5), + deadline_slot: Slot(7), state: HoldState::Held, }], operations: vec![OperationRecord {