Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
members = [
"crates/allocdb-retire-queue",
"crates/allocdb-wal-file",
"crates/allocdb-wal-frame",
"crates/allocdb-bench",
"crates/allocdb-core",
Expand Down
1 change: 1 addition & 0 deletions crates/allocdb-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ rust-version.workspace = true

[dependencies]
allocdb-retire-queue = { path = "../allocdb-retire-queue" }
allocdb-wal-file = { path = "../allocdb-wal-file" }
allocdb-wal-frame = { path = "../allocdb-wal-frame" }
log = "0.4.28"

Expand Down
6 changes: 3 additions & 3 deletions crates/allocdb-core/src/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl<E> From<ReplayError> for RecoveryObserverError<E> {
pub fn recover_allocdb(
config: Config,
snapshot_file: &SnapshotFile,
wal_file: &WalFile,
wal_file: &mut WalFile,
) -> Result<RecoveryResult, RecoveryError> {
recover_allocdb_with_observer(
config,
Expand All @@ -173,7 +173,7 @@ pub fn recover_allocdb(
pub fn recover_allocdb_with_observer<E, F>(
config: Config,
snapshot_file: &SnapshotFile,
wal_file: &WalFile,
wal_file: &mut WalFile,
mut observer: F,
) -> Result<RecoveryResult, RecoveryObserverError<E>>
where
Expand Down Expand Up @@ -203,7 +203,7 @@ where
fn recover_allocdb_impl<E, F>(
config: Config,
snapshot_file: &SnapshotFile,
wal_file: &WalFile,
wal_file: &mut WalFile,
observer: &mut F,
) -> Result<RecoveryResult, RecoveryObserverError<E>>
where
Expand Down
10 changes: 6 additions & 4 deletions crates/allocdb-core/src/recovery_issue_30_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ fn recover_allocdb_rejects_non_monotonic_lsn() {
.unwrap();
wal.sync().unwrap();

let error = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap_err();
let error =
recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap_err();
assert!(matches!(
error,
RecoveryError::Replay(ReplayError::NonMonotonicLsn {
Expand Down Expand Up @@ -126,7 +127,8 @@ fn recover_allocdb_rejects_rewound_request_slot() {
.unwrap();
wal.sync().unwrap();

let error = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap_err();
let error =
recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap_err();
assert!(matches!(
error,
RecoveryError::Replay(ReplayError::RewoundRequestSlot {
Expand All @@ -143,7 +145,7 @@ fn recover_allocdb_rejects_rewound_request_slot() {
fn recover_allocdb_rejects_semantically_invalid_snapshot() {
let wal_path = test_path("recover-invalid-snapshot", "wal");
let snapshot_path = test_path("recover-invalid-snapshot", "snapshot");
let wal = WalFile::open(&wal_path, 512).unwrap();
let mut wal = WalFile::open(&wal_path, 512).unwrap();
let snapshot_file = SnapshotFile::new(&snapshot_path);

snapshot_file
Expand Down Expand Up @@ -172,7 +174,7 @@ fn recover_allocdb_rejects_semantically_invalid_snapshot() {
})
.unwrap();

let error = recover_allocdb(config(), &snapshot_file, &wal).unwrap_err();
let error = recover_allocdb(config(), &snapshot_file, &mut wal).unwrap_err();
assert!(matches!(
error,
RecoveryError::Snapshot(SnapshotError::DuplicateResourceId(ResourceId(11)))
Expand Down
2 changes: 1 addition & 1 deletion crates/allocdb-core/src/recovery_issue_31_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ fn recover_allocdb_rejects_client_slot_overflow_in_replayed_wal() {
.unwrap();
wal.sync().unwrap();

let error = recover_allocdb(config, &SnapshotFile::new(&snapshot_path), &wal).unwrap_err();
let error = recover_allocdb(config, &SnapshotFile::new(&snapshot_path), &mut wal).unwrap_err();
assert!(matches!(
error,
RecoveryError::Replay(ReplayError::SlotOverflow {
Expand Down
6 changes: 4 additions & 2 deletions crates/allocdb-core/src/recovery_revoke_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ fn recover_allocdb_preserves_revoking_state() {
wal.append_frame(&client_frame(4, 2, &revoke)).unwrap();
wal.sync().unwrap();

let recovered = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap();
let recovered =
recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap();
let reservation = recovered.db.reservation(ReservationId(2), Slot(2)).unwrap();
let resource = recovered.db.resource(ResourceId(11)).unwrap();

Expand Down Expand Up @@ -156,7 +157,8 @@ fn recover_allocdb_preserves_revoked_state() {
wal.append_frame(&client_frame(5, 3, &reclaim)).unwrap();
wal.sync().unwrap();

let recovered = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap();
let recovered =
recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap();
let reservation = recovered.db.reservation(ReservationId(2), Slot(5)).unwrap();
let resource = recovered.db.resource(ResourceId(11)).unwrap();

Expand Down
17 changes: 10 additions & 7 deletions crates/allocdb-core/src/recovery_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ fn recover_allocdb_replays_wal_without_snapshot() {
wal.sync().unwrap();

let snapshot_file = SnapshotFile::new(&snapshot_path);
let recovered = recover_allocdb(config(), &snapshot_file, &wal).unwrap();
let recovered = recover_allocdb(config(), &snapshot_file, &mut wal).unwrap();

assert!(!recovered.loaded_snapshot);
assert_eq!(recovered.loaded_snapshot_lsn, None);
Expand Down Expand Up @@ -148,7 +148,7 @@ fn recover_allocdb_skips_frames_covered_by_snapshot() {
wal.append_frame(&client_frame(3, 2, &confirm)).unwrap();
wal.sync().unwrap();

let recovered = recover_allocdb(config(), &snapshot_file, &wal).unwrap();
let recovered = recover_allocdb(config(), &snapshot_file, &mut wal).unwrap();

assert!(recovered.loaded_snapshot);
assert_eq!(recovered.loaded_snapshot_lsn, Some(Lsn(1)));
Expand Down Expand Up @@ -193,7 +193,8 @@ fn recover_allocdb_truncates_torn_tail() {
.write_all(&torn[..torn.len() - 2])
.unwrap();

let recovered = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap();
let recovered =
recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap();

assert!(!recovered.loaded_snapshot);
assert_eq!(recovered.recovered_wal.scan_result.frames.len(), 1);
Expand All @@ -212,13 +213,13 @@ fn recover_allocdb_truncates_torn_tail() {
fn recover_allocdb_marks_empty_snapshot_as_loaded() {
let wal_path = test_path("recover-empty-snapshot", "wal");
let snapshot_path = test_path("recover-empty-snapshot", "snapshot");
let wal = WalFile::open(&wal_path, 512).unwrap();
let mut wal = WalFile::open(&wal_path, 512).unwrap();
let snapshot_file = SnapshotFile::new(&snapshot_path);
snapshot_file
.write_snapshot(&AllocDb::new(config()).unwrap().snapshot())
.unwrap();

let recovered = recover_allocdb(config(), &snapshot_file, &wal).unwrap();
let recovered = recover_allocdb(config(), &snapshot_file, &mut wal).unwrap();

assert!(recovered.loaded_snapshot);
assert_eq!(recovered.loaded_snapshot_lsn, None);
Expand Down Expand Up @@ -262,7 +263,8 @@ fn recover_allocdb_replays_internal_commands() {
wal.append_frame(&internal_frame(3, 5, &expire)).unwrap();
wal.sync().unwrap();

let recovered = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap();
let recovered =
recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap();

assert_eq!(recovered.replayed_wal_frame_count, 3);
assert_eq!(recovered.replayed_wal_last_lsn, Some(Lsn(3)));
Expand Down Expand Up @@ -308,7 +310,8 @@ fn recover_allocdb_fails_closed_on_mid_log_corruption() {
bytes[last_index] ^= 0xff;
fs::write(&wal_path, bytes).unwrap();

let error = recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &wal).unwrap_err();
let error =
recover_allocdb(config(), &SnapshotFile::new(&snapshot_path), &mut wal).unwrap_err();

assert!(matches!(
error,
Expand Down
87 changes: 17 additions & 70 deletions crates/allocdb-core/src/wal_file.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
use std::fs::{self, File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::path::Path;

use crate::wal::{DecodeError, Frame, ScanResult, ScanStopReason, scan_frames};
use allocdb_wal_file::AppendWalFile;

#[derive(Debug)]
pub struct WalFile {
path: PathBuf,
file: File,
file: AppendWalFile,
max_payload_bytes: usize,
}

Expand Down Expand Up @@ -43,19 +41,17 @@ impl WalFile {
///
/// Returns [`WalFileError`] if the file cannot be opened or created.
pub fn open(path: impl AsRef<Path>, max_payload_bytes: usize) -> Result<Self, WalFileError> {
let path = path.as_ref().to_path_buf();
let file = open_append_file(&path)?;
let file = AppendWalFile::open(path)?;

Ok(Self {
path,
file,
max_payload_bytes,
})
}

#[must_use]
pub fn path(&self) -> &Path {
&self.path
self.file.path()
}

/// Appends one encoded frame to the WAL file.
Expand All @@ -66,9 +62,7 @@ impl WalFile {
/// or [`WalFileError::Io`] if the append fails.
pub fn append_frame(&mut self, frame: &Frame) -> Result<(), WalFileError> {
self.validate_payload_len(frame)?;

let encoded = frame.encode();
self.file.write_all(&encoded)?;
self.file.append_bytes(&frame.encode())?;
Ok(())
}

Expand All @@ -78,7 +72,7 @@ impl WalFile {
///
/// Returns [`WalFileError::Io`] if the sync fails.
pub fn sync(&self) -> Result<(), WalFileError> {
self.file.sync_data()?;
self.file.sync()?;
Ok(())
}

Expand All @@ -88,7 +82,7 @@ impl WalFile {
///
/// Returns [`WalFileError::Io`] if the file cannot be read.
pub fn recover(&self) -> Result<RecoveredWal, WalFileError> {
recover_path(&self.path)
recover_file(&self.file)
}

/// Truncates the file to the last valid frame boundary discovered by recovery scanning.
Expand All @@ -100,19 +94,16 @@ impl WalFile {
/// # Panics
///
/// Panics only if the discovered valid prefix cannot fit into `u64`.
pub fn truncate_to_valid_prefix(&self) -> Result<RecoveredWal, WalFileError> {
let recovered = recover_path(&self.path)?;
pub fn truncate_to_valid_prefix(&mut self) -> Result<RecoveredWal, WalFileError> {
let recovered = recover_file(&self.file)?;
let valid_prefix =
u64::try_from(recovered.scan_result.valid_up_to).expect("valid WAL prefix must fit");

match recovered.scan_result.stop_reason {
ScanStopReason::CleanEof => {}
ScanStopReason::TornTail { .. } => {
if recovered.file_len > valid_prefix {
let mut file = OpenOptions::new().write(true).open(&self.path)?;
file.set_len(valid_prefix)?;
file.seek(SeekFrom::Start(valid_prefix))?;
file.sync_data()?;
self.file.truncate_to(valid_prefix)?;
}
}
ScanStopReason::Corruption { offset, error } => {
Expand All @@ -134,22 +125,11 @@ impl WalFile {
self.validate_payload_len(frame)?;
}

if let Some(parent) = self.path.parent() {
fs::create_dir_all(parent)?;
}

let temp_path = self.temp_path();
{
let mut temp_file = File::create(&temp_path)?;
for frame in frames {
temp_file.write_all(&frame.encode())?;
}
temp_file.sync_data()?;
let mut bytes = Vec::new();
for frame in frames {
bytes.extend_from_slice(&frame.encode());
}

fs::rename(&temp_path, &self.path)?;
sync_parent_dir(&self.path)?;
self.file = open_append_file(&self.path)?;
self.file.replace_with_bytes(&bytes)?;
Ok(())
}

Expand All @@ -163,50 +143,17 @@ impl WalFile {

Ok(())
}

fn temp_path(&self) -> PathBuf {
let mut temp_path = self.path.clone();
let extension = temp_path
.extension()
.and_then(|value| value.to_str())
.map_or_else(|| "tmp".to_owned(), |value| format!("{value}.tmp"));
temp_path.set_extension(extension);
temp_path
}
}

fn recover_path(path: &Path) -> Result<RecoveredWal, WalFileError> {
let mut file = File::open(path)?;
let mut bytes = Vec::new();
file.read_to_end(&mut bytes)?;
fn recover_file(file: &AppendWalFile) -> Result<RecoveredWal, WalFileError> {
let bytes = file.read_all()?;
let scan_result = scan_frames(&bytes);
Ok(RecoveredWal {
scan_result,
file_len: u64::try_from(bytes.len()).expect("file length must fit into u64"),
})
}

fn open_append_file(path: &Path) -> Result<File, std::io::Error> {
OpenOptions::new()
.create(true)
.read(true)
.append(true)
.open(path)
}

#[cfg(unix)]
fn sync_parent_dir(path: &Path) -> Result<(), std::io::Error> {
if let Some(parent) = path.parent() {
OpenOptions::new().read(true).open(parent)?.sync_all()?;
}
Ok(())
}

#[cfg(not(unix))]
fn sync_parent_dir(_path: &Path) -> Result<(), std::io::Error> {
Ok(())
}

#[cfg(test)]
mod tests {
use std::fs;
Expand Down
Loading
Loading