diff --git a/parquet/benches/arrow_reader.rs b/parquet/benches/arrow_reader.rs
index 2ea0706e3517..14fa16b3531e 100644
--- a/parquet/benches/arrow_reader.rs
+++ b/parquet/benches/arrow_reader.rs
@@ -326,6 +326,58 @@ where
     InMemoryPageIterator::new(pages)
 }
 
+fn build_delta_encoded_incr_primitive_page_iterator<T>(
+    column_desc: ColumnDescPtr,
+    null_density: f32,
+    increment: usize,
+    stepped: bool,
+) -> impl PageIterator + Clone
+where
+    T: parquet::data_type::DataType,
+    T::T: SampleUniform + FromPrimitive,
+{
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![0; VALUES_PER_PAGE];
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<Page>> = Vec::new();
+    let mut running_val: usize = 1;
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = Vec::new();
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.random::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    let value = FromPrimitive::from_usize(running_val).unwrap();
+                    running_val = if !stepped || k % 2 == 1 {
+                        running_val + increment
+                    } else {
+                        running_val
+                    };
+                    values.push(value);
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder =
+                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            page_builder.add_values::<T>(Encoding::DELTA_BINARY_PACKED, &values);
+            column_chunk_pages.push(page_builder.consume());
+        }
+        pages.push(column_chunk_pages);
+    }
+
+    InMemoryPageIterator::new(pages)
+}
+
 fn build_dictionary_encoded_primitive_page_iterator<T>(
     column_desc: ColumnDescPtr,
     null_density: f32,
@@ -439,6 +491,52 @@ fn build_plain_encoded_byte_array_page_iterator_inner(
     InMemoryPageIterator::new(pages)
 }
 
+fn build_constant_prefix_byte_array_page_iterator(
+    column_desc: ColumnDescPtr,
+    null_density: f32,
+    encoding: Encoding,
+    const_string: bool,
+) -> impl PageIterator + Clone {
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![0; VALUES_PER_PAGE];
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<Page>> = Vec::new();
+    for i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = Vec::new();
+        for j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.random::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    let string_value = if const_string {
+                        "01234567890123456789012345678901".to_string()
+                    } else {
+                        format!("01234567890123456789012345678901:{:x}{j}{i}", (k % 16))
+                    };
+                    values.push(parquet::data_type::ByteArray::from(string_value.as_str()));
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder =
+                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            page_builder.add_values::<ByteArrayType>(encoding, &values);
+            column_chunk_pages.push(page_builder.consume());
+        }
+        pages.push(column_chunk_pages);
+    }
+
+    InMemoryPageIterator::new(pages)
+}
+
 fn build_plain_encoded_byte_array_page_iterator(
     column_desc: ColumnDescPtr,
     null_density: f32,
@@ -1094,6 +1192,99 @@ fn bench_primitive<T>(
         assert_eq!(count, EXPECTED_VALUE_COUNT);
     });
 
+    // binary packed same value
+    let data = build_delta_encoded_incr_primitive_page_iterator::<T>(
+        mandatory_column_desc.clone(),
+        0.0,
+        0,
+        false,
+    );
+    group.bench_function("binary packed single value", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), mandatory_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    let data = build_delta_encoded_incr_primitive_page_iterator::<T>(
+        mandatory_column_desc.clone(),
+        0.0,
+        0,
+        false,
+    );
+    group.bench_function("binary packed skip single value", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), mandatory_column_desc.clone());
+            count = bench_array_reader_skip(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    // binary packed monotonically increasing
+    let data = build_delta_encoded_incr_primitive_page_iterator::<T>(
+        mandatory_column_desc.clone(),
+        0.0,
+        1,
+        false,
+    );
+    group.bench_function("binary packed increasing value", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), mandatory_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    let data = build_delta_encoded_incr_primitive_page_iterator::<T>(
+        mandatory_column_desc.clone(),
+        0.0,
+        1,
+        false,
+    );
+    group.bench_function("binary packed skip increasing value", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), mandatory_column_desc.clone());
+            count = bench_array_reader_skip(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    // binary packed increasing stepped
+    let data = build_delta_encoded_incr_primitive_page_iterator::<T>(
+        mandatory_column_desc.clone(),
+        0.0,
+        1,
+        true,
+    );
+    group.bench_function("binary packed stepped increasing value", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), mandatory_column_desc.clone());
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    let data = build_delta_encoded_incr_primitive_page_iterator::<T>(
+        mandatory_column_desc.clone(),
+        0.0,
+        1,
+        true,
+    );
+    group.bench_function("binary packed skip stepped increasing value", |b| {
+        b.iter(|| {
+            let array_reader =
+                create_primitive_array_reader(data.clone(), mandatory_column_desc.clone());
+            count = bench_array_reader_skip(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
     // dictionary encoded, no NULLs
     let data =
         build_dictionary_encoded_primitive_page_iterator::<T>(mandatory_column_desc.clone(), 0.0);
@@ -1594,6 +1785,66 @@ fn add_benches(c: &mut Criterion) {
         assert_eq!(count, EXPECTED_VALUE_COUNT);
     });
 
+    // delta byte array with constant prefix and suffix lengths
+    let delta_string_const_prefix_no_null_data = build_constant_prefix_byte_array_page_iterator(
+        mandatory_string_column_desc.clone(),
+        0.0,
+        Encoding::DELTA_BYTE_ARRAY,
+        false,
+    );
+    group.bench_function(
+        "const prefix delta byte array encoded, mandatory, no NULLs",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_byte_array_reader(
+                    delta_string_const_prefix_no_null_data.clone(),
+                    mandatory_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
+    );
+
+    // delta byte array with constant prefix and no suffix
+    let delta_string_const_no_null_data = build_constant_prefix_byte_array_page_iterator(
+        mandatory_string_column_desc.clone(),
+        0.0,
+        Encoding::DELTA_BYTE_ARRAY,
+        true,
+    );
+    group.bench_function("const delta byte array encoded, mandatory, no NULLs", |b| {
+        b.iter(|| {
+            let array_reader = create_byte_array_reader(
+                delta_string_const_no_null_data.clone(),
+                mandatory_string_column_desc.clone(),
+            );
+            count = bench_array_reader(array_reader);
+        });
+        assert_eq!(count, EXPECTED_VALUE_COUNT);
+    });
+
+    // delta length byte array with constant lengths
+    let delta_string_const_no_null_data = build_constant_prefix_byte_array_page_iterator(
+        mandatory_string_column_desc.clone(),
+        0.0,
+        Encoding::DELTA_LENGTH_BYTE_ARRAY,
+        true,
+    );
+    group.bench_function(
+        "const delta length byte array encoded, mandatory, no NULLs",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_byte_array_reader(
+                    delta_string_const_no_null_data.clone(),
+                    mandatory_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            });
+            assert_eq!(count, EXPECTED_VALUE_COUNT);
+        },
+    );
+
     group.finish();
 
     // binary benchmarks
diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs
index 58430820a9b6..a04f495d8d77 100644
--- a/parquet/src/encodings/decoding.rs
+++ b/parquet/src/encodings/decoding.rs
@@ -770,15 +770,48 @@ where
 
             // At this point we have read the deltas to `buffer` we now need to offset
             // these to get back to the original values that were encoded
-            for v in &mut buffer[read..read + batch_read] {
+            //
+            // Optimization: if the bit_width for the miniblock is 0, then we can employ
+            // a faster decoding method than setting `value[i] = value[i-1] + value[i] + min_delta`.
+            // Where min_delta is 0 (all values in the miniblock are the same), we can simply
+            // set all values to `self.last_value`. In the case of non-zero min_delta (values
+            // in the mini-block form an arithmetic progression) each value can be computed via
+            // `value[i] = (i + 1) * min_delta + last_value`. In both cases we remove the
+            // dependence on the preceding value.
+            // Kudos to @pitrou for the idea https://github.com/apache/arrow/pull/49296
+            let min_delta = self.min_delta.as_i64()?;
+            if bit_width == 0 {
+                if min_delta == 0 {
+                    buffer[read..read + batch_read].fill(self.last_value);
+                } else {
+                    // the c++ version multiplies min_delta by the iter index, but doing
+                    // wrapping_mul through T::T was a bit slower. this is still
+                    // faster than before.
+                    let mut delta = self.min_delta;
+                    for v in &mut buffer[read..read + batch_read] {
+                        *v = self.last_value.wrapping_add(&delta);
+                        delta = delta.wrapping_add(&self.min_delta);
+                    }
+
+                    self.last_value = buffer[read + batch_read - 1];
+                }
+            } else {
                 // It is OK for deltas to contain "overflowed" values after encoding,
                 // e.g. i64::MAX - i64::MIN, so we use `wrapping_add` to "overflow" again and
                 // restore original value.
-                *v = v
-                    .wrapping_add(&self.min_delta)
-                    .wrapping_add(&self.last_value);
-
-                self.last_value = *v;
+                if min_delta == 0 {
+                    for v in &mut buffer[read..read + batch_read] {
+                        *v = v.wrapping_add(&self.last_value);
+                        self.last_value = *v;
+                    }
+                } else {
+                    for v in &mut buffer[read..read + batch_read] {
+                        *v = v
+                            .wrapping_add(&self.min_delta)
+                            .wrapping_add(&self.last_value);
+                        self.last_value = *v;
+                    }
+                }
             }
 
             read += batch_read;
@@ -1802,6 +1835,49 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_delta_bit_packed_int32_single_value_large() {
+        let block_data = vec![3; 10240];
+        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
+    }
+
+    #[test]
+    fn test_delta_bit_packed_int32_increasing_value_large() {
+        let block_data = (0i32..10240).collect();
+        test_delta_bit_packed_decode::<Int32Type>(vec![block_data]);
+    }
+
+    #[test]
+    fn test_delta_bit_packed_int32_mixed_large() {
+        // should be enough for 4 mini-blocks plus a little so we get some
+        // mixed mini-blocks
+        const BLOCK_SIZE: i32 = 133;
+        let block1_data = (0..BLOCK_SIZE).map(|i| (i * 7) % 11).collect();
+        let block2_data = vec![3; BLOCK_SIZE as usize];
+        let block3_data = (0..BLOCK_SIZE).map(|i| (i * 5) % 13).collect();
+        let block4_data = (0..BLOCK_SIZE).collect();
+        let block5_data = (0..BLOCK_SIZE).map(|i| (i * 3) % 17).collect();
+        test_delta_bit_packed_decode::<Int32Type>(vec![
+            block1_data,
+            block2_data,
+            block3_data,
+            block4_data,
+            block5_data,
+        ]);
+    }
+
+    #[test]
+    fn test_delta_bit_packed_int64_single_value_large() {
+        let block_data = vec![5; 10240];
+        test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
+    }
+
+    #[test]
+    fn test_delta_bit_packed_int64_increasing_value_large() {
+        let block_data = (0i64..10240).collect();
+        test_delta_bit_packed_decode::<Int64Type>(vec![block_data]);
+    }
+
     #[test]
     fn test_delta_byte_array_same_arrays() {
         let data = vec![