Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions parquet/benches/arrow_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,58 @@ where
InMemoryPageIterator::new(pages)
}

/// Builds an in-memory page iterator of `DELTA_BINARY_PACKED` data pages whose
/// values are taken from a running counter instead of random data.
///
/// The result holds `NUM_ROW_GROUPS` column chunks of `PAGES_PER_GROUP` pages,
/// and every page has `VALUES_PER_PAGE` level slots.
///
/// * `column_desc` - descriptor supplying the maximum definition and repetition levels.
/// * `null_density` - a slot becomes null when an `f32` drawn from the
///   `seedable_rng()` generator is below this threshold.
/// * `increment` - amount added to the counter each time it advances
///   (`0` makes every value identical).
/// * `stepped` - when `true` the counter advances only after a value written at
///   an odd slot index, so without nulls each value appears twice in a row;
///   when `false` it advances after every written value.
///
/// The counter starts at 1 and is never reset between pages or row groups.
/// Null slots neither write a value nor advance the counter.
///
/// # Panics
///
/// Panics if the counter cannot be converted to `T::T` via
/// `FromPrimitive::from_usize`.
fn build_delta_encoded_incr_primitive_page_iterator<T>(
    column_desc: ColumnDescPtr,
    null_density: f32,
    increment: usize,
    stepped: bool,
) -> impl PageIterator + Clone
where
    T: parquet::data_type::DataType,
    T::T: SampleUniform + FromPrimitive,
{
    let max_def_level = column_desc.max_def_level();
    let max_rep_level = column_desc.max_rep_level();
    let rep_levels = vec![0; VALUES_PER_PAGE];
    let mut rng = seedable_rng();
    // Shared by every page and row group so the sequence never restarts.
    let mut next_value: usize = 1;

    let pages: Vec<Vec<parquet::column::page::Page>> = (0..NUM_ROW_GROUPS)
        .map(|_| {
            (0..PAGES_PER_GROUP)
                .map(|_| {
                    let mut values: Vec<T::T> = Vec::with_capacity(VALUES_PER_PAGE);
                    let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);

                    for slot in 0..VALUES_PER_PAGE {
                        // Exactly one RNG draw per slot, in slot order.
                        if rng.random::<f32>() < null_density {
                            // Null slot: lower definition level, counter untouched.
                            def_levels.push(max_def_level - 1);
                            continue;
                        }

                        def_levels.push(max_def_level);
                        values.push(FromPrimitive::from_usize(next_value).unwrap());

                        // In stepped mode the counter is held on even slots.
                        let hold = stepped && slot % 2 == 0;
                        if !hold {
                            next_value += increment;
                        }
                    }

                    // The page's value count is the number of non-null values.
                    let mut page_builder = DataPageBuilderImpl::new(
                        column_desc.clone(),
                        values.len() as u32,
                        true,
                    );
                    page_builder.add_rep_levels(max_rep_level, &rep_levels);
                    page_builder.add_def_levels(max_def_level, &def_levels);
                    page_builder.add_values::<T>(Encoding::DELTA_BINARY_PACKED, &values);
                    page_builder.consume()
                })
                .collect::<Vec<_>>()
        })
        .collect();

    InMemoryPageIterator::new(pages)
}

fn build_dictionary_encoded_primitive_page_iterator<T>(
column_desc: ColumnDescPtr,
null_density: f32,
Expand Down Expand Up @@ -439,6 +491,52 @@ fn build_plain_encoded_byte_array_page_iterator_inner(
InMemoryPageIterator::new(pages)
}

/// Builds an in-memory page iterator of byte-array data pages whose values all
/// begin with the same 32-byte string.
///
/// The result holds `NUM_ROW_GROUPS` column chunks of `PAGES_PER_GROUP` pages,
/// and every page has `VALUES_PER_PAGE` level slots.
///
/// * `column_desc` - descriptor supplying the maximum definition and repetition levels.
/// * `null_density` - a slot becomes null when an `f32` drawn from the
///   `seedable_rng()` generator is below this threshold.
/// * `encoding` - encoding passed to the page builder for the values.
/// * `const_string` - when `true` every non-null value is exactly the shared
///   32-byte string; when `false` the string is followed by `:`, the lowercase
///   hex form of `slot % 16`, the page index and the row group index.
fn build_constant_prefix_byte_array_page_iterator(
    column_desc: ColumnDescPtr,
    null_density: f32,
    encoding: Encoding,
    const_string: bool,
) -> impl PageIterator + Clone {
    let max_def_level = column_desc.max_def_level();
    let max_rep_level = column_desc.max_rep_level();
    let rep_levels = vec![0; VALUES_PER_PAGE];
    let mut rng = seedable_rng();

    // `i` is the row group index and `j` the page index; both feed the suffix.
    let pages: Vec<Vec<parquet::column::page::Page>> = (0..NUM_ROW_GROUPS)
        .map(|i| {
            (0..PAGES_PER_GROUP)
                .map(|j| {
                    let mut values = Vec::with_capacity(VALUES_PER_PAGE);
                    let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);

                    for slot in 0..VALUES_PER_PAGE {
                        // Exactly one RNG draw per slot, in slot order.
                        if rng.random::<f32>() < null_density {
                            // Null slot: lower definition level and no value.
                            def_levels.push(max_def_level - 1);
                            continue;
                        }

                        def_levels.push(max_def_level);
                        let text = if const_string {
                            String::from("01234567890123456789012345678901")
                        } else {
                            format!("01234567890123456789012345678901:{:x}{j}{i}", slot % 16)
                        };
                        values.push(parquet::data_type::ByteArray::from(text.as_str()));
                    }

                    // The page's value count is the number of non-null values.
                    let mut page_builder = DataPageBuilderImpl::new(
                        column_desc.clone(),
                        values.len() as u32,
                        true,
                    );
                    page_builder.add_rep_levels(max_rep_level, &rep_levels);
                    page_builder.add_def_levels(max_def_level, &def_levels);
                    page_builder.add_values::<ByteArrayType>(encoding, &values);
                    page_builder.consume()
                })
                .collect::<Vec<_>>()
        })
        .collect();

    InMemoryPageIterator::new(pages)
}

fn build_plain_encoded_byte_array_page_iterator(
column_desc: ColumnDescPtr,
null_density: f32,
Expand Down Expand Up @@ -1061,6 +1159,54 @@ fn bench_primitive<T>(
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// binary packed same value
let data = build_delta_encoded_incr_primitive_page_iterator::<T>(
mandatory_column_desc.clone(),
0.0,
0,
false,
);
group.bench_function("binary packed single value", |b| {
b.iter(|| {
let array_reader =
create_primitive_array_reader(data.clone(), mandatory_column_desc.clone());
count = bench_array_reader_skip(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// binary packed monotonically increasing
let data = build_delta_encoded_incr_primitive_page_iterator::<T>(
mandatory_column_desc.clone(),
0.0,
1,
false,
);
group.bench_function("binary packed increasing value", |b| {
b.iter(|| {
let array_reader =
create_primitive_array_reader(data.clone(), mandatory_column_desc.clone());
count = bench_array_reader_skip(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// binary packed increasing stepped
let data = build_delta_encoded_incr_primitive_page_iterator::<T>(
mandatory_column_desc.clone(),
0.0,
1,
true,
);
group.bench_function("binary packed stepped increasing value", |b| {
b.iter(|| {
let array_reader =
create_primitive_array_reader(data.clone(), mandatory_column_desc.clone());
count = bench_array_reader_skip(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

let data = build_encoded_primitive_page_iterator::<T>(
optional_column_desc.clone(),
0.0,
Expand Down Expand Up @@ -1594,6 +1740,66 @@ fn add_benches(c: &mut Criterion) {
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// delta byte array with constant prefix and suffix lengths
let delta_string_const_prefix_no_null_data = build_constant_prefix_byte_array_page_iterator(
mandatory_string_column_desc.clone(),
0.0,
Encoding::DELTA_BYTE_ARRAY,
false,
);
group.bench_function(
"const prefix delta byte array encoded, mandatory, no NULLs",
|b| {
b.iter(|| {
let array_reader = create_byte_array_reader(
delta_string_const_prefix_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

// delta byte array with constant prefix and no suffix
let delta_string_const_no_null_data = build_constant_prefix_byte_array_page_iterator(
mandatory_string_column_desc.clone(),
0.0,
Encoding::DELTA_BYTE_ARRAY,
true,
);
group.bench_function("const delta byte array encoded, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_byte_array_reader(
delta_string_const_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// delta length byte array with constant lengths
let delta_string_const_no_null_data = build_constant_prefix_byte_array_page_iterator(
mandatory_string_column_desc.clone(),
0.0,
Encoding::DELTA_LENGTH_BYTE_ARRAY,
true,
);
group.bench_function(
"const delta length byte array encoded, mandatory, no NULLs",
|b| {
b.iter(|| {
let array_reader = create_byte_array_reader(
delta_string_const_no_null_data.clone(),
mandatory_string_column_desc.clone(),
);
count = bench_array_reader(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
},
);

group.finish();

// binary benchmarks
Expand Down
Loading