Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions parquet/benches/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,53 @@ where
InMemoryPageIterator::new(pages)
}

/// Builds a page iterator over DELTA_BINARY_PACKED-encoded primitive pages
/// whose non-null values form an arithmetic progression starting at 1 with
/// step `increment` (an `increment` of 0 produces one repeated value).
///
/// `null_density` is the probability that a slot is encoded as null, i.e.
/// receives a definition level one below the column's maximum.
fn build_delta_encoded_incr_primitive_page_iterator<T>(
    column_desc: ColumnDescPtr,
    null_density: f32,
    increment: usize,
) -> impl PageIterator + Clone
where
    T: parquet::data_type::DataType,
    T::T: SampleUniform + FromPrimitive,
{
    let max_def_level = column_desc.max_def_level();
    let max_rep_level = column_desc.max_rep_level();
    let rep_levels = vec![0; VALUES_PER_PAGE];
    let mut prng = seedable_rng();
    // The progression continues across pages and row groups.
    let mut next_value: usize = 1;
    let mut row_groups: Vec<Vec<parquet::column::page::Page>> = Vec::new();
    for _ in 0..NUM_ROW_GROUPS {
        let mut chunk_pages = Vec::new();
        for _ in 0..PAGES_PER_GROUP {
            // Generate one page worth of values and definition levels.
            let mut page_values = Vec::with_capacity(VALUES_PER_PAGE);
            let mut definition_levels = Vec::with_capacity(VALUES_PER_PAGE);
            for _ in 0..VALUES_PER_PAGE {
                // One RNG draw per slot decides whether it is null.
                if prng.random::<f32>() < null_density {
                    definition_levels.push(max_def_level - 1);
                } else {
                    page_values.push(FromPrimitive::from_usize(next_value).unwrap());
                    next_value += increment;
                    definition_levels.push(max_def_level);
                }
            }
            let mut builder =
                DataPageBuilderImpl::new(column_desc.clone(), page_values.len() as u32, true);
            builder.add_rep_levels(max_rep_level, &rep_levels);
            builder.add_def_levels(max_def_level, &definition_levels);
            builder.add_values::<T>(Encoding::DELTA_BINARY_PACKED, &page_values);
            chunk_pages.push(builder.consume());
        }
        row_groups.push(chunk_pages);
    }

    InMemoryPageIterator::new(row_groups)
}

fn build_dictionary_encoded_primitive_page_iterator<T>(
column_desc: ColumnDescPtr,
null_density: f32,
Expand Down Expand Up @@ -1061,6 +1108,36 @@ fn bench_primitive<T>(
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// binary packed same value
let data = build_delta_encoded_incr_primitive_page_iterator::<T>(
mandatory_column_desc.clone(),
0.0,
0,
);
group.bench_function("binary packed single value", |b| {
b.iter(|| {
let array_reader =
create_primitive_array_reader(data.clone(), mandatory_column_desc.clone());
count = bench_array_reader_skip(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// binary packed monotonically increasing
let data = build_delta_encoded_incr_primitive_page_iterator::<T>(
mandatory_column_desc.clone(),
0.0,
1,
);
group.bench_function("binary packed increasing value", |b| {
b.iter(|| {
let array_reader =
create_primitive_array_reader(data.clone(), mandatory_column_desc.clone());
count = bench_array_reader_skip(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

let data = build_encoded_primitive_page_iterator::<T>(
optional_column_desc.clone(),
0.0,
Expand Down
90 changes: 81 additions & 9 deletions parquet/src/encodings/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -770,15 +770,44 @@ where

// At this point we have read the deltas to `buffer` we now need to offset
// these to get back to the original values that were encoded
for v in &mut buffer[read..read + batch_read] {
// It is OK for deltas to contain "overflowed" values after encoding,
// e.g. i64::MAX - i64::MIN, so we use `wrapping_add` to "overflow" again and
// restore original value.
*v = v
.wrapping_add(&self.min_delta)
.wrapping_add(&self.last_value);

self.last_value = *v;
//
// Optimization: if the bit_width for the miniblock is 0, then we can employ
// a faster decoding method than setting `value[i] = value[i-1] + value[i] + min_delta`.
// Where min_delta is 0 (all values in the miniblock are the same), we can simply
// set all values to `self.last_value`. In the case of non-zero min_delta (values
// in the mini-block form an arithmetic progression) each value can be computed via
// `value[i] = (i + 1) * min_delta + last_value`. In both cases we remove the
// dependence on the preceding value.
// Kudos to @pitrou for the idea https://github.com/apache/arrow/pull/49296
if bit_width == 0 {
let min_delta = self.min_delta.as_i64()?;
if min_delta == 0 {
Copy link
Contributor

@Dandandan Dandandan Feb 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we repeat the min_delta==0 optimization below as well for bit_width > 0 ?

for v in &mut buffer[read..read + batch_read] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can use .fill() (if not faster, at least looks nice)

*v = self.last_value;
}
} else {
// the c++ version multiplies min_delta by the iter index, but doing
// wrapping_mul through T::T was a bit slower. this is still
// faster than before.
let mut delta = self.min_delta;
for v in &mut buffer[read..read + batch_read] {
*v = self.last_value.wrapping_add(&delta);
delta = delta.wrapping_add(&self.min_delta);
}

self.last_value = buffer[read + batch_read - 1];
}
} else {
for v in &mut buffer[read..read + batch_read] {
// It is OK for deltas to contain "overflowed" values after encoding,
// e.g. i64::MAX - i64::MIN, so we use `wrapping_add` to "overflow" again and
// restore original value.
*v = v
.wrapping_add(&self.min_delta)
.wrapping_add(&self.last_value);

self.last_value = *v;
}
}

read += batch_read;
Expand Down Expand Up @@ -1802,6 +1831,49 @@ mod tests {
);
}

#[test]
fn test_delta_bit_packed_int32_single_value_large() {
    // A run long enough to span many mini-blocks where every delta is zero.
    let block_data: Vec<i32> = std::iter::repeat(3).take(10240).collect();
    test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}

#[test]
fn test_delta_bit_packed_int32_increasing_value_large() {
    // Monotonically increasing values: constant non-zero delta across blocks.
    let mut block_data = Vec::with_capacity(10240);
    for v in 0i32..10240 {
        block_data.push(v);
    }
    test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
}

#[test]
fn test_delta_bit_packed_int32_mixed_large() {
    // Enough values for 4 mini-blocks plus a remainder, so the decoder sees a
    // mix of full and partial mini-blocks with differing delta bit widths.
    const BLOCK_SIZE: i32 = 133;
    let blocks: Vec<Vec<i32>> = vec![
        (0..BLOCK_SIZE).map(|i| (i * 7) % 11).collect(),
        (0..BLOCK_SIZE).map(|_| 3).collect(),
        (0..BLOCK_SIZE).map(|i| (i * 5) % 13).collect(),
        (0..BLOCK_SIZE).collect(),
        (0..BLOCK_SIZE).map(|i| (i * 3) % 17).collect(),
    ];
    test_delta_bit_packed_decode::<Int32Type>(blocks);
}

#[test]
fn test_delta_bit_packed_int64_single_value_large() {
    // Constant i64 values across many mini-blocks (delta bit width 0 throughout).
    let block_data: Vec<i64> = (0..10240).map(|_| 5i64).collect();
    test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
}

#[test]
fn test_delta_bit_packed_int64_increasing_value_large() {
    // Strictly increasing i64 values: exercises the constant-delta fast path.
    let block_data: Vec<i64> = (0i64..10240).collect();
    test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
}

#[test]
fn test_delta_byte_array_same_arrays() {
let data = vec![
Expand Down
Loading