diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml index 02d747df6deeb..6f7eba55b8907 100644 --- a/datafusion/spark/Cargo.toml +++ b/datafusion/spark/Cargo.toml @@ -61,3 +61,7 @@ criterion = { workspace = true } [[bench]] harness = false name = "char" + +[[bench]] +harness = false +name = "space" diff --git a/datafusion/spark/benches/space.rs b/datafusion/spark/benches/space.rs new file mode 100644 index 0000000000000..8ace7219a1dcc --- /dev/null +++ b/datafusion/spark/benches/space.rs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +extern crate criterion; + +use arrow::array::PrimitiveArray; +use arrow::datatypes::{DataType, Field, Int32Type}; +use criterion::{Criterion, criterion_group, criterion_main}; +use datafusion_common::config::ConfigOptions; +use datafusion_expr::{ColumnarValue, ScalarFunctionArgs}; +use datafusion_spark::function::string::space; +use rand::prelude::StdRng; +use rand::{Rng, SeedableRng}; +use std::hint::black_box; +use std::sync::Arc; + +fn criterion_benchmark(c: &mut Criterion) { + let space_func = space(); + let size = 1024; + let input: PrimitiveArray = { + let null_density = 0.2; + let mut rng = StdRng::seed_from_u64(42); + (0..size) + .map(|_| { + if rng.random::() < null_density { + None + } else { + Some(rng.random_range::(1i32..10)) + } + }) + .collect() + }; + let input = Arc::new(input); + let args = vec![ColumnarValue::Array(input)]; + let arg_fields = args + .iter() + .enumerate() + .map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into()) + .collect::>(); + let config_options = Arc::new(ConfigOptions::default()); + c.bench_function("space", |b| { + b.iter(|| { + black_box( + space_func + .invoke_with_args(ScalarFunctionArgs { + args: args.clone(), + arg_fields: arg_fields.clone(), + number_rows: size, + return_field: Arc::new(Field::new("f", DataType::Utf8, true)), + config_options: Arc::clone(&config_options), + }) + .unwrap(), + ) + }) + }); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/spark/src/function/string/mod.rs b/datafusion/spark/src/function/string/mod.rs index 480984f02159b..369d381a9c35b 100644 --- a/datafusion/spark/src/function/string/mod.rs +++ b/datafusion/spark/src/function/string/mod.rs @@ -24,6 +24,7 @@ pub mod ilike; pub mod length; pub mod like; pub mod luhn_check; +pub mod space; use datafusion_expr::ScalarUDF; use datafusion_functions::make_udf_function; @@ -38,6 +39,7 @@ make_udf_function!(elt::SparkElt, elt); make_udf_function!(like::SparkLike, like); make_udf_function!(luhn_check::SparkLuhnCheck, luhn_check); make_udf_function!(format_string::FormatStringFunc, format_string); +make_udf_function!(space::SparkSpace, space); pub mod expr_fn { use datafusion_functions::export_functions; @@ -87,6 +89,7 @@ pub mod expr_fn { "Returns a formatted string from printf-style format strings.", strfmt args )); + export_functions!((space, "Returns a string consisting of n spaces.", arg1)); } pub fn functions() -> Vec> { @@ -100,5 +103,6 @@ pub fn functions() -> Vec> { like(), luhn_check(), format_string(), + space(), ] } diff --git a/datafusion/spark/src/function/string/space.rs b/datafusion/spark/src/function/string/space.rs new file mode 100644 index 0000000000000..77daff28ff1a1 --- /dev/null +++ b/datafusion/spark/src/function/string/space.rs @@ -0,0 +1,232 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::array::{ + Array, ArrayRef, DictionaryArray, Int32Array, StringArray, StringBuilder, + as_dictionary_array, +}; +use arrow::datatypes::{DataType, Int32Type}; +use datafusion_common::cast::as_int32_array; +use datafusion_common::{Result, ScalarValue, exec_err}; +use datafusion_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; +use std::any::Any; +use std::sync::Arc; + +/// Spark-compatible `space` expression +/// +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct SparkSpace { + signature: Signature, +} + +impl Default for SparkSpace { + fn default() -> Self { + Self::new() + } +} + +impl SparkSpace { + pub fn new() -> Self { + Self { + signature: Signature::uniform( + 1, + vec![ + DataType::Int32, + DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Int32), + ), + ], + Volatility::Immutable, + ), + } + } +} + +impl ScalarUDFImpl for SparkSpace { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "space" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, args: &[DataType]) -> Result { + let return_type = match &args[0] { + DataType::Dictionary(key_type, _) => { + DataType::Dictionary(key_type.clone(), Box::new(DataType::Utf8)) + } + _ => DataType::Utf8, + }; + Ok(return_type) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + spark_space(&args.args) + } +} + +pub fn spark_space(args: &[ColumnarValue]) -> Result { + if args.len() != 1 { + return exec_err!("space function takes exactly one argument"); + } + match &args[0] { + ColumnarValue::Array(array) => { + let result = spark_space_array(array)?; + Ok(ColumnarValue::Array(result)) + } + ColumnarValue::Scalar(scalar) => { + let result = spark_space_scalar(scalar)?; + Ok(ColumnarValue::Scalar(result)) + } + } +} + +fn spark_space_array(array: &ArrayRef) -> Result { + match array.data_type() { + DataType::Int32 => { + let array = as_int32_array(array)?; + Ok(Arc::new(spark_space_array_inner(array))) + } + DataType::Dictionary(_, _) => { + let dict = as_dictionary_array::(array); + let values = spark_space_array(dict.values())?; + let result = DictionaryArray::try_new(dict.keys().clone(), values)?; + Ok(Arc::new(result)) + } + other => { + exec_err!("Unsupported data type {other:?} for function `space`") + } + } +} + +fn spark_space_scalar(scalar: &ScalarValue) -> Result { + match scalar { + ScalarValue::Int32(value) => { + let result = value.map(|v| { + if v <= 0 { + String::new() + } else { + " ".repeat(v as usize) + } + }); + Ok(ScalarValue::Utf8(result)) + } + other => { + exec_err!("Unsupported data type {other:?} for function `space`") + } + } +} + +fn spark_space_array_inner(array: &Int32Array) -> StringArray { + let mut builder = StringBuilder::with_capacity(array.len(), array.len() * 16); + let mut space_buf = String::new(); + for value in array.iter() { + match value { + None => builder.append_null(), + Some(l) if l > 0 => { + let l = l as usize; + if space_buf.len() < l { + space_buf = " ".repeat(l); + } + builder.append_value(&space_buf[..l]); + } + Some(_) => builder.append_value(""), + } + } + builder.finish() +} + +#[cfg(test)] +mod tests { + use crate::function::string::space::spark_space; + use arrow::array::{Array, Int32Array, Int32DictionaryArray}; + use arrow::datatypes::Int32Type; + use datafusion_common::cast::{as_dictionary_array, as_string_array}; + use datafusion_common::{Result, ScalarValue}; + use datafusion_expr::ColumnarValue; + use std::sync::Arc; + + #[test] + fn test_spark_space_int32_array() -> Result<()> { + let int32_array = ColumnarValue::Array(Arc::new(Int32Array::from(vec![ + Some(1), + Some(-3), + Some(0), + Some(5), + None, + ]))); + let ColumnarValue::Array(result) = spark_space(&[int32_array])? else { + unreachable!() + }; + let result = as_string_array(&result)?; + + assert_eq!(result.value(0), " "); + assert_eq!(result.value(1), ""); + assert_eq!(result.value(2), ""); + assert_eq!(result.value(3), " "); + assert!(result.is_null(4)); + Ok(()) + } + + #[test] + fn test_spark_space_dictionary() -> Result<()> { + let dictionary = ColumnarValue::Array(Arc::new(Int32DictionaryArray::new( + Int32Array::from(vec![0, 1, 2, 3, 4]), + Arc::new(Int32Array::from(vec![ + Some(1), + Some(-3), + Some(0), + Some(5), + None, + ])), + ))); + let ColumnarValue::Array(result) = spark_space(&[dictionary])? else { + unreachable!() + }; + let result = + as_string_array(as_dictionary_array::(&result)?.values())?; + assert_eq!(result.value(0), " "); + assert_eq!(result.value(1), ""); + assert_eq!(result.value(2), ""); + assert_eq!(result.value(3), " "); + assert!(result.is_null(4)); + Ok(()) + } + + #[test] + fn test_spark_space_scalar() -> Result<()> { + let scalar = ColumnarValue::Scalar(ScalarValue::Int32(Some(-5))); + let ColumnarValue::Scalar(result) = spark_space(&[scalar])? else { + unreachable!() + }; + match result { + ScalarValue::Utf8(Some(result)) => { + assert_eq!(result, ""); + } + _ => unreachable!(), + } + Ok(()) + } +} diff --git a/datafusion/sqllogictest/test_files/spark/string/space.slt b/datafusion/sqllogictest/test_files/spark/string/space.slt new file mode 100644 index 0000000000000..388f679c4da73 --- /dev/null +++ b/datafusion/sqllogictest/test_files/spark/string/space.slt @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +query T +SELECT concat(space(1::INT), 'Spark'); +---- + Spark + +query T +SELECT concat(space(5::INT), 'Spark'); +---- + Spark + +query T +SELECT space(0::INT); +---- +(empty) + +query T +SELECT space(-1::INT); +---- +(empty) + +query T +SELECT space(NULL); +---- +NULL