Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
768b3e9
impl map_from_entries
Dec 14, 2025
c68c342
Revert "impl map_from_entries"
Dec 16, 2025
d887555
Merge branch 'apache:main' into main
kazantsev-maksim Dec 16, 2025
231aa90
Merge branch 'apache:main' into main
kazantsev-maksim Dec 17, 2025
9500bbb
Merge branch 'apache:main' into main
kazantsev-maksim Dec 24, 2025
5d153e9
work
Dec 27, 2025
f0f03d4
WIP
Dec 28, 2025
4b02dd6
WIP
Dec 28, 2025
9577481
Merge branch 'apache:main' into main
kazantsev-maksim Dec 28, 2025
0f98a3c
Add benchmark test
Dec 29, 2025
d7a6036
WIP
Dec 30, 2025
3791557
Merge branch 'apache:main' into main
kazantsev-maksim Jan 2, 2026
7c2f082
Merge branch 'apache:main' into main
kazantsev-maksim Jan 3, 2026
609a605
Merge branch 'apache:main' into main
kazantsev-maksim Jan 6, 2026
a151b2c
Merge branch 'apache:main' into main
kazantsev-maksim Jan 7, 2026
e89c8f2
Merge remote-tracking branch 'origin/main' into to_csv
Jan 7, 2026
902eb3a
Add benchmark
Jan 7, 2026
86c17e8
add more options
Jan 7, 2026
1bbc314
Revert
Jan 8, 2026
55388da
Work
Jan 8, 2026
c93d256
Work
Jan 9, 2026
3a51b62
Fix tests
Jan 9, 2026
93458cf
Fix tests
Jan 9, 2026
773aaba
Fix clippy warnings
Jan 9, 2026
cf544c7
Fix tests
Jan 9, 2026
ad3e7f5
Merge branch 'apache:main' into main
kazantsev-maksim Jan 10, 2026
ea92e4b
Merge branch 'apache:main' into main
kazantsev-maksim Jan 14, 2026
8dfeca3
Merge branch 'apache:main' into main
kazantsev-maksim Jan 17, 2026
53a1418
Merge remote-tracking branch 'origin/main' into to_csv
Jan 17, 2026
cea1f1a
Add more edge cases
Jan 17, 2026
559741e
Merge branch 'apache:main' into main
kazantsev-maksim Jan 20, 2026
ebda14e
Merge branch 'apache:main' into main
kazantsev-maksim Jan 21, 2026
aea8c0b
Merge remote-tracking branch 'origin/main' into to_csv
Jan 21, 2026
37b9736
Resolve conflicts
Jan 21, 2026
78c2e23
Fix PR issues
Jan 21, 2026
408152e
Merge branch 'apache:main' into main
kazantsev-maksim Jan 23, 2026
f1e3866
Merge remote-tracking branch 'origin/main' into to_csv
Jan 23, 2026
f8655f0
Fix tests
Jan 24, 2026
d7857b2
Merge branch 'apache:main' into main
kazantsev-maksim Jan 24, 2026
d4b3b69
Merge remote-tracking branch 'origin/main' into to_csv
Jan 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pr_build_linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ jobs:
org.apache.comet.CometStringExpressionSuite
org.apache.comet.CometBitwiseExpressionSuite
org.apache.comet.CometMapExpressionSuite
org.apache.comet.CometCsvExpressionSuite
org.apache.comet.CometJsonExpressionSuite
org.apache.comet.expressions.conditional.CometIfSuite
org.apache.comet.expressions.conditional.CometCoalesceSuite
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr_build_macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ jobs:
org.apache.comet.CometBitwiseExpressionSuite
org.apache.comet.CometMapExpressionSuite
org.apache.comet.CometJsonExpressionSuite
org.apache.comet.CometCsvExpressionSuite
org.apache.comet.expressions.conditional.CometIfSuite
org.apache.comet.expressions.conditional.CometCoalesceSuite
org.apache.comet.expressions.conditional.CometCaseWhenSuite
Expand Down
21 changes: 20 additions & 1 deletion native/core/src/execution/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use datafusion::{
};
use datafusion_comet_spark_expr::{
create_comet_physical_fun, create_comet_physical_fun_with_eval_mode, BinaryOutputStyle,
BloomFilterAgg, BloomFilterMightContain, EvalMode, SumInteger,
BloomFilterAgg, BloomFilterMightContain, CsvWriteOptions, EvalMode, SumInteger, ToCsv,
};
use iceberg::expr::Bind;

Expand Down Expand Up @@ -585,6 +585,25 @@ impl PhysicalPlanner {
ExprStruct::MonotonicallyIncreasingId(_) => Ok(Arc::new(
MonotonicallyIncreasingId::from_partition_id(self.partition),
)),
ExprStruct::ToCsv(expr) => {
let csv_struct_expr =
self.create_expr(expr.child.as_ref().unwrap(), Arc::clone(&input_schema))?;
let options = expr.options.clone().unwrap();
let csv_write_options = CsvWriteOptions::new(
options.delimiter,
options.quote,
options.escape,
options.null_value,
options.quote_all,
options.ignore_leading_white_space,
options.ignore_trailing_white_space,
);
Ok(Arc::new(ToCsv::new(
csv_struct_expr,
&options.timezone,
csv_write_options,
)))
}
expr => Err(GeneralError(format!("Not implemented: {expr:?}"))),
}
}
Expand Down
17 changes: 17 additions & 0 deletions native/proto/src/proto/expr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ message Expr {
EmptyExpr monotonically_increasing_id = 64;
UnixTimestamp unix_timestamp = 65;
FromJson from_json = 66;
ToCsv to_csv = 67;
}
}

Expand Down Expand Up @@ -276,6 +277,22 @@ message FromJson {
string timezone = 3;
}

message ToCsv {
Expr child = 1;
CsvWriteOptions options = 2;
}

message CsvWriteOptions {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

string delimiter = 1;
string quote = 2;
string escape = 3;
string null_value = 4;
bool quote_all = 5;
bool ignore_leading_white_space = 6;
bool ignore_trailing_white_space = 7;
string timezone = 8;
}

enum BinaryOutputStyle {
UTF8 = 0;
BASIC = 1;
Expand Down
4 changes: 4 additions & 0 deletions native/spark-expr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ harness = false
name = "normalize_nan"
harness = false

[[bench]]
name = "to_csv"
harness = false

[[test]]
name = "test_udf_registration"
path = "tests/spark_expr_reg.rs"
108 changes: 108 additions & 0 deletions native/spark-expr/benches/to_csv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::{
BooleanBuilder, Int16Builder, Int32Builder, Int64Builder, Int8Builder, StringBuilder,
StructArray, StructBuilder,
};
use arrow::datatypes::{DataType, Field};
use criterion::{criterion_group, criterion_main, Criterion};
use datafusion_comet_spark_expr::{to_csv_inner, CsvWriteOptions, EvalMode, SparkCastOptions};
use std::hint::black_box;

fn create_struct_array(array_size: usize) -> StructArray {
let fields = vec![
Field::new("f1", DataType::Boolean, true),
Field::new("f2", DataType::Int8, true),
Field::new("f3", DataType::Int16, true),
Field::new("f4", DataType::Int32, true),
Field::new("f5", DataType::Int64, true),
Field::new("f6", DataType::Utf8, true),
];
let mut struct_builder = StructBuilder::from_fields(fields, array_size);
for i in 0..array_size {
struct_builder
.field_builder::<BooleanBuilder>(0)
.unwrap()
.append_option(if i % 10 == 0 { None } else { Some(i % 2 == 0) });

struct_builder
.field_builder::<Int8Builder>(1)
.unwrap()
.append_option(if i % 10 == 0 {
None
} else {
Some((i % 128) as i8)
});

struct_builder
.field_builder::<Int16Builder>(2)
.unwrap()
.append_option(if i % 10 == 0 { None } else { Some(i as i16) });

struct_builder
.field_builder::<Int32Builder>(3)
.unwrap()
.append_option(if i % 10 == 0 { None } else { Some(i as i32) });

struct_builder
.field_builder::<Int64Builder>(4)
.unwrap()
.append_option(if i % 10 == 0 { None } else { Some(i as i64) });

struct_builder
.field_builder::<StringBuilder>(5)
.unwrap()
.append_option(if i % 10 == 0 {
None
} else {
Some(format!("string_{}", i))
});

struct_builder.append(true);
}
struct_builder.finish()
}

fn criterion_benchmark(c: &mut Criterion) {
let array_size = 8192;
let timezone = "UTC";
let struct_array = create_struct_array(array_size);
let default_delimiter = ",";
let default_null_value = "";
let default_quote = "\"";
let default_escape = "\\";
let mut cast_options = SparkCastOptions::new(EvalMode::Legacy, timezone, false);
cast_options.null_string = default_null_value.to_string();
let csv_write_options = CsvWriteOptions::new(
default_delimiter.to_string(),
default_quote.to_string(),
default_escape.to_string(),
default_null_value.to_string(),
false,
true,
true,
);
c.bench_function("to_csv", |b| {
b.iter(|| {
black_box(to_csv_inner(&struct_array, &cast_options, &csv_write_options).unwrap())
})
});
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
61 changes: 61 additions & 0 deletions native/spark-expr/src/csv_funcs/csv_write_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::fmt::{Display, Formatter};

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct CsvWriteOptions {
pub delimiter: String,
pub quote: String,
pub escape: String,
pub null_value: String,
pub quote_all: bool,
pub ignore_leading_white_space: bool,
pub ignore_trailing_white_space: bool,
}

impl Display for CsvWriteOptions {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"csv_write_options(delimiter={}, quote={}, escape={}, null_value={}, quote_all={}, ignore_leading_white_space={}, ignore_trailing_white_space={})",
self.delimiter, self.quote, self.escape, self.null_value, self.quote_all, self.ignore_leading_white_space, self.ignore_trailing_white_space
)
}
}

impl CsvWriteOptions {
pub fn new(
delimiter: String,
quote: String,
escape: String,
null_value: String,
quote_all: bool,
ignore_leading_white_space: bool,
ignore_trailing_white_space: bool,
) -> Self {
Self {
delimiter,
quote,
escape,
null_value,
quote_all,
ignore_leading_white_space,
ignore_trailing_white_space,
}
}
}
22 changes: 22 additions & 0 deletions native/spark-expr/src/csv_funcs/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod csv_write_options;
mod to_csv;

pub use csv_write_options::CsvWriteOptions;
pub use to_csv::{to_csv_inner, ToCsv};
Loading
Loading