69 changes: 43 additions & 26 deletions datafusion/physical-expr/src/expressions/case.rs
@@ -42,6 +42,7 @@ use crate::expressions::case::literal_lookup_table::LiteralLookupTable;
use arrow::compute::kernels::merge::{MergeIndex, merge, merge_n};
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
use datafusion_physical_expr_common::datum::compare_with_eq;
use datafusion_physical_expr_common::utils::scatter;
use itertools::Itertools;
use std::fmt::{Debug, Formatter};

@@ -659,7 +660,7 @@ impl CaseExpr {
&& body.else_expr.as_ref().unwrap().as_any().is::<Literal>()
{
EvalMethod::ScalarOrScalar
} else if body.when_then_expr.len() == 1 && body.else_expr.is_some() {
} else if body.when_then_expr.len() == 1 {

This change makes EvalMethod::ExpressionOrExpression apply even when else_expr is None (i.e. CASE WHEN … THEN … END). If that’s intentional, consider updating the ExpressionOrExpression doc comment to reflect that ELSE is now optional for this eval method.

Owner Author

value:good-to-have; category:bug; feedback: The Augment AI reviewer is correct! The docstring of ExpressionOrExpression mentions both the then and the else expressions. The proposed change makes the else expression optional and this requires an update of the docstring too.
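
To make the behavioural change concrete, here is a minimal sketch of only the last two branches of the selection logic shown in this hunk, before and after the change. `EvalMethodModel`, `select_before`, and `select_after` are hypothetical names for illustration; the real selection in `case.rs` has earlier branches that are not modelled here, and the doc comment wording is only a suggestion.

```rust
/// Hypothetical, simplified model of the two eval methods touched by this hunk.
#[derive(Debug, PartialEq)]
enum EvalMethodModel {
    /// Exactly one WHEN/THEN pair. After this change ELSE is optional;
    /// a missing ELSE yields NULL for rows where the condition is not true.
    ExpressionOrExpression,
    /// General fallback.
    NoExpression,
}

/// Old condition: a single WHEN/THEN pair *and* an ELSE expression.
fn select_before(when_then_len: usize, has_else: bool) -> EvalMethodModel {
    if when_then_len == 1 && has_else {
        EvalMethodModel::ExpressionOrExpression
    } else {
        EvalMethodModel::NoExpression
    }
}

/// New condition: a single WHEN/THEN pair, with or without ELSE.
fn select_after(when_then_len: usize, _has_else: bool) -> EvalMethodModel {
    if when_then_len == 1 {
        EvalMethodModel::ExpressionOrExpression
    } else {
        EvalMethodModel::NoExpression
    }
}
```

The only input whose result differs between the two functions is a single WHEN/THEN pair without ELSE, which is the case the docstring should now mention.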

EvalMethod::ExpressionOrExpression(body.project()?)
} else {
EvalMethod::NoExpression(body.project()?)
@@ -961,32 +962,40 @@ impl CaseBody {
let then_batch = filter_record_batch(batch, &when_filter)?;
let then_value = self.when_then_expr[0].1.evaluate(&then_batch)?;

let else_selection = not(&when_value)?;
let else_filter = create_filter(&else_selection, optimize_filter);
let else_batch = filter_record_batch(batch, &else_filter)?;

// keep `else_expr`'s data type and return type consistent
let e = self.else_expr.as_ref().unwrap();
let return_type = self.data_type(&batch.schema())?;
let else_expr = try_cast(Arc::clone(e), &batch.schema(), return_type.clone())
.unwrap_or_else(|_| Arc::clone(e));

let else_value = else_expr.evaluate(&else_batch)?;

Ok(ColumnarValue::Array(match (then_value, else_value) {
(ColumnarValue::Array(t), ColumnarValue::Array(e)) => {
merge(&when_value, &t, &e)
}
(ColumnarValue::Scalar(t), ColumnarValue::Array(e)) => {
merge(&when_value, &t.to_scalar()?, &e)
}
(ColumnarValue::Array(t), ColumnarValue::Scalar(e)) => {
merge(&when_value, &t, &e.to_scalar()?)
match &self.else_expr {
None => {
let then_array = then_value.to_array(when_value.true_count())?;
scatter(&when_value, then_array.as_ref()).map(ColumnarValue::Array)

The new else_expr == None fast-path relies on scatter to reconstruct the full output, but I don’t see a test that specifically exercises this path when the then expression is not eligible for InfallibleExprOrNull. Consider adding a regression test for CASE WHEN <cond> THEN <non-infallible expr> END (or ELSE NULL) to cover mask/null semantics.

Owner Author

value:useful; category:bug; feedback: The Augment AI reviewer is correct! There are neither old nor new unit tests for this case. Adding one prevents new functionality from landing without test coverage, which could lead to unnoticed regressions in the future.
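
As a rough illustration of the mask/null semantics such a regression test would pin down, the sketch below models the filter, evaluate, scatter sequence with plain `Vec<Option<_>>` values instead of Arrow arrays. `scatter_model`, `case_when_without_else`, and `checked_hundred_div` are hypothetical helpers, not DataFusion APIs. The sketch assumes that null mask entries are treated like `false`, and the length check is a choice made for the model only.

```rust
/// Simplified stand-in for a scatter kernel: for each `Some(true)` slot in
/// `mask`, take the next value from `truthy`; every other slot (false or
/// null) becomes `None`.
fn scatter_model<T: Clone>(
    mask: &[Option<bool>],
    truthy: &[T],
) -> Result<Vec<Option<T>>, String> {
    let true_count = mask.iter().filter(|m| **m == Some(true)).count();
    // Assumption of this model: the caller passes exactly one value per true row.
    if true_count != truthy.len() {
        return Err(format!(
            "mask selects {true_count} rows but {} values were given",
            truthy.len()
        ));
    }
    let mut values = truthy.iter();
    Ok(mask
        .iter()
        .map(|m| match m {
            Some(true) => values.next().cloned(),
            _ => None,
        })
        .collect())
}

/// Models `CASE WHEN cond THEN then(input) END` with no ELSE branch.
fn case_when_without_else(
    cond: &[Option<bool>],
    input: &[i64],
    then: impl Fn(i64) -> Result<i64, String>,
) -> Result<Vec<Option<i64>>, String> {
    // Filter step: keep only the rows where the condition is true.
    let selected: Vec<i64> = cond
        .iter()
        .zip(input)
        .filter(|(c, _)| **c == Some(true))
        .map(|(_, v)| *v)
        .collect();
    // Evaluate THEN on the filtered rows only, so a fallible expression
    // never sees rows that the condition excluded.
    let then_values = selected
        .into_iter()
        .map(then)
        .collect::<Result<Vec<_>, _>>()?;
    // Scatter step: spread the results back to the original row positions.
    scatter_model(cond, &then_values)
}

/// A fallible THEN expression: `100 / x`, erroring on division by zero.
fn checked_hundred_div(x: i64) -> Result<i64, String> {
    100i64
        .checked_div(x)
        .ok_or_else(|| "division by zero".to_string())
}
```

A test in this spirit would feed rows where the THEN expression would fail if evaluated (here `x = 0`) but the condition is false or null, and assert that those rows come back as NULL rather than as an error.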

}
(ColumnarValue::Scalar(t), ColumnarValue::Scalar(e)) => {
merge(&when_value, &t.to_scalar()?, &e.to_scalar()?)
Some(else_expr) => {
let else_selection = not(&when_value)?;
let else_filter = create_filter(&else_selection, optimize_filter);
let else_batch = filter_record_batch(batch, &else_filter)?;

// keep `else_expr`'s data type and return type consistent
let return_type = self.data_type(&batch.schema())?;
let else_expr =
try_cast(Arc::clone(else_expr), &batch.schema(), return_type.clone())
.unwrap_or_else(|_| Arc::clone(else_expr));
Comment on lines +977 to +979

medium

Using unwrap_or_else here to ignore a potential error from try_cast seems risky. If try_cast fails, it likely indicates a bug in the planner where types were not correctly coerced. By ignoring the error and using the original expression, a subsequent error might occur during merge which would be harder to debug.

Other parts of this file handle the result of try_cast using ? to propagate the error (e.g., lines 745, 848, 936). For consistency and robustness, it would be better to do the same here.

Suggested change
let else_expr =
try_cast(Arc::clone(else_expr), &batch.schema(), return_type.clone())
.unwrap_or_else(|_| Arc::clone(else_expr));
let else_expr = try_cast(Arc::clone(else_expr), &batch.schema(), return_type.clone())?;

Owner Author

value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! The type of the else expression is expected to match the batch's schema, so there is no need to fall back.
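
The difference between the two styles discussed in this thread can be sketched without any DataFusion types. `try_cast_model`, `Expr`, `cast_with_fallback`, and `cast_or_propagate` below are hypothetical stand-ins: the first function plays the role of a cast that can fail, and the other two show the swallow-and-fall-back pattern versus propagating with `?`.

```rust
/// Hypothetical stand-in for a cast that can fail.
fn try_cast_model(value: &str) -> Result<i64, String> {
    value
        .trim()
        .parse::<i64>()
        .map_err(|e| format!("cannot cast {value:?} to i64: {e}"))
}

/// Hypothetical expression type: either the original input or the cast result.
#[derive(Debug, PartialEq)]
enum Expr {
    Raw(String),
    Int(i64),
}

/// Fallback style: a failed cast is silently replaced by the original
/// expression, so the type mismatch only surfaces later, if at all.
fn cast_with_fallback(raw: &str) -> Expr {
    try_cast_model(raw)
        .map(Expr::Int)
        .unwrap_or_else(|_| Expr::Raw(raw.to_string()))
}

/// Propagating style: a failed cast is reported at the point where it happens.
fn cast_or_propagate(raw: &str) -> Result<Expr, String> {
    Ok(Expr::Int(try_cast_model(raw)?))
}
```

With the fallback style the caller cannot tell whether the cast happened; with `?` the error carries the failing input and stops evaluation before a later step such as `merge` sees mismatched types.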


let else_value = else_expr.evaluate(&else_batch)?;

Ok(ColumnarValue::Array(match (then_value, else_value) {
(ColumnarValue::Array(t), ColumnarValue::Array(e)) => {
merge(&when_value, &t, &e)
}
(ColumnarValue::Scalar(t), ColumnarValue::Array(e)) => {
merge(&when_value, &t.to_scalar()?, &e)
}
(ColumnarValue::Array(t), ColumnarValue::Scalar(e)) => {
merge(&when_value, &t, &e.to_scalar()?)
}
(ColumnarValue::Scalar(t), ColumnarValue::Scalar(e)) => {
merge(&when_value, &t.to_scalar()?, &e.to_scalar()?)
}
}?))
}
}?))
}
}
}

@@ -1137,7 +1146,15 @@ impl CaseExpr {
self.body.when_then_expr[0].1.evaluate(batch)
} else if true_count == 0 {
// All input rows are false/null, just call the 'else' expression
self.body.else_expr.as_ref().unwrap().evaluate(batch)
match &self.body.else_expr {
Some(else_expr) => else_expr.evaluate(batch),
None => {
let return_type = self.data_type(&batch.schema())?;
Ok(ColumnarValue::Scalar(ScalarValue::try_new_null(
&return_type,
)?))
}
}
} else if projected.projection.len() < batch.num_columns() {
// The case expressions do not use all the columns of the input batch.
// Project first to reduce time spent filtering.