Skip to content

Commit 0931768

Browse files
committed
Convert RunEndEncoded field to Parquet
1 parent 04f217b commit 0931768

File tree

6 files changed

+145
-22
lines changed

6 files changed

+145
-22
lines changed

arrow-cast/src/cast/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {
130130
| FixedSizeList(_, _)
131131
| Struct(_)
132132
| Map(_, _)
133-
| Dictionary(_, _),
133+
| Dictionary(_, _)
134+
| RunEndEncoded(_, _),
134135
) => true,
135136
// Dictionary/List conditions should be put in front of others
136137
(Dictionary(_, from_value_type), Dictionary(_, to_value_type)) => {
@@ -167,6 +168,7 @@ pub fn can_cast_types(from_type: &DataType, to_type: &DataType) -> bool {
167168
can_cast_types(from_key.data_type(), to_key.data_type()) && can_cast_types(from_value.data_type(), to_value.data_type()),
168169
_ => false
169170
},
171+
// TODO: RunEndEncoded here?
170172
// cast one decimal type to another decimal type
171173
(Decimal128(_, _), Decimal128(_, _)) => true,
172174
(Decimal256(_, _), Decimal256(_, _)) => true,
@@ -781,6 +783,7 @@ pub fn cast_with_options(
781783
"Casting from type {from_type:?} to dictionary type {to_type:?} not supported",
782784
))),
783785
},
786+
// TODO: RunEndEncoded here?
784787
(List(_), List(to)) => cast_list_values::<i32>(array, to, cast_options),
785788
(LargeList(_), LargeList(to)) => cast_list_values::<i64>(array, to, cast_options),
786789
(List(_), LargeList(list_to)) => cast_list::<i32, i64>(array, list_to, cast_options),

arrow-schema/src/datatype.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ pub enum DataType {
354354
/// that contain many repeated values using less memory, but with
355355
/// a higher CPU overhead for some operations.
356356
///
357-
/// This type mostly used to represent low cardinality string
357+
/// This type is mostly used to represent low cardinality string
358358
/// arrays or a limited set of primitive types as integers.
359359
Dictionary(Box<DataType>, Box<DataType>),
360360
/// Exact 32-bit width decimal value with precision and scale

parquet/src/arrow/arrow_writer/byte_array.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::util::bit_util::num_required_bits;
2828
use crate::util::interner::{Interner, Storage};
2929
use arrow_array::{
3030
Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
31-
LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
31+
LargeBinaryArray, LargeStringArray, RunArray, StringArray, StringViewArray,
3232
};
3333
use arrow_schema::DataType;
3434

@@ -59,6 +59,28 @@ macro_rules! downcast_dict_op {
5959
};
6060
}
6161

62+
macro_rules! downcast_ree_impl {
63+
($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
64+
$op($array
65+
.as_any()
66+
.downcast_ref::<RunArray<arrow_array::types::$key>>()
67+
.unwrap()
68+
.downcast::<$val>()
69+
.unwrap()$(, $arg)*)
70+
}};
71+
}
72+
73+
macro_rules! downcast_ree_op {
74+
($run_end_field:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
75+
match $run_end_field.data_type() {
76+
DataType::Int16 => downcast_ree_impl!($array, Int16Type, $val, $op$(, $arg)*),
77+
DataType::Int32 => downcast_ree_impl!($array, Int32Type, $val, $op$(, $arg)*),
78+
DataType::Int64 => downcast_ree_impl!($array, Int64Type, $val, $op$(, $arg)*),
79+
_ => unreachable!(),
80+
}
81+
};
82+
}
83+
6284
macro_rules! downcast_op {
6385
($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
6486
match $data_type {
@@ -90,6 +112,20 @@ macro_rules! downcast_op {
90112
}
91113
d => unreachable!("cannot downcast {} dictionary value to byte array", d),
92114
},
115+
DataType::RunEndEncoded(run_end, value) => match value.data_type() {
116+
DataType::Utf8 => downcast_ree_op!(run_end, StringArray, $array, $op$(, $arg)*),
117+
DataType::LargeUtf8 => {
118+
downcast_ree_op!(run_end, LargeStringArray, $array, $op$(, $arg)*)
119+
}
120+
DataType::Binary => downcast_ree_op!(run_end, BinaryArray, $array, $op$(, $arg)*),
121+
DataType::LargeBinary => {
122+
downcast_ree_op!(run_end, LargeBinaryArray, $array, $op$(, $arg)*)
123+
}
124+
DataType::FixedSizeBinary(_) => {
125+
downcast_ree_op!(run_end, FixedSizeBinaryArray, $array, $op$(, $arg)*)
126+
}
127+
d => unreachable!("cannot downcast {} run end encoded value to byte array", d),
128+
},
93129
d => unreachable!("cannot downcast {} to byte array", d),
94130
}
95131
};

parquet/src/arrow/arrow_writer/levels.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,10 @@ impl LevelInfoBuilder {
222222
_ => unreachable!(),
223223
})
224224
}
225+
DataType::RunEndEncoded(_, v) if is_leaf(v.data_type()) => {
226+
let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
227+
Ok(Self::Primitive(levels))
228+
}
225229
d => Err(nyi_err!("Datatype {} is not yet supported", d)),
226230
}
227231
}

parquet/src/arrow/arrow_writer/mod.rs

Lines changed: 70 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,15 +1033,15 @@ impl ArrowColumnWriterFactory {
10331033

10341034
match data_type {
10351035
_ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
1036-
ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => out.push(col(leaves.next().unwrap())?),
1036+
ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
1037+
out.push(col(leaves.next().unwrap())?)
1038+
}
10371039
ArrowDataType::LargeBinary
10381040
| ArrowDataType::Binary
10391041
| ArrowDataType::Utf8
10401042
| ArrowDataType::LargeUtf8
10411043
| ArrowDataType::BinaryView
1042-
| ArrowDataType::Utf8View => {
1043-
out.push(bytes(leaves.next().unwrap())?)
1044-
}
1044+
| ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
10451045
ArrowDataType::List(f)
10461046
| ArrowDataType::LargeList(f)
10471047
| ArrowDataType::FixedSizeList(f, _) => {
@@ -1058,21 +1058,29 @@ impl ArrowColumnWriterFactory {
10581058
self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
10591059
}
10601060
_ => unreachable!("invalid map type"),
1061-
}
1061+
},
10621062
ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
1063-
ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
1064-
out.push(bytes(leaves.next().unwrap())?)
1065-
}
1063+
ArrowDataType::Utf8
1064+
| ArrowDataType::LargeUtf8
1065+
| ArrowDataType::Binary
1066+
| ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
10661067
ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
10671068
out.push(bytes(leaves.next().unwrap())?)
10681069
}
1069-
ArrowDataType::FixedSizeBinary(_) => {
1070+
ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1071+
_ => out.push(col(leaves.next().unwrap())?),
1072+
},
1073+
ArrowDataType::RunEndEncoded(_run_ends, value_type) => match value_type.data_type() {
1074+
ArrowDataType::Utf8
1075+
| ArrowDataType::LargeUtf8
1076+
| ArrowDataType::Binary
1077+
| ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
1078+
ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
10701079
out.push(bytes(leaves.next().unwrap())?)
10711080
}
1072-
_ => {
1073-
out.push(col(leaves.next().unwrap())?)
1074-
}
1075-
}
1081+
ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
1082+
_ => out.push(col(leaves.next().unwrap())?),
1083+
},
10761084
_ => return Err(ParquetError::NYI(
10771085
format!(
10781086
"Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
@@ -1166,6 +1174,7 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
11661174
write_primitive(typed, array.values(), levels)
11671175
}
11681176
},
1177+
ArrowDataType::RunEndEncoded(_run_ends, _value_type) => todo!(),
11691178
_ => {
11701179
let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
11711180
let array = array.as_primitive::<Int32Type>();
@@ -1248,6 +1257,7 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
12481257
write_primitive(typed, array.values(), levels)
12491258
}
12501259
},
1260+
ArrowDataType::RunEndEncoded(_run_ends, _values) => todo!(),
12511261
_ => {
12521262
let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
12531263
let array = array.as_primitive::<Int64Type>();
@@ -1324,6 +1334,7 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
13241334
let array = column.as_primitive::<Float16Type>();
13251335
get_float_16_array_slice(array, indices)
13261336
}
1337+
ArrowDataType::RunEndEncoded(_run_ends, _values) => todo!(),
13271338
_ => {
13281339
return Err(ParquetError::NYI(
13291340
"Attempting to write an Arrow type that is not yet implemented".to_string(),
@@ -4293,4 +4304,50 @@ mod tests {
42934304
assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
42944305
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
42954306
}
4307+
4308+
#[test]
4309+
fn arrow_writer_run_end_encoded() {
4310+
// Create a run array of strings
4311+
let mut builder = StringRunBuilder::<Int16Type>::new();
4312+
builder.extend(
4313+
vec![Some("alpha"); 1000]
4314+
.into_iter()
4315+
.chain(vec![Some("beta"); 1000]),
4316+
);
4317+
let run_array: RunArray<Int16Type> = builder.finish();
4318+
println!("run_array type: {:?}", run_array.data_type());
4319+
let schema = Arc::new(Schema::new(vec![Field::new(
4320+
"ree",
4321+
run_array.data_type().clone(),
4322+
run_array.is_nullable(),
4323+
)]));
4324+
4325+
// Write to parquet
4326+
let mut parquet_bytes: Vec<u8> = Vec::new();
4327+
let mut writer = ArrowWriter::try_new(&mut parquet_bytes, schema.clone(), None).unwrap();
4328+
let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(run_array)]).unwrap();
4329+
writer.write(&batch).unwrap();
4330+
writer.close().unwrap();
4331+
4332+
// Schema of output is plain, not dictionary or REE encoded!!
4333+
let expected_schema = Arc::new(Schema::new(vec![Field::new(
4334+
"ree",
4335+
arrow_schema::DataType::Utf8,
4336+
false,
4337+
)]));
4338+
4339+
// Read from parquet
4340+
let bytes = Bytes::from(parquet_bytes);
4341+
let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
4342+
assert_eq!(reader.schema(), &expected_schema);
4343+
let batches: Vec<_> = reader
4344+
.build()
4345+
.unwrap()
4346+
.collect::<ArrowResult<Vec<_>>>()
4347+
.unwrap();
4348+
assert_eq!(batches.len(), 2);
4349+
// Count rows in total
4350+
let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
4351+
assert_eq!(total_rows, 2000);
4352+
}
42964353
}

parquet/src/arrow/schema/mod.rs

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ pub fn parquet_to_arrow_field_levels(
129129
match complex::convert_schema(schema, mask, hint)? {
130130
Some(field) => match &field.arrow_type {
131131
DataType::Struct(fields) => Ok(FieldLevels {
132-
fields: fields.clone(),
132+
fields: fields.to_owned(),
133133
levels: Some(field),
134134
}),
135135
_ => unreachable!(),
@@ -303,7 +303,7 @@ impl<'a> ArrowSchemaConverter<'a> {
303303
///
304304
/// Setting this option to `true` will result in Parquet files that can be
305305
/// read by more readers, but may lose precision for Arrow types such as
306-
/// [`DataType::Date64`] which have no direct [corresponding Parquet type].
306+
/// [`DataType::Date64`] which have no direct corresponding Parquet type.
307307
///
308308
/// By default, this converter does not coerce to native Parquet types. Enabling type
309309
/// coercion allows for meaningful representations that do not require
@@ -771,12 +771,17 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result<Type> {
771771
DataType::Union(_, _) => unimplemented!("See ARROW-8817."),
772772
DataType::Dictionary(_, ref value) => {
773773
// Dictionary encoding not handled at the schema level
774-
let dict_field = field.clone().with_data_type(value.as_ref().clone());
774+
let dict_field = field.to_owned().with_data_type(value.as_ref().clone());
775+
arrow_to_parquet_type(&dict_field, coerce_types)
776+
}
777+
DataType::RunEndEncoded(_run_end_type, value_type) => {
778+
// We want to write REE data as dictionary encoded data,
779+
// which is not handled at the schema level.
780+
let dict_field = field
781+
.to_owned()
782+
.with_data_type(value_type.data_type().to_owned());
775783
arrow_to_parquet_type(&dict_field, coerce_types)
776784
}
777-
DataType::RunEndEncoded(_, _) => Err(arrow_err!(
778-
"Converting RunEndEncodedType to parquet not supported",
779-
)),
780785
}
781786
}
782787

@@ -2272,4 +2277,22 @@ mod tests {
22722277

22732278
Ok(())
22742279
}
2280+
2281+
#[test]
2282+
fn test_run_end_encoded_conversion() {
2283+
use crate::basic::Type;
2284+
let run_ends_field = Arc::new(Field::new("run_ends", DataType::Int16, false));
2285+
let values_field = Arc::new(Field::new("values", DataType::Boolean, true));
2286+
let run_end_encoded_field = Field::new(
2287+
"run_end_encoded_16",
2288+
DataType::RunEndEncoded(run_ends_field, values_field),
2289+
false,
2290+
);
2291+
2292+
let result = arrow_to_parquet_type(&run_end_encoded_field, false).unwrap();
2293+
// Should convert to the underlying value type (Boolean in this case)
2294+
assert_eq!(result.get_physical_type(), Type::BOOLEAN);
2295+
assert_eq!(result.get_basic_info().repetition(), Repetition::REQUIRED); // field is not nullable
2296+
assert_eq!(result.name(), "run_end_encoded_16");
2297+
}
22752298
}

0 commit comments

Comments
 (0)