From f3d15d29eacdafdc71ecc079bc000b16b457d2ae Mon Sep 17 00:00:00 2001 From: Francesco Cislaghi Date: Thu, 26 Mar 2026 18:10:48 +0100 Subject: [PATCH] refactor: extract shared wal file substrate --- Cargo.lock | 7 + Cargo.toml | 1 + crates/allocdb-core/Cargo.toml | 1 + crates/allocdb-core/src/recovery.rs | 6 +- .../src/recovery_issue_30_tests.rs | 10 +- .../src/recovery_issue_31_tests.rs | 2 +- .../allocdb-core/src/recovery_revoke_tests.rs | 6 +- crates/allocdb-core/src/recovery_tests.rs | 17 +- crates/allocdb-core/src/wal_file.rs | 87 ++------ crates/allocdb-node/src/engine.rs | 15 +- crates/allocdb-wal-file/Cargo.toml | 8 + crates/allocdb-wal-file/src/lib.rs | 186 ++++++++++++++++++ crates/quota-core/Cargo.toml | 1 + crates/quota-core/src/recovery.rs | 12 +- crates/quota-core/src/wal_file.rs | 87 ++------ crates/reservation-core/Cargo.toml | 1 + crates/reservation-core/src/wal_file.rs | 87 ++------ scripts/check_repo.sh | 2 +- 18 files changed, 294 insertions(+), 242 deletions(-) create mode 100644 crates/allocdb-wal-file/Cargo.toml create mode 100644 crates/allocdb-wal-file/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 2f4112e..e4a4786 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,6 +24,7 @@ name = "allocdb-core" version = "0.1.0" dependencies = [ "allocdb-retire-queue", + "allocdb-wal-file", "allocdb-wal-frame", "log", ] @@ -43,6 +44,10 @@ dependencies = [ name = "allocdb-retire-queue" version = "0.1.0" +[[package]] +name = "allocdb-wal-file" +version = "0.1.0" + [[package]] name = "allocdb-wal-frame" version = "0.1.0" @@ -227,6 +232,7 @@ name = "quota-core" version = "0.1.0" dependencies = [ "allocdb-retire-queue", + "allocdb-wal-file", "allocdb-wal-frame", "crc32c", "log", @@ -275,6 +281,7 @@ name = "reservation-core" version = "0.1.0" dependencies = [ "allocdb-retire-queue", + "allocdb-wal-file", "allocdb-wal-frame", "crc32c", "log", diff --git a/Cargo.toml b/Cargo.toml index 183392f..bddbeb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] members = [ "crates/allocdb-retire-queue", + "crates/allocdb-wal-file", "crates/allocdb-wal-frame", "crates/allocdb-bench", "crates/allocdb-core", diff --git a/crates/allocdb-core/Cargo.toml b/crates/allocdb-core/Cargo.toml index cfd26d9..247a70a 100644 --- a/crates/allocdb-core/Cargo.toml +++ b/crates/allocdb-core/Cargo.toml @@ -6,6 +6,7 @@ rust-version.workspace = true [dependencies] allocdb-retire-queue = { path = "../allocdb-retire-queue" } +allocdb-wal-file = { path = "../allocdb-wal-file" } allocdb-wal-frame = { path = "../allocdb-wal-frame" } log = "0.4.28" diff --git a/crates/allocdb-core/src/recovery.rs b/crates/allocdb-core/src/recovery.rs index 6837e53..7568c53 100644 --- a/crates/allocdb-core/src/recovery.rs +++ b/crates/allocdb-core/src/recovery.rs @@ -149,7 +149,7 @@ impl From for RecoveryObserverError { pub fn recover_allocdb( config: Config, snapshot_file: &SnapshotFile, - wal_file: &WalFile, + wal_file: &mut WalFile, ) -> Result { recover_allocdb_with_observer( config, @@ -173,7 +173,7 @@ pub fn recover_allocdb( pub fn recover_allocdb_with_observer( config: Config, snapshot_file: &SnapshotFile, - wal_file: &WalFile, + wal_file: &mut WalFile, mut observer: F, ) -> Result> where @@ -203,7 +203,7 @@ where fn recover_allocdb_impl( config: Config, snapshot_file: &SnapshotFile, - wal_file: &WalFile, + wal_file: &mut WalFile, observer: &mut F, ) -> Result> where diff --git a/crates/allocdb-core/src/recovery_issue_30_tests.rs b/crates/allocdb-core/src/recovery_issue_30_tests.rs index f1e49ff..865dcb5 100644 --- a/crates/allocdb-core/src/recovery_issue_30_tests.rs +++ b/crates/allocdb-core/src/recovery_issue_30_tests.rs @@ -79,7 +79,8 @@ fn recover_allocdb_rejects_non_monotonic_lsn() { .unwrap(); wal.sync().unwrap(); - let error = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap_err(); + let error = + recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap_err(); assert!(matches!( error, RecoveryError::Replay(ReplayError::NonMonotonicLsn { @@ -126,7 +127,8 @@ fn recover_allocdb_rejects_rewound_request_slot() { .unwrap(); wal.sync().unwrap(); - let error = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap_err(); + let error = + recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap_err(); assert!(matches!( error, RecoveryError::Replay(ReplayError::RewoundRequestSlot { @@ -143,7 +145,7 @@ fn recover_allocdb_rejects_rewound_request_slot() { fn recover_allocdb_rejects_semantically_invalid_snapshot() { let wal_path = test_path("recover-invalid-snapshot", "wal"); let snapshot_path = test_path("recover-invalid-snapshot", "snapshot"); - let wal = WalFile::open(&wal_path, 512).unwrap(); + let mut wal = WalFile::open(&wal_path, 512).unwrap(); let snapshot_file = SnapshotFile::new(&snapshot_path); snapshot_file @@ -172,7 +174,7 @@ fn recover_allocdb_rejects_semantically_invalid_snapshot() { }) .unwrap(); - let error = recover_allocdb(config(), &snapshot_file, &wal).unwrap_err(); + let error = recover_allocdb(config(), &snapshot_file, &mut wal).unwrap_err(); assert!(matches!( error, RecoveryError::Snapshot(SnapshotError::DuplicateResourceId(ResourceId(11))) diff --git a/crates/allocdb-core/src/recovery_issue_31_tests.rs b/crates/allocdb-core/src/recovery_issue_31_tests.rs index 64b7aee..0181f93 100644 --- a/crates/allocdb-core/src/recovery_issue_31_tests.rs +++ b/crates/allocdb-core/src/recovery_issue_31_tests.rs @@ -67,7 +67,7 @@ fn recover_allocdb_rejects_client_slot_overflow_in_replayed_wal() { .unwrap(); wal.sync().unwrap(); - let error = recover_allocdb(config, &SnapshotFile::new(&snapshot_path), &wal).unwrap_err(); + let error = recover_allocdb(config, &SnapshotFile::new(&snapshot_path), &mut wal).unwrap_err(); assert!(matches!( error, RecoveryError::Replay(ReplayError::SlotOverflow { diff --git a/crates/allocdb-core/src/recovery_revoke_tests.rs b/crates/allocdb-core/src/recovery_revoke_tests.rs index 5f85e5c..a844486 100644 --- a/crates/allocdb-core/src/recovery_revoke_tests.rs +++ b/crates/allocdb-core/src/recovery_revoke_tests.rs @@ -89,7 +89,8 @@ fn recover_allocdb_preserves_revoking_state() { wal.append_frame(&client_frame(4, 2, &revoke)).unwrap(); wal.sync().unwrap(); - let recovered = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap(); + let recovered = + recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap(); let reservation = recovered.db.reservation(ReservationId(2), Slot(2)).unwrap(); let resource = recovered.db.resource(ResourceId(11)).unwrap(); @@ -156,7 +157,8 @@ fn recover_allocdb_preserves_revoked_state() { wal.append_frame(&client_frame(5, 3, &reclaim)).unwrap(); wal.sync().unwrap(); - let recovered = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap(); + let recovered = + recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap(); let reservation = recovered.db.reservation(ReservationId(2), Slot(5)).unwrap(); let resource = recovered.db.resource(ResourceId(11)).unwrap(); diff --git a/crates/allocdb-core/src/recovery_tests.rs b/crates/allocdb-core/src/recovery_tests.rs index a77789f..5a5083b 100644 --- a/crates/allocdb-core/src/recovery_tests.rs +++ b/crates/allocdb-core/src/recovery_tests.rs @@ -92,7 +92,7 @@ fn recover_allocdb_replays_wal_without_snapshot() { wal.sync().unwrap(); let snapshot_file = SnapshotFile::new(&snapshot_path); - let recovered = recover_allocdb(config(), &snapshot_file, &wal).unwrap(); + let recovered = recover_allocdb(config(), &snapshot_file, &mut wal).unwrap(); assert!(!recovered.loaded_snapshot); assert_eq!(recovered.loaded_snapshot_lsn, None); @@ -148,7 +148,7 @@ fn recover_allocdb_skips_frames_covered_by_snapshot() { wal.append_frame(&client_frame(3, 2, &confirm)).unwrap(); wal.sync().unwrap(); - let recovered = recover_allocdb(config(), &snapshot_file, &wal).unwrap(); + let recovered = recover_allocdb(config(), &snapshot_file, &mut wal).unwrap(); assert!(recovered.loaded_snapshot); assert_eq!(recovered.loaded_snapshot_lsn, Some(Lsn(1))); @@ -193,7 +193,8 @@ fn recover_allocdb_truncates_torn_tail() { .write_all(&torn[..torn.len() - 2]) .unwrap(); - let recovered = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap(); + let recovered = + recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap(); assert!(!recovered.loaded_snapshot); assert_eq!(recovered.recovered_wal.scan_result.frames.len(), 1); @@ -212,13 +213,13 @@ fn recover_allocdb_truncates_torn_tail() { fn recover_allocdb_marks_empty_snapshot_as_loaded() { let wal_path = test_path("recover-empty-snapshot", "wal"); let snapshot_path = test_path("recover-empty-snapshot", "snapshot"); - let wal = WalFile::open(&wal_path, 512).unwrap(); + let mut wal = WalFile::open(&wal_path, 512).unwrap(); let snapshot_file = SnapshotFile::new(&snapshot_path); snapshot_file .write_snapshot(&AllocDb::new(config()).unwrap().snapshot()) .unwrap(); - let recovered = recover_allocdb(config(), &snapshot_file, &wal).unwrap(); + let recovered = recover_allocdb(config(), &snapshot_file, &mut wal).unwrap(); assert!(recovered.loaded_snapshot); assert_eq!(recovered.loaded_snapshot_lsn, None); @@ -262,7 +263,8 @@ fn recover_allocdb_replays_internal_commands() { wal.append_frame(&internal_frame(3, 5, &expire)).unwrap(); wal.sync().unwrap(); - let recovered = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap(); + let recovered = + recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap(); assert_eq!(recovered.replayed_wal_frame_count, 3); assert_eq!(recovered.replayed_wal_last_lsn, Some(Lsn(3))); @@ -308,7 +310,8 @@ fn recover_allocdb_fails_closed_on_mid_log_corruption() { bytes[last_index] ^= 0xff; fs::write(&wal_path, bytes).unwrap(); - let error = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap_err(); + let error = + recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap_err(); assert!(matches!( error, diff --git a/crates/allocdb-core/src/wal_file.rs b/crates/allocdb-core/src/wal_file.rs index c438654..6d5750a 100644 --- a/crates/allocdb-core/src/wal_file.rs +++ b/crates/allocdb-core/src/wal_file.rs @@ -1,13 +1,11 @@ -use std::fs::{self, File, OpenOptions}; -use std::io::{Read, Seek, SeekFrom, Write}; -use std::path::{Path, PathBuf}; +use std::path::Path; use crate::wal::{DecodeError, Frame, ScanResult, ScanStopReason, scan_frames}; +use allocdb_wal_file::AppendWalFile; #[derive(Debug)] pub struct WalFile { - path: PathBuf, - file: File, + file: AppendWalFile, max_payload_bytes: usize, } @@ -43,11 +41,9 @@ impl WalFile { /// /// Returns [`WalFileError`] if the file cannot be opened or created. pub fn open(path: impl AsRef, max_payload_bytes: usize) -> Result { - let path = path.as_ref().to_path_buf(); - let file = open_append_file(&path)?; + let file = AppendWalFile::open(path)?; Ok(Self { - path, file, max_payload_bytes, }) @@ -55,7 +51,7 @@ impl WalFile { #[must_use] pub fn path(&self) -> &Path { - &self.path + self.file.path() } /// Appends one encoded frame to the WAL file. @@ -66,9 +62,7 @@ impl WalFile { /// or [`WalFileError::Io`] if the append fails. pub fn append_frame(&mut self, frame: &Frame) -> Result<(), WalFileError> { self.validate_payload_len(frame)?; - - let encoded = frame.encode(); - self.file.write_all(&encoded)?; + self.file.append_bytes(&frame.encode())?; Ok(()) } @@ -78,7 +72,7 @@ impl WalFile { /// /// Returns [`WalFileError::Io`] if the sync fails. pub fn sync(&self) -> Result<(), WalFileError> { - self.file.sync_data()?; + self.file.sync()?; Ok(()) } @@ -88,7 +82,7 @@ impl WalFile { /// /// Returns [`WalFileError::Io`] if the file cannot be read. pub fn recover(&self) -> Result { - recover_path(&self.path) + recover_file(&self.file) } /// Truncates the file to the last valid frame boundary discovered by recovery scanning. @@ -100,8 +94,8 @@ impl WalFile { /// # Panics /// /// Panics only if the discovered valid prefix cannot fit into `u64`. - pub fn truncate_to_valid_prefix(&self) -> Result { - let recovered = recover_path(&self.path)?; + pub fn truncate_to_valid_prefix(&mut self) -> Result { + let recovered = recover_file(&self.file)?; let valid_prefix = u64::try_from(recovered.scan_result.valid_up_to).expect("valid WAL prefix must fit"); @@ -109,10 +103,7 @@ impl WalFile { ScanStopReason::CleanEof => {} ScanStopReason::TornTail { .. } => { if recovered.file_len > valid_prefix { - let mut file = OpenOptions::new().write(true).open(&self.path)?; - file.set_len(valid_prefix)?; - file.seek(SeekFrom::Start(valid_prefix))?; - file.sync_data()?; + self.file.truncate_to(valid_prefix)?; } } ScanStopReason::Corruption { offset, error } => { @@ -134,22 +125,11 @@ impl WalFile { self.validate_payload_len(frame)?; } - if let Some(parent) = self.path.parent() { - fs::create_dir_all(parent)?; - } - - let temp_path = self.temp_path(); - { - let mut temp_file = File::create(&temp_path)?; - for frame in frames { - temp_file.write_all(&frame.encode())?; - } - temp_file.sync_data()?; + let mut bytes = Vec::new(); + for frame in frames { + bytes.extend_from_slice(&frame.encode()); } - - fs::rename(&temp_path, &self.path)?; - sync_parent_dir(&self.path)?; - self.file = open_append_file(&self.path)?; + self.file.replace_with_bytes(&bytes)?; Ok(()) } @@ -163,22 +143,10 @@ impl WalFile { Ok(()) } - - fn temp_path(&self) -> PathBuf { - let mut temp_path = self.path.clone(); - let extension = temp_path - .extension() - .and_then(|value| value.to_str()) - .map_or_else(|| "tmp".to_owned(), |value| format!("{value}.tmp")); - temp_path.set_extension(extension); - temp_path - } } -fn recover_path(path: &Path) -> Result { - let mut file = File::open(path)?; - let mut bytes = Vec::new(); - file.read_to_end(&mut bytes)?; +fn recover_file(file: &AppendWalFile) -> Result { + let bytes = file.read_all()?; let scan_result = scan_frames(&bytes); Ok(RecoveredWal { scan_result, @@ -186,27 +154,6 @@ fn recover_path(path: &Path) -> Result { }) } -fn open_append_file(path: &Path) -> Result { - OpenOptions::new() - .create(true) - .read(true) - .append(true) - .open(path) -} - -#[cfg(unix)] -fn sync_parent_dir(path: &Path) -> Result<(), std::io::Error> { - if let Some(parent) = path.parent() { - OpenOptions::new().read(true).open(parent)?.sync_all()?; - } - Ok(()) -} - -#[cfg(not(unix))] -fn sync_parent_dir(_path: &Path) -> Result<(), std::io::Error> { - Ok(()) -} - #[cfg(test)] mod tests { use std::fs; diff --git a/crates/allocdb-node/src/engine.rs b/crates/allocdb-node/src/engine.rs index 6152b81..7649120 100644 --- a/crates/allocdb-node/src/engine.rs +++ b/crates/allocdb-node/src/engine.rs @@ -417,10 +417,11 @@ impl SingleNodeEngine { ) -> Result { engine_config.validate().map_err(EngineOpenError::from)?; let snapshot_file = SnapshotFile::new(snapshot_path); - let wal = WalFile::open(wal_path.as_ref(), engine_config.max_command_bytes) + let mut wal = WalFile::open(wal_path.as_ref(), engine_config.max_command_bytes) .map_err(EngineOpenError::from)?; let mut pending_crash = crash_plan; - let recovered = recover_allocdb_with_observer(core_config, &snapshot_file, &wal, |point| { + let recovered = + recover_allocdb_with_observer(core_config, &snapshot_file, &mut wal, |point| { if pending_crash.is_some_and(|plan| plan.matches_recovery_boundary(point)) { let plan = pending_crash .take() @@ -433,11 +434,11 @@ impl SingleNodeEngine { } Ok(()) - }) - .map_err(|error| match error { - RecoveryObserverError::Recovery(error) => RecoverEngineError::Recovery(error), - RecoveryObserverError::Observer(plan) => RecoverEngineError::CrashInjected(plan), - })?; + }) + .map_err(|error| match error { + RecoveryObserverError::Recovery(error) => RecoverEngineError::Recovery(error), + RecoveryObserverError::Observer(plan) => RecoverEngineError::CrashInjected(plan), + })?; Self::from_parts( recovered.db, engine_config, diff --git a/crates/allocdb-wal-file/Cargo.toml b/crates/allocdb-wal-file/Cargo.toml new file mode 100644 index 0000000..b69545c --- /dev/null +++ b/crates/allocdb-wal-file/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "allocdb-wal-file" +version.workspace = true +edition.workspace = true +rust-version.workspace = true + +[lints] +workspace = true diff --git a/crates/allocdb-wal-file/src/lib.rs b/crates/allocdb-wal-file/src/lib.rs new file mode 100644 index 0000000..dc4e240 --- /dev/null +++ b/crates/allocdb-wal-file/src/lib.rs @@ -0,0 +1,186 @@ +use std::fs::{self, File, OpenOptions}; +use std::io::{Read, Seek, SeekFrom, Write}; +use std::path::{Path, PathBuf}; + +#[derive(Debug)] +pub struct AppendWalFile { + path: PathBuf, + file: File, +} + +impl AppendWalFile { + /// Opens or creates one appendable WAL file. + /// + /// # Errors + /// + /// Returns [`std::io::Error`] if the file cannot be opened or created. + pub fn open(path: impl AsRef) -> Result { + let path = path.as_ref().to_path_buf(); + let file = open_append_file(&path)?; + Ok(Self { path, file }) + } + + #[must_use] + pub fn path(&self) -> &Path { + &self.path + } + + /// Appends already encoded frame bytes. + /// + /// # Errors + /// + /// Returns [`std::io::Error`] if the append fails. + pub fn append_bytes(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> { + self.file.write_all(bytes) + } + + /// Forces appended WAL bytes to durable storage. + /// + /// # Errors + /// + /// Returns [`std::io::Error`] if syncing fails. + pub fn sync(&self) -> Result<(), std::io::Error> { + self.file.sync_data() + } + + /// Reads the full WAL contents. + /// + /// # Errors + /// + /// Returns [`std::io::Error`] if the file cannot be read. + pub fn read_all(&self) -> Result, std::io::Error> { + let mut file = File::open(&self.path)?; + let mut bytes = Vec::new(); + file.read_to_end(&mut bytes)?; + Ok(bytes) + } + + /// Truncates the WAL to a known-good valid prefix and refreshes the append handle. + /// + /// # Errors + /// + /// Returns [`std::io::Error`] if the file cannot be reopened, truncated, or synced. + pub fn truncate_to(&mut self, valid_prefix: u64) -> Result<(), std::io::Error> { + let mut file = OpenOptions::new().write(true).open(&self.path)?; + file.set_len(valid_prefix)?; + file.seek(SeekFrom::Start(valid_prefix))?; + file.sync_data()?; + drop(file); + self.file = open_append_file(&self.path)?; + Ok(()) + } + + /// Replaces the on-disk WAL bytes atomically and refreshes the append handle. + /// + /// # Errors + /// + /// Returns [`std::io::Error`] if the temp-file rewrite, rename, or reopen fails. + pub fn replace_with_bytes(&mut self, bytes: &[u8]) -> Result<(), std::io::Error> { + if let Some(parent) = self.path.parent() { + fs::create_dir_all(parent)?; + } + + let temp_path = temp_path(&self.path); + { + let mut temp_file = File::create(&temp_path)?; + temp_file.write_all(bytes)?; + temp_file.sync_data()?; + } + + fs::rename(&temp_path, &self.path)?; + sync_parent_dir(&self.path)?; + self.file = open_append_file(&self.path)?; + Ok(()) + } +} + +fn open_append_file(path: &Path) -> Result { + OpenOptions::new() + .create(true) + .read(true) + .append(true) + .open(path) +} + +fn temp_path(path: &Path) -> PathBuf { + let mut temp_path = path.to_path_buf(); + let extension = temp_path + .extension() + .and_then(|value| value.to_str()) + .map_or_else(|| "tmp".to_owned(), |value| format!("{value}.tmp")); + temp_path.set_extension(extension); + temp_path +} + +#[cfg(unix)] +fn sync_parent_dir(path: &Path) -> Result<(), std::io::Error> { + if let Some(parent) = path.parent() { + OpenOptions::new().read(true).open(parent)?.sync_all()?; + } + Ok(()) +} + +#[cfg(not(unix))] +fn sync_parent_dir(_path: &Path) -> Result<(), std::io::Error> { + Ok(()) +} + +#[cfg(test)] +mod tests { + use std::fs; + use std::path::PathBuf; + use std::time::{SystemTime, UNIX_EPOCH}; + + use super::AppendWalFile; + + fn test_path(name: &str) -> PathBuf { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system time should be after epoch") + .as_nanos(); + std::env::temp_dir().join(format!("allocdb-wal-file-{name}-{nanos}.wal")) + } + + #[test] + fn append_and_read_round_trip() { + let path = test_path("append"); + let mut wal = AppendWalFile::open(&path).unwrap(); + wal.append_bytes(b"abc").unwrap(); + wal.append_bytes(b"def").unwrap(); + wal.sync().unwrap(); + + assert_eq!(wal.read_all().unwrap(), b"abcdef"); + + fs::remove_file(path).unwrap(); + } + + #[test] + fn truncate_reopens_for_future_appends() { + let path = test_path("truncate"); + let mut wal = AppendWalFile::open(&path).unwrap(); + wal.append_bytes(b"abcdef").unwrap(); + wal.sync().unwrap(); + wal.truncate_to(3).unwrap(); + wal.append_bytes(b"XYZ").unwrap(); + wal.sync().unwrap(); + + assert_eq!(wal.read_all().unwrap(), b"abcXYZ"); + + fs::remove_file(path).unwrap(); + } + + #[test] + fn replace_rewrites_contents_and_reopens_append_handle() { + let path = test_path("replace"); + let mut wal = AppendWalFile::open(&path).unwrap(); + wal.append_bytes(b"old").unwrap(); + wal.sync().unwrap(); + wal.replace_with_bytes(b"new").unwrap(); + wal.append_bytes(b"!").unwrap(); + wal.sync().unwrap(); + + assert_eq!(wal.read_all().unwrap(), b"new!"); + + fs::remove_file(path).unwrap(); + } +} diff --git a/crates/quota-core/Cargo.toml b/crates/quota-core/Cargo.toml index ec4e746..3d5a9ba 100644 --- a/crates/quota-core/Cargo.toml +++ b/crates/quota-core/Cargo.toml @@ -6,6 +6,7 @@ rust-version.workspace = true [dependencies] allocdb-retire-queue = { path = "../allocdb-retire-queue" } +allocdb-wal-file = { path = "../allocdb-wal-file" } allocdb-wal-frame = { path = "../allocdb-wal-frame" } crc32c = "0.6" log = "0.4.28" diff --git a/crates/quota-core/src/recovery.rs b/crates/quota-core/src/recovery.rs index 7610e71..a7a50b4 100644 --- a/crates/quota-core/src/recovery.rs +++ b/crates/quota-core/src/recovery.rs @@ -135,7 +135,7 @@ impl From for RecoveryObserverError { pub fn recover_quota( config: Config, snapshot_file: &SnapshotFile, - wal_file: &WalFile, + wal_file: &mut WalFile, ) -> Result { recover_quota_with_observer( config, @@ -152,7 +152,7 @@ pub fn recover_quota( pub fn recover_quota_with_observer( config: Config, snapshot_file: &SnapshotFile, - wal_file: &WalFile, + wal_file: &mut WalFile, mut observer: F, ) -> Result> where @@ -374,7 +374,7 @@ mod tests { .unwrap(); wal.sync().unwrap(); - let recovered = recover_quota(config(), &snapshot_file, &wal).unwrap(); + let recovered = recover_quota(config(), &snapshot_file, &mut wal).unwrap(); assert_eq!(recovered.replayed_wal_frame_count, 2); assert_eq!(recovered.replayed_wal_last_lsn, Some(Lsn(3))); assert_eq!(recovered.db.snapshot(), live.snapshot()); @@ -412,7 +412,7 @@ mod tests { .unwrap(); wal.sync().unwrap(); - let recovered = recover_quota(config, &snapshot_file, &wal).unwrap(); + let recovered = recover_quota(config, &snapshot_file, &mut wal).unwrap(); assert_eq!(recovered.replayed_wal_frame_count, 1); assert_eq!(recovered.db.snapshot().buckets.len(), 1); @@ -462,7 +462,7 @@ mod tests { .unwrap(); wal.sync().unwrap(); - let error = recover_quota(config, &snapshot_file, &wal).unwrap_err(); + let error = recover_quota(config, &snapshot_file, &mut wal).unwrap_err(); assert!(matches!(error, RecoveryError::Replay(_))); fs::remove_file(snapshot_path).unwrap(); @@ -517,7 +517,7 @@ mod tests { .write_all(&torn[..torn.len() - 3]) .unwrap(); - let recovered = recover_quota(config(), &snapshot_file, &wal).unwrap(); + let recovered = recover_quota(config(), &snapshot_file, &mut wal).unwrap(); assert!(!recovered.loaded_snapshot); assert_eq!(recovered.recovered_wal.scan_result.frames.len(), 1); diff --git a/crates/quota-core/src/wal_file.rs b/crates/quota-core/src/wal_file.rs index 3fb75d6..ca54fa4 100644 --- a/crates/quota-core/src/wal_file.rs +++ b/crates/quota-core/src/wal_file.rs @@ -1,13 +1,11 @@ -use std::fs::{self, File, OpenOptions}; -use std::io::{Read, Seek, SeekFrom, Write}; -use std::path::{Path, PathBuf}; +use std::path::Path; use crate::wal::{DecodeError, Frame, ScanResult, ScanStopReason, scan_frames}; +use allocdb_wal_file::AppendWalFile; #[derive(Debug)] pub struct WalFile { - path: PathBuf, - file: File, + file: AppendWalFile, max_payload_bytes: usize, } @@ -38,11 +36,9 @@ impl From for WalFileError { impl WalFile { pub fn open(path: impl AsRef, max_payload_bytes: usize) -> Result { - let path = path.as_ref().to_path_buf(); - let file = open_append_file(&path)?; + let file = AppendWalFile::open(path)?; Ok(Self { - path, file, max_payload_bytes, }) @@ -50,28 +46,26 @@ impl WalFile { #[must_use] pub fn path(&self) -> &Path { - &self.path + self.file.path() } pub fn append_frame(&mut self, frame: &Frame) -> Result<(), WalFileError> { self.validate_payload_len(frame)?; - - let encoded = frame.encode(); - self.file.write_all(&encoded)?; + self.file.append_bytes(&frame.encode())?; Ok(()) } pub fn sync(&self) -> Result<(), WalFileError> { - self.file.sync_data()?; + self.file.sync()?; Ok(()) } pub fn recover(&self) -> Result { - recover_path(&self.path) + recover_file(&self.file) } - pub fn truncate_to_valid_prefix(&self) -> Result { - let recovered = recover_path(&self.path)?; + pub fn truncate_to_valid_prefix(&mut self) -> Result { + let recovered = recover_file(&self.file)?; let valid_prefix = u64::try_from(recovered.scan_result.valid_up_to).expect("valid WAL prefix must fit"); @@ -79,10 +73,7 @@ impl WalFile { ScanStopReason::CleanEof => {} ScanStopReason::TornTail { .. } => { if recovered.file_len > valid_prefix { - let mut file = OpenOptions::new().write(true).open(&self.path)?; - file.set_len(valid_prefix)?; - file.seek(SeekFrom::Start(valid_prefix))?; - file.sync_data()?; + self.file.truncate_to(valid_prefix)?; } } ScanStopReason::Corruption { offset, error } => { @@ -98,22 +89,11 @@ impl WalFile { self.validate_payload_len(frame)?; } - if let Some(parent) = self.path.parent() { - fs::create_dir_all(parent)?; - } - - let temp_path = self.temp_path(); - { - let mut temp_file = File::create(&temp_path)?; - for frame in frames { - temp_file.write_all(&frame.encode())?; - } - temp_file.sync_data()?; + let mut bytes = Vec::new(); + for frame in frames { + bytes.extend_from_slice(&frame.encode()); } - - fs::rename(&temp_path, &self.path)?; - sync_parent_dir(&self.path)?; - self.file = open_append_file(&self.path)?; + self.file.replace_with_bytes(&bytes)?; Ok(()) } @@ -127,22 +107,10 @@ impl WalFile { Ok(()) } - - fn temp_path(&self) -> PathBuf { - let mut temp_path = self.path.clone(); - let extension = temp_path - .extension() - .and_then(|value| value.to_str()) - .map_or_else(|| "tmp".to_owned(), |value| format!("{value}.tmp")); - temp_path.set_extension(extension); - temp_path - } } -fn recover_path(path: &Path) -> Result { - let mut file = File::open(path)?; - let mut bytes = Vec::new(); - file.read_to_end(&mut bytes)?; +fn recover_file(file: &AppendWalFile) -> Result { + let bytes = file.read_all()?; let scan_result = scan_frames(&bytes); Ok(RecoveredWal { scan_result, @@ -150,27 +118,6 @@ fn recover_path(path: &Path) -> Result { }) } -fn open_append_file(path: &Path) -> Result { - OpenOptions::new() - .create(true) - .read(true) - .append(true) - .open(path) -} - -#[cfg(unix)] -fn sync_parent_dir(path: &Path) -> Result<(), std::io::Error> { - if let Some(parent) = path.parent() { - OpenOptions::new().read(true).open(parent)?.sync_all()?; - } - Ok(()) -} - -#[cfg(not(unix))] -fn sync_parent_dir(_path: &Path) -> Result<(), std::io::Error> { - Ok(()) -} - #[cfg(test)] mod tests { use std::fs; diff --git a/crates/reservation-core/Cargo.toml b/crates/reservation-core/Cargo.toml index 65ae047..83d75a1 100644 --- a/crates/reservation-core/Cargo.toml +++ b/crates/reservation-core/Cargo.toml @@ -6,6 +6,7 @@ rust-version.workspace = true [dependencies] allocdb-retire-queue = { path = "../allocdb-retire-queue" } +allocdb-wal-file = { path = "../allocdb-wal-file" } allocdb-wal-frame = { path = "../allocdb-wal-frame" } crc32c = "0.6" log = "0.4.28" diff --git a/crates/reservation-core/src/wal_file.rs b/crates/reservation-core/src/wal_file.rs index 4be5d4d..ca54fa4 100644 --- a/crates/reservation-core/src/wal_file.rs +++ b/crates/reservation-core/src/wal_file.rs @@ -1,13 +1,11 @@ -use std::fs::{self, File, OpenOptions}; -use std::io::{Read, Seek, SeekFrom, Write}; -use std::path::{Path, PathBuf}; +use std::path::Path; use crate::wal::{DecodeError, Frame, ScanResult, ScanStopReason, scan_frames}; +use allocdb_wal_file::AppendWalFile; #[derive(Debug)] pub struct WalFile { - path: PathBuf, - file: File, + file: AppendWalFile, max_payload_bytes: usize, } @@ -38,11 +36,9 @@ impl From for WalFileError { impl WalFile { pub fn open(path: impl AsRef, max_payload_bytes: usize) -> Result { - let path = path.as_ref().to_path_buf(); - let file = open_append_file(&path)?; + let file = AppendWalFile::open(path)?; Ok(Self { - path, file, max_payload_bytes, }) @@ -50,28 +46,26 @@ impl WalFile { #[must_use] pub fn path(&self) -> &Path { - &self.path + self.file.path() } pub fn append_frame(&mut self, frame: &Frame) -> Result<(), WalFileError> { self.validate_payload_len(frame)?; - - let encoded = frame.encode(); - self.file.write_all(&encoded)?; + self.file.append_bytes(&frame.encode())?; Ok(()) } pub fn sync(&self) -> Result<(), WalFileError> { - self.file.sync_data()?; + self.file.sync()?; Ok(()) } pub fn recover(&self) -> Result { - recover_path(&self.path) + recover_file(&self.file) } pub fn truncate_to_valid_prefix(&mut self) -> Result { - let recovered = recover_path(&self.path)?; + let recovered = recover_file(&self.file)?; let valid_prefix = u64::try_from(recovered.scan_result.valid_up_to).expect("valid WAL prefix must fit"); @@ -79,12 +73,7 @@ impl WalFile { ScanStopReason::CleanEof => {} ScanStopReason::TornTail { .. } => { if recovered.file_len > valid_prefix { - let mut file = OpenOptions::new().write(true).open(&self.path)?; - file.set_len(valid_prefix)?; - file.seek(SeekFrom::Start(valid_prefix))?; - file.sync_data()?; - drop(file); - self.file = open_append_file(&self.path)?; + self.file.truncate_to(valid_prefix)?; } } ScanStopReason::Corruption { offset, error } => { @@ -100,22 +89,11 @@ impl WalFile { self.validate_payload_len(frame)?; } - if let Some(parent) = self.path.parent() { - fs::create_dir_all(parent)?; - } - - let temp_path = self.temp_path(); - { - let mut temp_file = File::create(&temp_path)?; - for frame in frames { - temp_file.write_all(&frame.encode())?; - } - temp_file.sync_data()?; + let mut bytes = Vec::new(); + for frame in frames { + bytes.extend_from_slice(&frame.encode()); } - - fs::rename(&temp_path, &self.path)?; - sync_parent_dir(&self.path)?; - self.file = open_append_file(&self.path)?; + self.file.replace_with_bytes(&bytes)?; Ok(()) } @@ -129,22 +107,10 @@ impl WalFile { Ok(()) } - - fn temp_path(&self) -> PathBuf { - let mut temp_path = self.path.clone(); - let extension = temp_path - .extension() - .and_then(|value| value.to_str()) - .map_or_else(|| "tmp".to_owned(), |value| format!("{value}.tmp")); - temp_path.set_extension(extension); - temp_path - } } -fn recover_path(path: &Path) -> Result { - let mut file = File::open(path)?; - let mut bytes = Vec::new(); - file.read_to_end(&mut bytes)?; +fn recover_file(file: &AppendWalFile) -> Result { + let bytes = file.read_all()?; let scan_result = scan_frames(&bytes); Ok(RecoveredWal { scan_result, @@ -152,27 +118,6 @@ fn recover_path(path: &Path) -> Result { }) } -fn open_append_file(path: &Path) -> Result { - OpenOptions::new() - .create(true) - .read(true) - .append(true) - .open(path) -} - -#[cfg(unix)] -fn sync_parent_dir(path: &Path) -> Result<(), std::io::Error> { - if let Some(parent) = path.parent() { - OpenOptions::new().read(true).open(parent)?.sync_all()?; - } - Ok(()) -} - -#[cfg(not(unix))] -fn sync_parent_dir(_path: &Path) -> Result<(), std::io::Error> { - Ok(()) -} - #[cfg(test)] mod tests { use std::fs; diff --git a/scripts/check_repo.sh b/scripts/check_repo.sh index 6d3befb..c846524 100755 --- a/scripts/check_repo.sh +++ b/scripts/check_repo.sh @@ -35,7 +35,7 @@ check_forbidden_pattern() { check_allowed_dependencies() { local dependency - local allowed='allocdb-retire-queue allocdb-wal-frame crc32c log' + local allowed='allocdb-retire-queue allocdb-wal-file allocdb-wal-frame crc32c log' while IFS= read -r dependency; do if [[ ! " $allowed " =~ [[:space:]]"$dependency"[[:space:]] ]]; then