8 changes: 8 additions & 0 deletions datafusion/spark/src/function/math/mod.rs
@@ -20,6 +20,7 @@ pub mod expm1;
pub mod factorial;
pub mod hex;
pub mod modulus;
pub mod negative;
pub mod rint;
pub mod trigonometry;
pub mod unhex;
@@ -40,6 +41,7 @@ make_udf_function!(unhex::SparkUnhex, unhex);
make_udf_function!(width_bucket::SparkWidthBucket, width_bucket);
make_udf_function!(trigonometry::SparkCsc, csc);
make_udf_function!(trigonometry::SparkSec, sec);
make_udf_function!(negative::SparkNegative, negative);

pub mod expr_fn {
use datafusion_functions::export_functions;
@@ -63,6 +65,11 @@ pub mod expr_fn {
export_functions!((width_bucket, "Returns the bucket number into which the value of this expression would fall after being evaluated.", arg1 arg2 arg3 arg4));
export_functions!((csc, "Returns the cosecant of expr.", arg1));
export_functions!((sec, "Returns the secant of expr.", arg1));
export_functions!((
negative,
"Returns the negation of expr (unary minus).",
arg1
));
}

pub fn functions() -> Vec<Arc<ScalarUDF>> {
@@ -78,5 +85,6 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
width_bucket(),
csc(),
sec(),
negative(),
]
}
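
For orientation, a hedged usage sketch of the function exported above: register the Spark math UDFs with a SessionContext and call negative from SQL. The module path datafusion_spark::function::math and the Tokio wiring are assumptions about how this crate is consumed, not part of this PR.

use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // Register every Spark math UDF from this module, including `negative`.
    for udf in datafusion_spark::function::math::functions() {
        ctx.register_udf(udf.as_ref().clone());
    }
    let df = ctx.sql("SELECT negative(1) AS n").await?;
    df.show().await?; // expected: n = -1
    Ok(())
}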
280 changes: 280 additions & 0 deletions datafusion/spark/src/function/math/negative.rs
@@ -0,0 +1,280 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::types::*;
use arrow::array::*;
use arrow::datatypes::{DataType, IntervalDayTime, IntervalMonthDayNano};
use bigdecimal::num_traits::WrappingNeg;
use datafusion_common::utils::take_function_args;
use datafusion_common::{Result, ScalarValue, not_impl_err};

Review comment (medium):

To return a user-facing execution error for invalid inputs, exec_err should be imported from datafusion_common. This will be used to handle unsupported operations on unsigned integers.

Suggested change
use datafusion_common::{Result, ScalarValue, not_impl_err};
use datafusion_common::{exec_err, not_impl_err, Result, ScalarValue};
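
For reference, a minimal sketch of what the suggested exec_err! buys over not_impl_err!: it surfaces as DataFusionError::Execution, a user-facing execution error. The helper function name below is hypothetical.

use datafusion_common::{exec_err, DataFusionError, Result};

// `exec_err!` expands to Err(DataFusionError::Execution(format!(...))).
fn reject_unsigned() -> Result<()> {
    exec_err!("The operator 'negative' is not defined for unsigned integers.")
}

fn main() {
    assert!(matches!(reject_unsigned(), Err(DataFusionError::Execution(_))));
}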

use datafusion_expr::{
ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
};
use std::any::Any;
use std::sync::Arc;

/// Spark-compatible `negative` expression
/// <https://spark.apache.org/docs/latest/api/sql/index.html#negative>
///
/// Returns the negation of input (equivalent to unary minus)
/// Returns NULL if input is NULL, returns NaN if input is NaN.
///
/// ANSI mode support (see <https://github.com/apache/datafusion/issues/20034>):
/// - When Spark's ANSI mode is off (i.e. `spark.sql.ansi.enabled=false`),
///   negating the minimum value of a signed integer wraps around.
///   For example: negative(i32::MIN) returns i32::MIN (wraps instead of erroring).
///   This is what the current implementation does (legacy mode only).
/// - When ANSI mode is on (`spark.sql.ansi.enabled=true`), Spark throws an
///   ARITHMETIC_OVERFLOW error on integer overflow instead of wrapping.
///   This is not yet implemented; all operations currently use wrapping behavior.
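///
/// A minimal illustrative sketch of the legacy wrapping semantics (plain
/// two's-complement behavior from std, not an API added by this PR):
///
/// ```
/// assert_eq!(i32::MIN.wrapping_neg(), i32::MIN); // wraps instead of erroring
/// assert_eq!(42i32.wrapping_neg(), -42);         // ordinary case
/// ```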
///
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct SparkNegative {
signature: Signature,
}

impl Default for SparkNegative {
fn default() -> Self {
Self::new()
}
}

impl SparkNegative {
pub fn new() -> Self {
Self {
signature: Signature::numeric(1, Volatility::Immutable),
}
}
}

impl ScalarUDFImpl for SparkNegative {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"negative"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
Ok(arg_types[0].clone())
}

fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
spark_negative(&args.args)
}
}

/// Core implementation of Spark's negative function
fn spark_negative(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let [_] = take_function_args("negative", args)?;

match &args[0] {
ColumnarValue::Array(array) => match array.data_type() {
DataType::Null
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64 => Ok(args[0].clone()),

Review comment (Augment):

For UInt* arrays this returns the input unchanged, so negative becomes a no-op for unsigned columns, but unsigned scalars fall through to not_impl_err! — meaning behavior depends on whether the argument is scalar vs array.

Owner Author:

value:useful; category:bug; feedback: The Augment AI reviewer is correct! The unsigned integers are supported for arrays but not for scalars. This is inconsistent and support for scalars should be added. Prevents returning an error for using unsigned integers as scalar values.
Comment on lines +92 to +96

Review comment (high):

The current implementation for unsigned integer types (UInt*) in array form incorrectly returns the original value. For example, negative(1) would return 1. Since negating a positive unsigned integer is not a valid operation within its type, this should result in an error. This change corrects the behavior to return an execution error, which is more appropriate and aligns with expectations for such an operation.

Suggested change
DataType::Null
| DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64 => Ok(args[0].clone()),
DataType::Null => Ok(args[0].clone()),
DataType::UInt8
| DataType::UInt16
| DataType::UInt32
| DataType::UInt64 => {
    return exec_err!("The operator 'negative' is not defined for unsigned integers.");
},

Owner Author:

value:good-but-wont-fix; category:bug; feedback: The Gemini AI reviewer is correct! Doing nothing for unsigned integers would be surprising, but none of the other DB solutions support this either. Spark does not support unsigned integers, and Postgres/DuckDB do not support a negative() function.

Comment on lines +92 to +96

Review comment (CodeRabbit): ⚠️ Potential issue | 🟠 Major
Add missing UInt cases to scalar branch for consistent behavior.

Array branch (lines 92-96) passes unsigned integer types unchanged, but the scalar branch (lines 192+) lacks any ScalarValue::UInt* cases and falls through to not_impl_err!, creating inconsistent behavior. When a UInt array flows through, it returns unchanged; a UInt scalar fails instead. For consistency with other math functions like abs.rs, add scalar UInt handling:

ScalarValue::Int64(Some(v)) => {
    let result = v.wrapping_neg();
    Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(result))))
}

+ // Unsigned integers - return unchanged
+ ScalarValue::UInt8(_)
+ | ScalarValue::UInt16(_)
+ | ScalarValue::UInt32(_)
+ | ScalarValue::UInt64(_) => Ok(args[0].clone()),
+
// Floating point - simple negation
ScalarValue::Float16(Some(v)) => {
🤖 Prompt for AI Agents
In datafusion/spark/src/function/math/negative.rs around lines 92-96: the
scalar match is missing arms for unsigned integer ScalarValue variants, causing
UInt scalars to fall through to not_impl_err! while UInt arrays are returned
unchanged; add match arms for ScalarValue::UInt8, ScalarValue::UInt16,
ScalarValue::UInt32, and ScalarValue::UInt64 to the scalar branch (the same
section that currently handles Int*/Float*/Null) and return Ok(args[0].clone())
for each so scalar UInts behave consistently with the array branch (mirror how
abs.rs handles UInt scalar cases).

Owner Author:

value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! The unsigned integers are supported for arrays but not for scalars. This is inconsistent and support for scalars should be added. Prevents returning an error for using unsigned integers as scalar values.
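
To make the inconsistency concrete, a minimal test sketch (hypothetical; spark_negative is private, so this would live in this file's own #[cfg(test)] module):

use std::sync::Arc;
use arrow::array::UInt32Array;
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;

#[test]
fn uint_scalar_vs_array_inconsistency() {
    // Array input: the DataType::UInt32 arm returns the input unchanged.
    let as_array = ColumnarValue::Array(Arc::new(UInt32Array::from(vec![1u32, 2, 3])));
    assert!(spark_negative(&[as_array]).is_ok());

    // Scalar input: ScalarValue::UInt32 has no match arm and hits not_impl_err!.
    let as_scalar = ColumnarValue::Scalar(ScalarValue::UInt32(Some(1)));
    assert!(spark_negative(&[as_scalar]).is_err());
}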


// Signed integers - use wrapping negation (Spark legacy mode behavior)
DataType::Int8 => {
let array = array.as_primitive::<Int8Type>();
let result: PrimitiveArray<Int8Type> = array.unary(|x| x.wrapping_neg());
Ok(ColumnarValue::Array(Arc::new(result)))
}
DataType::Int16 => {
let array = array.as_primitive::<Int16Type>();
let result: PrimitiveArray<Int16Type> = array.unary(|x| x.wrapping_neg());
Ok(ColumnarValue::Array(Arc::new(result)))
}
DataType::Int32 => {
let array = array.as_primitive::<Int32Type>();
let result: PrimitiveArray<Int32Type> = array.unary(|x| x.wrapping_neg());
Ok(ColumnarValue::Array(Arc::new(result)))
}
DataType::Int64 => {
let array = array.as_primitive::<Int64Type>();
let result: PrimitiveArray<Int64Type> = array.unary(|x| x.wrapping_neg());
Ok(ColumnarValue::Array(Arc::new(result)))
}

// Floating point - simple negation (no overflow possible)
DataType::Float16 => {
let array = array.as_primitive::<Float16Type>();
let result: PrimitiveArray<Float16Type> = array.unary(|x| -x);
Ok(ColumnarValue::Array(Arc::new(result)))
}
DataType::Float32 => {
let array = array.as_primitive::<Float32Type>();
let result: PrimitiveArray<Float32Type> = array.unary(|x| -x);
Ok(ColumnarValue::Array(Arc::new(result)))
}
DataType::Float64 => {
let array = array.as_primitive::<Float64Type>();
let result: PrimitiveArray<Float64Type> = array.unary(|x| -x);
Ok(ColumnarValue::Array(Arc::new(result)))
}

// Decimal types - wrapping negation
DataType::Decimal32(_, _) => {
let array = array.as_primitive::<Decimal32Type>();
let result: PrimitiveArray<Decimal32Type> =
array.unary(|x| x.wrapping_neg());
Ok(ColumnarValue::Array(Arc::new(result)))
}
DataType::Decimal64(_, _) => {
let array = array.as_primitive::<Decimal64Type>();
let result: PrimitiveArray<Decimal64Type> =
array.unary(|x| x.wrapping_neg());
Ok(ColumnarValue::Array(Arc::new(result)))
}
DataType::Decimal128(_, _) => {
let array = array.as_primitive::<Decimal128Type>();
let result: PrimitiveArray<Decimal128Type> =
array.unary(|x| x.wrapping_neg());
Ok(ColumnarValue::Array(Arc::new(result)))
}
DataType::Decimal256(_, _) => {
let array = array.as_primitive::<Decimal256Type>();
let result: PrimitiveArray<Decimal256Type> =
array.unary(|x| x.wrapping_neg());
Ok(ColumnarValue::Array(Arc::new(result)))
}

// Interval types - wrapping negation of each field
DataType::Interval(arrow::datatypes::IntervalUnit::YearMonth) => {
let array = array.as_primitive::<IntervalYearMonthType>();
let result: PrimitiveArray<IntervalYearMonthType> =
array.unary(|x| x.wrapping_neg());
Ok(ColumnarValue::Array(Arc::new(result)))
}
DataType::Interval(arrow::datatypes::IntervalUnit::DayTime) => {
let array = array.as_primitive::<IntervalDayTimeType>();
let result: PrimitiveArray<IntervalDayTimeType> =
array.unary(|x| IntervalDayTime {
days: x.days.wrapping_neg(),
milliseconds: x.milliseconds.wrapping_neg(),
});
Ok(ColumnarValue::Array(Arc::new(result)))
}
DataType::Interval(arrow::datatypes::IntervalUnit::MonthDayNano) => {
let array = array.as_primitive::<IntervalMonthDayNanoType>();
let result: PrimitiveArray<IntervalMonthDayNanoType> =
array.unary(|x| IntervalMonthDayNano {
months: x.months.wrapping_neg(),
days: x.days.wrapping_neg(),
nanoseconds: x.nanoseconds.wrapping_neg(),
});
Ok(ColumnarValue::Array(Arc::new(result)))
}

dt => not_impl_err!("Not supported datatype for Spark NEGATIVE: {dt}"),
},
ColumnarValue::Scalar(sv) => match sv {
ScalarValue::Null => Ok(args[0].clone()),
sv if sv.is_null() => Ok(args[0].clone()),

// Signed integers - wrapping negation
ScalarValue::Int8(Some(v)) => {
let result = v.wrapping_neg();
Ok(ColumnarValue::Scalar(ScalarValue::Int8(Some(result))))
}
ScalarValue::Int16(Some(v)) => {
let result = v.wrapping_neg();
Ok(ColumnarValue::Scalar(ScalarValue::Int16(Some(result))))
}
ScalarValue::Int32(Some(v)) => {
let result = v.wrapping_neg();
Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(result))))
}
ScalarValue::Int64(Some(v)) => {
let result = v.wrapping_neg();
Ok(ColumnarValue::Scalar(ScalarValue::Int64(Some(result))))
}

// Floating point - simple negation
ScalarValue::Float16(Some(v)) => {
Ok(ColumnarValue::Scalar(ScalarValue::Float16(Some(-v))))
}
ScalarValue::Float32(Some(v)) => {
Ok(ColumnarValue::Scalar(ScalarValue::Float32(Some(-v))))
}
ScalarValue::Float64(Some(v)) => {
Ok(ColumnarValue::Scalar(ScalarValue::Float64(Some(-v))))
}

// Decimal types - wrapping negation
ScalarValue::Decimal32(Some(v), precision, scale) => {
let result = v.wrapping_neg();
Ok(ColumnarValue::Scalar(ScalarValue::Decimal32(
Some(result),
*precision,
*scale,
)))
}
ScalarValue::Decimal64(Some(v), precision, scale) => {
let result = v.wrapping_neg();
Ok(ColumnarValue::Scalar(ScalarValue::Decimal64(
Some(result),
*precision,
*scale,
)))
}
ScalarValue::Decimal128(Some(v), precision, scale) => {
let result = v.wrapping_neg();
Ok(ColumnarValue::Scalar(ScalarValue::Decimal128(
Some(result),
*precision,
*scale,
)))
}
ScalarValue::Decimal256(Some(v), precision, scale) => {
let result = v.wrapping_neg();
Ok(ColumnarValue::Scalar(ScalarValue::Decimal256(
Some(result),
*precision,
*scale,
)))
}

// Interval types - wrapping negation of each field
ScalarValue::IntervalYearMonth(Some(v)) => Ok(ColumnarValue::Scalar(
ScalarValue::IntervalYearMonth(Some(v.wrapping_neg())),
)),
ScalarValue::IntervalDayTime(Some(v)) => Ok(ColumnarValue::Scalar(
ScalarValue::IntervalDayTime(Some(IntervalDayTime {
days: v.days.wrapping_neg(),
milliseconds: v.milliseconds.wrapping_neg(),
})),
)),
ScalarValue::IntervalMonthDayNano(Some(v)) => Ok(ColumnarValue::Scalar(
ScalarValue::IntervalMonthDayNano(Some(IntervalMonthDayNano {
months: v.months.wrapping_neg(),
days: v.days.wrapping_neg(),
nanoseconds: v.nanoseconds.wrapping_neg(),
})),
)),

dt => not_impl_err!("Not supported datatype for Spark NEGATIVE: {dt}"),

Review comment (high):

The scalar implementation for negative does not handle unsigned integer types, causing it to fall through to a not_impl_err!. This is inconsistent with the array implementation and not user-friendly. It should return a proper execution error. This change adds handling for unsigned scalar values to return an execution error, making the behavior consistent for both scalar and array inputs.

Suggested change
            ScalarValue::UInt8(Some(_))
            | ScalarValue::UInt16(Some(_))
            | ScalarValue::UInt32(Some(_))
            | ScalarValue::UInt64(Some(_)) => {
                return exec_err!("The operator 'negative' is not defined for unsigned integers.");
            }
            dt => not_impl_err!("Not supported datatype for Spark NEGATIVE: {dt}"),

Owner Author:

value:useful; category:bug; feedback: The Gemini AI reviewer is correct! The unsigned integers are supported for arrays but not for scalars. This is inconsistent and support for scalars should be added. Prevents returning an error for using unsigned integers as scalar values.

},
}
}