chore: add compression #2345
Conversation
Warning: This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.

How to use the Graphite Merge Queue: add the label merge-queue to this PR to add it to the merge queue. You must have a Graphite account in order to use the merge queue. Sign up using this link. An organization admin has enabled the Graphite Merge Queue in this repository. Please do not merge from GitHub, as this will restart CI on PRs being processed by the merge queue. This stack of pull requests is managed by Graphite. Learn more about stacking.
PR Summary
This PR adds compression support to the SQLite VFS FDB implementation, enabling data to be stored more efficiently in FoundationDB with multiple compression algorithms (None, LZ4, Snappy, Zstd).
- Added CompressionType enum and Compression trait, with implementations for all supported algorithms, in compression.rs (a rough sketch of this surface follows after this list)
- Modified FdbFileMetadata to include the compression type, with backward compatibility for older formats
- Updated file operations in file.rs to compress/decompress page data during reads and writes
- Added comprehensive compression metrics tracking, including operation counts, bytes processed, ratios, and latencies
- Implemented VFS registration with compression-specific suffixes, allowing multiple VFS variants to coexist
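For orientation, here is a rough sketch of the surface the summary describes. The decompress signature is copied from the Snappy snippet quoted later in this review; the compress signature, derives, and error variants are assumptions about what compression.rs actually contains.

```rust
use bytes::Bytes;

// Compression algorithms supported by the VFS, per the PR summary.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionType {
    None,
    Lz4,
    Snappy,
    Zstd,
}

// Assumed error shape; the real CompressionError lives in compression.rs.
#[derive(Debug)]
pub enum CompressionError {
    UnknownType(u8),
    SnappyError(String),
}

// Sketch of the trait: decompress mirrors the Snappy snippet below,
// compress is an assumed counterpart.
pub trait Compression {
    fn compress(&self, data: &[u8]) -> Result<Bytes, CompressionError>;
    fn decompress(&self, compressed_data: &[u8], expected_size: usize)
        -> Result<Bytes, CompressionError>;
}
```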
Greptile AI
7 file(s) reviewed, 8 comment(s)
```rust
    // Convert from FdbBindingError to FdbError
    Err(FdbError::from_code(1))
}
```
style: Converting any FdbBindingError to a generic error code 1 loses specific error information. Consider preserving the original error code from the FdbBindingError if available, or at least logging the specific error code before converting.
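A minimal sketch of the suggestion, assuming a helper function and the tracing crate for logging; FdbError::from_code(1) is the call already used in the snippet above, and the helper name is hypothetical.

```rust
use foundationdb::{FdbBindingError, FdbError};

// Hypothetical helper: record the specific binding error before collapsing
// it into the generic code 1 used above, so the original cause is not lost.
fn to_fdb_error(err: FdbBindingError) -> FdbError {
    // `tracing` is an assumption; substitute the project's logging facility.
    tracing::warn!(error = ?err, "converting FdbBindingError to a generic FdbError");
    FdbError::from_code(1)
}
```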
```rust
pub fn create_safe_bytes(size: usize) -> bytes::BytesMut {
    // Check if the size is reasonable and chunk if needed
    let safe_size = std::cmp::min(size, MAX_SAFE_PAGE_SIZE);
    let buffer = vec![0u8; safe_size];
    bytes::BytesMut::from(&buffer[..])
}
```
style: This implementation creates a new buffer with zeros and then copies it into a BytesMut. Consider using BytesMut::with_capacity followed by resize for better performance.
Suggested change:

```rust
// Before
pub fn create_safe_bytes(size: usize) -> bytes::BytesMut {
    // Check if the size is reasonable and chunk if needed
    let safe_size = std::cmp::min(size, MAX_SAFE_PAGE_SIZE);
    let buffer = vec![0u8; safe_size];
    bytes::BytesMut::from(&buffer[..])
}

// After
pub fn create_safe_bytes(size: usize) -> bytes::BytesMut {
    // Check if the size is reasonable and chunk if needed
    let safe_size = std::cmp::min(size, MAX_SAFE_PAGE_SIZE);
    let mut bytes = bytes::BytesMut::with_capacity(safe_size);
    bytes.resize(safe_size, 0);
    bytes
}
```
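If the project's bytes version provides it, BytesMut::zeroed collapses the allocate-then-fill pattern into one call; this is an optional alternative sketch, not part of the suggestion above, and the MAX_SAFE_PAGE_SIZE value shown is a placeholder.

```rust
use bytes::BytesMut;

// Placeholder; the real constant is defined elsewhere in the crate.
const MAX_SAFE_PAGE_SIZE: usize = 65_536;

// Alternative sketch: allocate and zero-fill in a single call.
pub fn create_safe_bytes(size: usize) -> BytesMut {
    let safe_size = std::cmp::min(size, MAX_SAFE_PAGE_SIZE);
    BytesMut::zeroed(safe_size)
}
```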
```rust
impl From<u8> for CompressionType {
    fn from(value: u8) -> Self {
        match value {
            0 => CompressionType::None,
            1 => CompressionType::Lz4,
            2 => CompressionType::Snappy,
            3 => CompressionType::Zstd,
            _ => CompressionType::None,
        }
    }
}
```
style: This implementation silently defaults to CompressionType::None for invalid values, but there's an UnknownType error defined that's never used. Consider either returning that error or logging a warning when an unknown compression type is encountered.
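A minimal sketch of the fallible conversion the comment points toward, reusing the enum shown above; the UnknownType variant's exact shape is an assumption, so this is illustrative rather than the PR's actual code.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionType {
    None,
    Lz4,
    Snappy,
    Zstd,
}

// Assumed shape of the otherwise-unused error variant the review mentions.
#[derive(Debug)]
pub enum CompressionError {
    UnknownType(u8),
}

// Fail loudly on unknown discriminants instead of silently falling back
// to CompressionType::None.
impl TryFrom<u8> for CompressionType {
    type Error = CompressionError;

    fn try_from(value: u8) -> Result<Self, Self::Error> {
        match value {
            0 => Ok(CompressionType::None),
            1 => Ok(CompressionType::Lz4),
            2 => Ok(CompressionType::Snappy),
            3 => Ok(CompressionType::Zstd),
            other => Err(CompressionError::UnknownType(other)),
        }
    }
}
```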
```rust
impl fmt::Display for CompressionType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CompressionType::None => write!(f, "none"),
            CompressionType::Lz4 => write!(f, "lz4"),
            CompressionType::Snappy => write!(f, "snappy"),
            CompressionType::Zstd => write!(f, "zstd"),
        }
    }
}
```
style: This manual Display implementation is redundant since you're already deriving Display from strum at line 25. You can remove this implementation to avoid potential inconsistencies.
Suggested change (delete the manual implementation):

```rust
impl fmt::Display for CompressionType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CompressionType::None => write!(f, "none"),
            CompressionType::Lz4 => write!(f, "lz4"),
            CompressionType::Snappy => write!(f, "snappy"),
            CompressionType::Zstd => write!(f, "zstd"),
        }
    }
}
```
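For reference, a hedged sketch of the strum-derived form the review alludes to; whether it matches the derive already present at line 25 of compression.rs is an assumption, and it requires strum's derive feature.

```rust
use strum::Display;

// With serialize_all = "lowercase", the derived Display produces the same
// strings as the manual impl above: "none", "lz4", "snappy", "zstd".
#[derive(Debug, Clone, Copy, PartialEq, Eq, Display)]
#[strum(serialize_all = "lowercase")]
pub enum CompressionType {
    None,
    Lz4,
    Snappy,
    Zstd,
}
```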
```rust
// Store the original size at the beginning for decompression
let mut result = BytesMut::with_capacity(4 + compressed_size);
result.extend_from_slice(&(data.len() as u32).to_le_bytes());
result.extend_from_slice(&compressed[0..compressed_size]);
```
style: LZ4 implementation stores the original size in the compressed data, but Snappy doesn't. This creates an inconsistency in how the compressed data is formatted between different compression types.
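For context, a rough sketch of how the 4-byte length prefix written above would be read back on the LZ4 decompression side; it assumes the lz4_flex crate, and the PR may use a different LZ4 binding and error type.

```rust
use bytes::Bytes;

// Sketch only: recover the original length from the little-endian u32 prefix,
// then decompress the remaining payload.
fn lz4_decompress_prefixed(stored: &[u8]) -> Result<Bytes, String> {
    if stored.len() < 4 {
        return Err("buffer shorter than the 4-byte length prefix".to_string());
    }
    let original_len = u32::from_le_bytes(stored[..4].try_into().unwrap()) as usize;

    // lz4_flex needs the uncompressed size up front, which is why the
    // compress path stores it.
    let decompressed = lz4_flex::block::decompress(&stored[4..], original_len)
        .map_err(|e| e.to_string())?;
    Ok(Bytes::from(decompressed))
}
```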
```rust
fn decompress(&self, compressed_data: &[u8], expected_size: usize) -> Result<Bytes, CompressionError> {
    let timer = metrics::start_compression_operation();

    // Decompress the data
    let decompressed = snap::raw::Decoder::new()
        .decompress_vec(compressed_data)
        .map_err(|e| CompressionError::SnappyError(e.to_string()))?;

    // Record metrics
    metrics::complete_compression_operation(
        &timer,
        CompressionType::Snappy,
        decompressed.len(),
        compressed_data.len(),
        false,
    );

    Ok(Bytes::from(decompressed))
}
```
style: Unlike LZ4, the Snappy implementation doesn't use the expected_size parameter and doesn't store the original size in the compressed data. This could cause issues if the caller expects consistent behavior across compression types.
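A hedged sketch of one way the Snappy path could honor expected_size, as the comment suggests. The helper name is hypothetical and it returns a plain String error for brevity; the real code would map failures into CompressionError as in the snippet above.

```rust
// Hypothetical helper: decompress with Snappy and verify the result matches
// the caller-supplied expected_size, mirroring the LZ4 path's behavior.
fn snappy_decompress_checked(
    compressed_data: &[u8],
    expected_size: usize,
) -> Result<Vec<u8>, String> {
    let decompressed = snap::raw::Decoder::new()
        .decompress_vec(compressed_data)
        .map_err(|e| e.to_string())?;

    if decompressed.len() != expected_size {
        return Err(format!(
            "decompressed {} bytes, expected {}",
            decompressed.len(),
            expected_size
        ));
    }
    Ok(decompressed)
}
```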
```rust
// Record ratio (only if compression and sizes are valid)
if is_compression && original_size > 0 && compressed_size > 0 {
    let ratio = original_size as f64 / compressed_size as f64;
    COMPRESSION_RATIO
        .with_label_values(&[&comp_type, operation])
        .observe(ratio);
} else if !is_compression && original_size > 0 && compressed_size > 0 {
    let ratio = compressed_size as f64 / original_size as f64;
    COMPRESSION_RATIO
        .with_label_values(&[&comp_type, operation])
        .observe(ratio);
}
```
logic: The ratio calculation is inverted between compression and decompression. For compression, you calculate original/compressed (which is correct), but for decompression you calculate compressed/original. This is inconsistent and could be confusing when analyzing metrics. Consider using the same ratio definition (original/compressed) for both operations and adding a note in the metric description that values > 1.0 indicate effective compression.
Suggested change:

```rust
// Before
// Record ratio (only if compression and sizes are valid)
if is_compression && original_size > 0 && compressed_size > 0 {
    let ratio = original_size as f64 / compressed_size as f64;
    COMPRESSION_RATIO
        .with_label_values(&[&comp_type, operation])
        .observe(ratio);
} else if !is_compression && original_size > 0 && compressed_size > 0 {
    let ratio = compressed_size as f64 / original_size as f64;
    COMPRESSION_RATIO
        .with_label_values(&[&comp_type, operation])
        .observe(ratio);
}

// After
// Record ratio (only if compression and sizes are valid)
if is_compression && original_size > 0 && compressed_size > 0 {
    let ratio = original_size as f64 / compressed_size as f64;
    COMPRESSION_RATIO
        .with_label_values(&[&comp_type, operation])
        .observe(ratio);
} else if !is_compression && original_size > 0 && compressed_size > 0 {
    let ratio = original_size as f64 / compressed_size as f64;
    COMPRESSION_RATIO
        .with_label_values(&[&comp_type, operation])
        .observe(ratio);
}
```
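To go with the consistent ratio definition, a sketch of how the histogram's help text could spell out the direction of the ratio, per the note above; it assumes the prometheus and lazy_static crates, and the metric name and labels are illustrative rather than the PR's actual values.

```rust
use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, HistogramVec};

lazy_static! {
    // Sketch only: the help string makes the ratio direction explicit.
    static ref COMPRESSION_RATIO: HistogramVec = register_histogram_vec!(
        "compression_ratio",
        "Original size divided by compressed size; values > 1.0 indicate effective compression",
        &["compression_type", "operation"]
    )
    .unwrap();
}
```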
```rust
/// Start measuring a compression operation
pub fn start_compression_operation() -> MetricsTimer {
    MetricsTimer::start()
}
```
style: Unlike other metric start functions (e.g., start_file_open, start_fdb_transaction), this function doesn't increment any counter before starting the timer. This is fine if the counter is incremented in complete_compression_operation, but it's inconsistent with the pattern used elsewhere in the file.
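A small sketch of the pattern the comment describes, bumping a counter before starting the timer; COMPRESSION_OPERATIONS_TOTAL is a hypothetical IntCounter and MetricsTimer is the type from the snippet above, so this is illustrative only.

```rust
/// Start measuring a compression operation.
pub fn start_compression_operation() -> MetricsTimer {
    // Hypothetical counter, following the start_file_open / start_fdb_transaction
    // pattern mentioned in the review; the real metric (if any) would live in
    // this module.
    COMPRESSION_OPERATIONS_TOTAL.inc();
    MetricsTimer::start()
}
```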
Deploying rivet with Cloudflare Pages

Latest commit: 45181cb
Status: ✅ Deploy successful!
Preview URL: https://a687fccd.rivet.pages.dev
Branch Preview URL: https://04-10-chore-add-compression.rivet.pages.dev
Force-pushed from e3c5f03 to 45181cb