Commit c0bc610

Convert RunEndEncoded field to Parquet

1 parent c149027 commit c0bc610

8 files changed: +319 -10 lines changed
arrow-array/src/array/run_array.rs

Lines changed: 2 additions & 2 deletions
```diff
@@ -32,12 +32,12 @@ use crate::{
 
 /// An array of [run-end encoded values](https://arrow.apache.org/docs/format/Columnar.html#run-end-encoded-layout)
 ///
-/// This encoding is variation on [run-length encoding (RLE)](https://en.wikipedia.org/wiki/Run-length_encoding)
+/// This encoding is a variation on [run-length encoding (RLE)](https://en.wikipedia.org/wiki/Run-length_encoding)
 /// and is good for representing data containing same values repeated consecutively.
 ///
 /// [`RunArray`] contains `run_ends` array and `values` array of same length.
 /// The `run_ends` array stores the indexes at which the run ends. The `values` array
-/// stores the value of each run. Below example illustrates how a logical array is represented in
+/// stores the value of each run. The below example illustrates how a logical array is represented in
 /// [`RunArray`]
 ///
 ///
```
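As a quick illustration of the `run_ends`/`values` layout this doc comment describes, here is a minimal sketch using `StringRunBuilder` (the same builder the new tests below use); the concrete values are illustrative:

```rust
use arrow_array::{Array, RunArray, StringArray, builder::StringRunBuilder, types::Int32Type};

fn main() {
    // Logical array: ["a", "a", "a", "b", "b"]
    let mut builder = StringRunBuilder::<Int32Type>::new();
    builder.extend(["a", "a", "a", "b", "b"].into_iter().map(Some));
    let run_array: RunArray<Int32Type> = builder.finish();

    // Physical representation: run_ends = [3, 5], values = ["a", "b"]
    assert_eq!(run_array.run_ends().values(), &[3, 5]);
    let values = run_array
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    assert_eq!(values.value(0), "a");
    assert_eq!(values.value(1), "b");
    assert_eq!(run_array.len(), 5); // logical length, not physical
}
```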

arrow-schema/src/datatype.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -353,7 +353,7 @@ pub enum DataType {
     /// that contain many repeated values using less memory, but with
     /// a higher CPU overhead for some operations.
     ///
-    /// This type mostly used to represent low cardinality string
+    /// This type is mostly used to represent low cardinality string
     /// arrays or a limited set of primitive types as integers.
     Dictionary(Box<DataType>, Box<DataType>),
     /// Exact 32-bit width decimal value with precision and scale
```
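For context, such a low-cardinality string column is declared as integer keys indexing into a deduplicated value set; a minimal sketch (the Int32 key width is an arbitrary choice):

```rust
use arrow_schema::DataType;

fn main() {
    // Int32 keys indexing into a deduplicated Utf8 value set.
    let dict = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    println!("{dict:?}");
}
```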

parquet/src/arrow/array_reader/byte_array.rs

Lines changed: 27 additions & 0 deletions
```diff
@@ -67,6 +67,28 @@ pub fn make_byte_array_reader(
                 pages, data_type, reader,
             )))
         }
+        // TODO eventually add a dedicated [`ArrayReader`] for REE
+        ArrowType::RunEndEncoded(_, ref val_field) => match val_field.data_type() {
+            ArrowType::Binary
+            | ArrowType::Utf8
+            | ArrowType::Decimal128(_, _)
+            | ArrowType::Decimal256(_, _) => {
+                let reader = GenericRecordReader::new(column_desc);
+                Ok(Box::new(ByteArrayReader::<i32>::new(
+                    pages, data_type, reader,
+                )))
+            }
+            ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
+                let reader = GenericRecordReader::new(column_desc);
+                Ok(Box::new(ByteArrayReader::<i64>::new(
+                    pages, data_type, reader,
+                )))
+            }
+            _ => Err(general_err!(
+                "invalid run end encoded value type for byte array reader - {}",
+                data_type
+            )),
+        },
        _ => Err(general_err!(
            "invalid data type for byte array reader - {}",
            data_type
@@ -147,6 +169,11 @@ impl<I: OffsetSizeTrait> ArrayReader for ByteArrayReader<I> {
                     .with_precision_and_scale(p, s)?;
                 Arc::new(decimal)
             }
+            // TODO eventually add a dedicated [`ArrayReader`] for REE
+            ArrowType::RunEndEncoded(_, ref val_field) => {
+                let array = buffer.into_array(null_buffer, val_field.data_type().clone());
+                arrow_cast::cast(&array, &self.data_type)?
+            }
             _ => buffer.into_array(null_buffer, self.data_type.clone()),
         };
```
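So on the read side the column is first materialized as a flat byte array and then handed to `arrow_cast::cast` to produce the requested run-end encoded type. A minimal sketch of that final step, assuming (as this hunk does) that the cast kernel accepts a `RunEndEncoded` target:

```rust
use std::sync::Arc;
use arrow_array::{Array, RunArray, StringArray, types::Int32Type};
use arrow_schema::{DataType, Field};

fn main() {
    // What the decoder hands back: a flat array with the runs expanded.
    let flat = StringArray::from(vec!["a", "a", "b"]);

    // The Arrow type requested by the reader's schema.
    let ree_type = DataType::RunEndEncoded(
        Arc::new(Field::new("run_ends", DataType::Int32, false)),
        Arc::new(Field::new("values", DataType::Utf8, true)),
    );

    // The cast produces a RunArray with the same logical contents.
    let ree = arrow_cast::cast(&flat, &ree_type).unwrap();
    let ree = ree.as_any().downcast_ref::<RunArray<Int32Type>>().unwrap();
    assert_eq!(ree.len(), 3); // logical length is preserved
}
```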

parquet/src/arrow/arrow_writer/byte_array.rs

Lines changed: 37 additions & 1 deletion
```diff
@@ -30,7 +30,7 @@ use crate::util::bit_util::num_required_bits;
 use crate::util::interner::{Interner, Storage};
 use arrow_array::{
     Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
-    LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
+    LargeBinaryArray, LargeStringArray, RunArray, StringArray, StringViewArray,
 };
 use arrow_schema::DataType;
@@ -61,6 +61,28 @@ macro_rules! downcast_dict_op {
     };
 }
 
+macro_rules! downcast_ree_impl {
+    ($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
+        $op($array
+            .as_any()
+            .downcast_ref::<RunArray<arrow_array::types::$key>>()
+            .unwrap()
+            .downcast::<$val>()
+            .unwrap()$(, $arg)*)
+    }};
+}
+
+macro_rules! downcast_ree_op {
+    ($run_end_field:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
+        match $run_end_field.data_type() {
+            DataType::Int16 => downcast_ree_impl!($array, Int16Type, $val, $op$(, $arg)*),
+            DataType::Int32 => downcast_ree_impl!($array, Int32Type, $val, $op$(, $arg)*),
+            DataType::Int64 => downcast_ree_impl!($array, Int64Type, $val, $op$(, $arg)*),
+            _ => unreachable!(),
+        }
+    };
+}
+
 macro_rules! downcast_op {
     ($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
         match $data_type {
@@ -92,6 +114,20 @@ macro_rules! downcast_op {
             }
             d => unreachable!("cannot downcast {} dictionary value to byte array", d),
         },
+        DataType::RunEndEncoded(run_end, value) => match value.data_type() {
+            DataType::Utf8 => downcast_ree_op!(run_end, StringArray, $array, $op$(, $arg)*),
+            DataType::LargeUtf8 => {
+                downcast_ree_op!(run_end, LargeStringArray, $array, $op$(, $arg)*)
+            }
+            DataType::Binary => downcast_ree_op!(run_end, BinaryArray, $array, $op$(, $arg)*),
+            DataType::LargeBinary => {
+                downcast_ree_op!(run_end, LargeBinaryArray, $array, $op$(, $arg)*)
+            }
+            DataType::FixedSizeBinary(_) => {
+                downcast_ree_op!(run_end, FixedSizeBinaryArray, $array, $op$(, $arg)*)
+            }
+            d => unreachable!("cannot downcast {} run end encoded value to byte array", d),
+        },
         d => unreachable!("cannot downcast {} to byte array", d),
     }
 };
```
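The `downcast_ree_impl!` expansion amounts to a two-step pattern: recover the concrete `RunArray` from the erased `dyn Array`, then use `RunArray::downcast` to obtain a typed view whose values have a known byte-array type. A standalone sketch with illustrative data:

```rust
use std::sync::Arc;
use arrow_array::{Array, RunArray, StringArray, builder::StringRunBuilder, types::Int32Type};

fn main() {
    let mut builder = StringRunBuilder::<Int32Type>::new();
    builder.extend(["x", "x", "y"].into_iter().map(Some));
    let erased: Arc<dyn Array> = Arc::new(builder.finish());

    // Step 1: recover the RunArray with Int32 run ends from the erased array.
    let ree = erased
        .as_any()
        .downcast_ref::<RunArray<Int32Type>>()
        .unwrap();
    // Step 2: obtain a typed view whose values are statically a StringArray.
    let typed = ree.downcast::<StringArray>().unwrap();
    assert_eq!(typed.values().len(), 2); // two runs: "x" and "y"
    assert_eq!(typed.values().value(0), "x");
}
```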

parquet/src/arrow/arrow_writer/levels.rs

Lines changed: 4 additions & 0 deletions
```diff
@@ -222,6 +222,10 @@ impl LevelInfoBuilder {
                 _ => unreachable!(),
             })
         }
+        DataType::RunEndEncoded(_, v) if is_leaf(v.data_type()) => {
+            let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
+            Ok(Self::Primitive(levels))
+        }
         d => Err(nyi_err!("Datatype {} is not yet supported", d)),
     }
 }
```

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 207 additions & 0 deletions
```diff
@@ -1122,6 +1122,17 @@ impl ArrowColumnWriterFactory {
             ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
             _ => out.push(col(leaves.next().unwrap())?),
         },
+        ArrowDataType::RunEndEncoded(_, value_type) => match value_type.data_type() {
+            ArrowDataType::Utf8
+            | ArrowDataType::LargeUtf8
+            | ArrowDataType::Binary
+            | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
+            ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
+                out.push(bytes(leaves.next().unwrap())?)
+            }
+            ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
+            _ => out.push(col(leaves.next().unwrap())?),
+        },
         _ => {
             return Err(ParquetError::NYI(format!(
                 "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
```
```diff
@@ -1215,6 +1226,41 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
                     write_primitive(typed, array.values(), levels)
                 }
             },
+            ArrowDataType::RunEndEncoded(_, value_type) => match value_type.data_type() {
+                ArrowDataType::Decimal32(_, _) => {
+                    let array = arrow_cast::cast(column, value_type.data_type())?;
+                    let array = array
+                        .as_primitive::<Decimal32Type>()
+                        .unary::<_, Int32Type>(|v| v);
+                    write_primitive(typed, array.values(), levels)
+                }
+                ArrowDataType::Decimal64(_, _) => {
+                    let array = arrow_cast::cast(column, value_type.data_type())?;
+                    let array = array
+                        .as_primitive::<Decimal64Type>()
+                        .unary::<_, Int32Type>(|v| v as i32);
+                    write_primitive(typed, array.values(), levels)
+                }
+                ArrowDataType::Decimal128(_, _) => {
+                    let array = arrow_cast::cast(column, value_type.data_type())?;
+                    let array = array
+                        .as_primitive::<Decimal128Type>()
+                        .unary::<_, Int32Type>(|v| v as i32);
+                    write_primitive(typed, array.values(), levels)
+                }
+                ArrowDataType::Decimal256(_, _) => {
+                    let array = arrow_cast::cast(column, value_type.data_type())?;
+                    let array = array
+                        .as_primitive::<Decimal256Type>()
+                        .unary::<_, Int32Type>(|v| v.as_i128() as i32);
+                    write_primitive(typed, array.values(), levels)
+                }
+                _ => {
+                    let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
+                    let array = array.as_primitive::<Int32Type>();
+                    write_primitive(typed, array.values(), levels)
+                }
+            },
             _ => {
                 let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                 let array = array.as_primitive::<Int32Type>();
```
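The decimal arms above rely on `PrimitiveArray::unary` to re-encode each native value as the `i32` this INT32 column writer expects. A minimal sketch of that kernel in isolation (values and precision/scale are illustrative):

```rust
use arrow_array::{Decimal128Array, Int32Array, types::Int32Type};

fn main() {
    // Two decimal values with precision 9, scale 2 (i.e. 1.23 and 4.56).
    let decimals = Decimal128Array::from(vec![123_i128, 456_i128])
        .with_precision_and_scale(9, 2)
        .unwrap();

    // unary applies a closure to each native value, preserving nulls.
    let as_i32: Int32Array = decimals.unary::<_, Int32Type>(|v| v as i32);
    assert_eq!(as_i32.value(0), 123);
    assert_eq!(as_i32.value(1), 456);
}
```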
```diff
@@ -1297,6 +1343,12 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
                 write_primitive(typed, array.values(), levels)
             }
         },
+        ArrowDataType::RunEndEncoded(_run_ends, _values) => {
+            Err(ParquetError::NYI(
+                "Int64ColumnWriter: Attempting to write an Arrow REE type that is not yet implemented"
+                    .to_string(),
+            ))
+        }
         _ => {
             let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
             let array = array.as_primitive::<Int64Type>();
```
```diff
@@ -1371,6 +1423,12 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
             let array = column.as_primitive::<Float16Type>();
             get_float_16_array_slice(array, indices)
         }
+        ArrowDataType::RunEndEncoded(_run_ends, _values) => {
+            return Err(ParquetError::NYI(
+                "FixedLenByteArrayColumnWriter: Attempting to write an Arrow REE type that is not yet implemented"
+                    .to_string(),
+            ));
+        }
         _ => {
             return Err(ParquetError::NYI(
                 "Attempting to write an Arrow type that is not yet implemented".to_string(),
```
```diff
@@ -4481,4 +4539,153 @@ mod tests {
         assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
         assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
     }
+
+    #[test]
+    fn arrow_writer_run_end_encoded_string() {
+        // Create a run array of strings
+        let mut builder = StringRunBuilder::<Int32Type>::new();
+        builder.extend(
+            vec![Some("alpha"); 100000]
+                .into_iter()
+                .chain(vec![Some("beta"); 100000]),
+        );
+        let run_array: RunArray<Int32Type> = builder.finish();
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            run_array.data_type().clone(),
+            run_array.is_nullable(),
+        )]));
+
+        // Write to parquet
+        let mut parquet_bytes: Vec<u8> = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, schema.clone(), None).unwrap();
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(run_array)]).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        // Read back and verify
+        let bytes = Bytes::from(parquet_bytes);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+
+        // Check if dictionary was used by examining the metadata
+        let metadata = reader.metadata();
+        let row_group = &metadata.row_groups()[0];
+        let col_meta = &row_group.columns()[0];
+
+        // If dictionary encoding worked, we should see RLE_DICTIONARY encoding
+        // and have a dictionary page offset
+        let has_dict_encoding = col_meta.encodings().any(|e| e == Encoding::RLE_DICTIONARY);
+        let has_dict_page = col_meta.dictionary_page_offset().is_some();
+
+        // Verify the schema is REE encoded when we read it back
+        let expected_schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            DataType::RunEndEncoded(
+                Arc::new(Field::new("run_ends", arrow_schema::DataType::Int32, false)),
+                Arc::new(Field::new("values", arrow_schema::DataType::Utf8, true)),
+            ),
+            false,
+        )]));
+        assert_eq!(&expected_schema, reader.schema());
+
+        // Read the data back
+        let batches: Vec<_> = reader
+            .build()
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+        assert_eq!(batches.len(), 196);
+        // Count rows in total
+        let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
+        assert_eq!(total_rows, 200000);
+
+        // Ensure dictionary encoding
+        assert!(has_dict_encoding, "RunArray should be dictionary encoded");
+        assert!(has_dict_page, "RunArray should have dictionary page");
+    }
+
+    #[test]
+    fn arrow_writer_run_end_encoded_int() {
+        // Create a run array of integers
+        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
+        builder.extend(
+            vec![Some(1); 100000]
+                .into_iter()
+                .chain(vec![Some(2); 100000]),
+        );
+        let run_array: RunArray<Int32Type> = builder.finish();
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            run_array.data_type().clone(),
+            run_array.is_nullable(),
+        )]));
+
+        // Write to parquet
+        let mut parquet_bytes: Vec<u8> = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, schema.clone(), None).unwrap();
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(run_array)]).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        // Read back and verify
+        let bytes = Bytes::from(parquet_bytes);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+
+        // Check if dictionary was used by examining the metadata
+        let metadata = reader.metadata();
+        let row_group = &metadata.row_groups()[0];
+        let col_meta = &row_group.columns()[0];
+
+        // If dictionary encoding worked, we should see RLE_DICTIONARY encoding
+        // and have a dictionary page offset
+        let has_dict_encoding = col_meta.encodings().any(|e| e == Encoding::RLE_DICTIONARY);
+        let has_dict_page = col_meta.dictionary_page_offset().is_some();
+
+        // Verify the schema is REE encoded when we read it back
+        let expected_schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            DataType::RunEndEncoded(
+                Arc::new(Field::new("run_ends", arrow_schema::DataType::Int32, false)),
+                Arc::new(Field::new("values", arrow_schema::DataType::Int32, true)),
+            ),
+            false,
+        )]));
+        assert_eq!(&expected_schema, reader.schema());
+
+        // Read the data back
+        let batches: Vec<_> = reader
+            .build()
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+        assert_eq!(batches.len(), 196);
+        // Count rows in total
+        let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
+        assert_eq!(total_rows, 200000);
+
+        // Ensure dictionary encoding
+        assert!(has_dict_encoding, "RunArray should be dictionary encoded");
+        assert!(has_dict_page, "RunArray should have dictionary page");
+    }
+
+    #[test]
+    fn arrow_writer_round_trip_run_end_encoded_string() {
+        // Create a run array of strings (cannot have more than 1024 values per record batch)
+        let mut builder = StringRunBuilder::<Int32Type>::new();
+        builder.extend(
+            vec![Some("alpha"); 512]
+                .into_iter()
+                .chain(vec![Some("beta"); 512]),
+        );
+        let run_array: RunArray<Int32Type> = builder.finish();
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            run_array.data_type().clone(),
+            run_array.is_nullable(),
+        )]));
+
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(run_array)]).unwrap();
+        roundtrip(batch, None);
+    }
 }
```
