Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
members = [
"crates/allocdb-retire-queue",
"crates/allocdb-wal-frame",
"crates/allocdb-bench",
"crates/allocdb-core",
"crates/allocdb-node",
Expand Down
2 changes: 1 addition & 1 deletion crates/allocdb-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ rust-version.workspace = true

[dependencies]
allocdb-retire-queue = { path = "../allocdb-retire-queue" }
crc32c = "0.6.8"
allocdb-wal-frame = { path = "../allocdb-wal-frame" }
log = "0.4.28"

[lints]
Expand Down
187 changes: 28 additions & 159 deletions crates/allocdb-core/src/wal.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,15 @@
use crate::ids::{Lsn, Slot};
use allocdb_wal_frame::{RawFrame, WalFormat, scan_frames_with};

const MAGIC: u32 = 0x4144_424c;
const VERSION: u16 = 1;
const HEADER_LEN: usize = 31;
pub use allocdb_wal_frame::{DecodeError, RecordType, ScanStopReason};

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RecordType {
ClientCommand = 1,
InternalCommand = 2,
SnapshotMarker = 3,
}

impl RecordType {
fn encode(self) -> u8 {
self as u8
}
const FORMAT: WalFormat = WalFormat {
magic: 0x4144_424c,
checksum_start: 8,
};

fn decode(value: u8) -> Result<Self, DecodeError> {
match value {
1 => Ok(Self::ClientCommand),
2 => Ok(Self::InternalCommand),
3 => Ok(Self::SnapshotMarker),
_ => Err(DecodeError::InvalidRecordType(value)),
}
}
}
#[cfg(test)]
const HEADER_LEN: usize = allocdb_wal_frame::HEADER_LEN;

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Frame {
Expand All @@ -34,30 +19,13 @@ pub struct Frame {
pub payload: Vec<u8>,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DecodeError {
BufferTooShort,
InvalidMagic(u32),
InvalidVersion(u16),
InvalidRecordType(u8),
InvalidChecksum,
PayloadTooLarge,
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ScanResult {
pub frames: Vec<Frame>,
pub valid_up_to: usize,
pub stop_reason: ScanStopReason,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ScanStopReason {
CleanEof,
TornTail { offset: usize },
Corruption { offset: usize, error: DecodeError },
}

impl Frame {
/// Encodes one WAL frame using explicit little-endian fields and a CRC32C checksum.
///
Expand All @@ -66,21 +34,13 @@ impl Frame {
/// Panics if the payload length does not fit into `u32`.
#[must_use]
pub fn encode(&self) -> Vec<u8> {
let payload_len =
u32::try_from(self.payload.len()).expect("payload length must fit in u32 for WAL");
let mut bytes = Vec::with_capacity(HEADER_LEN + self.payload.len());
bytes.extend_from_slice(&MAGIC.to_le_bytes());
bytes.extend_from_slice(&VERSION.to_le_bytes());
bytes.extend_from_slice(&self.lsn.get().to_le_bytes());
bytes.extend_from_slice(&self.request_slot.get().to_le_bytes());
bytes.push(self.record_type.encode());
bytes.extend_from_slice(&payload_len.to_le_bytes());
bytes.extend_from_slice(&0u32.to_le_bytes());
bytes.extend_from_slice(&self.payload);

let checksum = crc32c::crc32c(&bytes[8..]);
bytes[27..31].copy_from_slice(&checksum.to_le_bytes());
bytes
RawFrame::encode_parts(
FORMAT,
self.lsn.get(),
self.request_slot.get(),
self.record_type,
&self.payload,
)
}

/// Decodes one complete WAL frame.
Expand All @@ -94,50 +54,7 @@ impl Frame {
///
/// Panics only if the implementation's fixed header layout assumptions are violated.
pub fn decode(bytes: &[u8]) -> Result<Self, DecodeError> {
if bytes.len() < HEADER_LEN {
return Err(DecodeError::BufferTooShort);
}

let magic = u32::from_le_bytes(bytes[0..4].try_into().expect("slice has exact size"));
if magic != MAGIC {
return Err(DecodeError::InvalidMagic(magic));
}

let version = u16::from_le_bytes(bytes[4..6].try_into().expect("slice has exact size"));
if version != VERSION {
return Err(DecodeError::InvalidVersion(version));
}

let lsn = Lsn(u64::from_le_bytes(
bytes[6..14].try_into().expect("slice has exact size"),
));
let request_slot = Slot(u64::from_le_bytes(
bytes[14..22].try_into().expect("slice has exact size"),
));
let record_type = RecordType::decode(bytes[22])?;
let payload_len =
u32::from_le_bytes(bytes[23..27].try_into().expect("slice has exact size"));
let payload_len = usize::try_from(payload_len).expect("u32 payload must fit usize");
let frame_len = HEADER_LEN + payload_len;
if bytes.len() < frame_len {
return Err(DecodeError::BufferTooShort);
}

let stored_checksum =
u32::from_le_bytes(bytes[27..31].try_into().expect("slice has exact size"));
let mut checksum_bytes = bytes[..frame_len].to_vec();
checksum_bytes[27..31].copy_from_slice(&0u32.to_le_bytes());
let computed_checksum = crc32c::crc32c(&checksum_bytes[8..]);
if stored_checksum != computed_checksum {
return Err(DecodeError::InvalidChecksum);
}

Ok(Self {
lsn,
request_slot,
record_type,
payload: bytes[31..frame_len].to_vec(),
})
Ok(Self::from_raw(RawFrame::decode_with(bytes, FORMAT)?))
}

/// Returns the full frame length implied by the encoded payload length field.
Expand All @@ -150,75 +67,27 @@ impl Frame {
///
/// Panics only if the implementation's fixed header layout assumptions are violated.
pub fn encoded_len(bytes: &[u8]) -> Result<usize, DecodeError> {
if bytes.len() < HEADER_LEN {
return Err(DecodeError::BufferTooShort);
}

let magic = u32::from_le_bytes(bytes[0..4].try_into().expect("slice has exact size"));
if magic != MAGIC {
return Err(DecodeError::InvalidMagic(magic));
}
RawFrame::encoded_len_with(bytes, FORMAT)
}

let version = u16::from_le_bytes(bytes[4..6].try_into().expect("slice has exact size"));
if version != VERSION {
return Err(DecodeError::InvalidVersion(version));
fn from_raw(frame: RawFrame) -> Self {
Self {
lsn: Lsn(frame.lsn),
request_slot: Slot(frame.request_slot),
record_type: frame.record_type,
payload: frame.payload,
}

let record_type = bytes[22];
let _ = RecordType::decode(record_type)?;
let payload_len =
u32::from_le_bytes(bytes[23..27].try_into().expect("slice has exact size"));
let payload_len = usize::try_from(payload_len).map_err(|_| DecodeError::PayloadTooLarge)?;
Ok(HEADER_LEN + payload_len)
}
}

/// Scans a WAL buffer and stops at the first torn or invalid frame boundary.
#[must_use]
pub fn scan_frames(bytes: &[u8]) -> ScanResult {
let mut frames = Vec::new();
let mut offset = 0usize;
let mut stop_reason = ScanStopReason::CleanEof;

while offset < bytes.len() {
let remaining = &bytes[offset..];
let frame_len = match Frame::encoded_len(remaining) {
Ok(frame_len) => frame_len,
Err(DecodeError::BufferTooShort) => {
stop_reason = ScanStopReason::TornTail { offset };
break;
}
Err(error) => {
stop_reason = ScanStopReason::Corruption { offset, error };
break;
}
};

if remaining.len() < frame_len {
stop_reason = ScanStopReason::TornTail { offset };
break;
}

match Frame::decode(&remaining[..frame_len]) {
Ok(frame) => {
frames.push(frame);
offset += frame_len;
}
Err(DecodeError::BufferTooShort) => {
stop_reason = ScanStopReason::TornTail { offset };
break;
}
Err(error) => {
stop_reason = ScanStopReason::Corruption { offset, error };
break;
}
}
}

let scanned = scan_frames_with(bytes, FORMAT);
ScanResult {
frames,
valid_up_to: offset,
stop_reason,
frames: scanned.frames.into_iter().map(Frame::from_raw).collect(),
valid_up_to: scanned.valid_up_to,
stop_reason: scanned.stop_reason,
}
}

Expand Down
11 changes: 11 additions & 0 deletions crates/allocdb-wal-frame/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "allocdb-wal-frame"
version.workspace = true
edition.workspace = true
rust-version.workspace = true

[dependencies]
crc32c = "0.6.8"

[lints]
workspace = true
Loading
Loading