@@ -1033,15 +1033,15 @@ impl ArrowColumnWriterFactory {
 
         match data_type {
             _ if data_type.is_primitive() => out.push(col(leaves.next().unwrap())?),
-            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => out.push(col(leaves.next().unwrap())?),
+            ArrowDataType::FixedSizeBinary(_) | ArrowDataType::Boolean | ArrowDataType::Null => {
+                out.push(col(leaves.next().unwrap())?)
+            }
             ArrowDataType::LargeBinary
             | ArrowDataType::Binary
             | ArrowDataType::Utf8
             | ArrowDataType::LargeUtf8
             | ArrowDataType::BinaryView
-            | ArrowDataType::Utf8View => {
-                out.push(bytes(leaves.next().unwrap())?)
-            }
+            | ArrowDataType::Utf8View => out.push(bytes(leaves.next().unwrap())?),
             ArrowDataType::List(f)
             | ArrowDataType::LargeList(f)
             | ArrowDataType::FixedSizeList(f, _) => {
@@ -1058,21 +1058,30 @@ impl ArrowColumnWriterFactory {
                     self.get_arrow_column_writer(f[1].data_type(), props, leaves, out)?
                 }
                 _ => unreachable!("invalid map type"),
-            }
+            },
             ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
-                ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
-                    out.push(bytes(leaves.next().unwrap())?)
-                }
+                ArrowDataType::Utf8
+                | ArrowDataType::LargeUtf8
+                | ArrowDataType::Binary
+                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
                 ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                     out.push(bytes(leaves.next().unwrap())?)
                 }
-                ArrowDataType::FixedSizeBinary(_) => {
+                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
+                _ => out.push(col(leaves.next().unwrap())?),
+            },
+            // TODO: Don't know what I'm doing here!
+            ArrowDataType::RunEndEncoded(_run_ends, value_type) => match value_type.data_type() {
+                ArrowDataType::Utf8
+                | ArrowDataType::LargeUtf8
+                | ArrowDataType::Binary
+                | ArrowDataType::LargeBinary => out.push(bytes(leaves.next().unwrap())?),
+                ArrowDataType::Utf8View | ArrowDataType::BinaryView => {
                     out.push(bytes(leaves.next().unwrap())?)
                 }
-                _ => {
-                    out.push(col(leaves.next().unwrap())?)
-                }
-            }
+                ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
+                _ => out.push(col(leaves.next().unwrap())?),
+            },
             _ => return Err(ParquetError::NYI(
                 format!(
                     "Attempting to write an Arrow type {data_type:?} to parquet that is not yet implemented"
@@ -1166,6 +1175,7 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
                         write_primitive(typed, array.values(), levels)
                     }
                 },
+                ArrowDataType::RunEndEncoded(_run_ends, _value_type) => todo!(),
                 _ => {
                     let array = arrow_cast::cast(column, &ArrowDataType::Int32)?;
                     let array = array.as_primitive::<Int32Type>();
@@ -1248,6 +1258,7 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
                         write_primitive(typed, array.values(), levels)
                     }
                 },
+                ArrowDataType::RunEndEncoded(_run_ends, _values) => todo!(),
                 _ => {
                     let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;
                     let array = array.as_primitive::<Int64Type>();
@@ -1324,6 +1335,7 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
                     let array = column.as_primitive::<Float16Type>();
                     get_float_16_array_slice(array, indices)
                 }
+                ArrowDataType::RunEndEncoded(_run_ends, _values) => todo!(),
                 _ => {
                     return Err(ParquetError::NYI(
                         "Attempting to write an Arrow type that is not yet implemented".to_string(),
@@ -4293,4 +4305,50 @@ mod tests {
         assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
         assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
     }
+
+    #[test]
+    fn arrow_writer_run_end_encoded() {
+        // Create a run array of strings
+        let mut builder = StringRunBuilder::<Int16Type>::new();
+        builder.extend(
+            vec![Some("alpha"); 1000]
+                .into_iter()
+                .chain(vec![Some("beta"); 1000]),
+        );
+        let run_array: RunArray<Int16Type> = builder.finish();
+        println!("run_array type: {:?}", run_array.data_type());
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            run_array.data_type().clone(),
+            run_array.is_nullable(),
+        )]));
+
+        // Write to parquet
+        let mut parquet_bytes: Vec<u8> = Vec::new();
+        let mut writer = ArrowWriter::try_new(&mut parquet_bytes, schema.clone(), None).unwrap();
+        let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(run_array)]).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        // Schema of output is plain, not dictionary or REE encoded!!
+        let expected_schema = Arc::new(Schema::new(vec![Field::new(
+            "ree",
+            arrow_schema::DataType::Utf8,
+            false,
+        )]));
+
+        // Read from parquet
+        let bytes = Bytes::from(parquet_bytes);
+        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes).unwrap();
+        assert_eq!(reader.schema(), &expected_schema);
+        let batches: Vec<_> = reader
+            .build()
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+        assert_eq!(batches.len(), 2);
+        // Count rows in total
+        let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
+        assert_eq!(total_rows, 2000);
+    }
 }
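
The `todo!()` arms added to `write_leaf` above leave open how run-end-encoded values actually get flattened on the write path. As a point of comparison only (not part of this commit), here is a minimal sketch of one possible approach: materialize a `RunEndEncoded` column into a plain array of its value type before it reaches the leaf writers, so the existing primitive and byte-array paths apply unchanged. It assumes `arrow_cast::cast` can decode run-encoded arrays to their value type in the arrow version in use; if it cannot, the runs would have to be expanded manually (for example with the `take` kernel over physical indices). The helper name `materialize_run_encoded` is invented for illustration.

// Sketch only; cast support from RunEndEncoded is an assumption, not confirmed by this commit.
use arrow_array::{Array, ArrayRef};
use arrow_schema::DataType;
use parquet::errors::Result;

fn materialize_run_encoded(column: &ArrayRef) -> Result<ArrayRef> {
    match column.data_type() {
        // Decode the runs into the underlying value type (e.g. Utf8), so the
        // column can be written like any other plain array.
        DataType::RunEndEncoded(_run_ends, values) => {
            Ok(arrow_cast::cast(column, values.data_type())?)
        }
        // Non-REE columns pass through untouched.
        _ => Ok(column.clone()),
    }
}

Whether such a conversion belongs in `write_leaf` itself or earlier, when the leaf arrays are computed, is exactly the question the TODO comment in the column-writer factory flags.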