Conversation

@cloud-fan cloud-fan commented Jun 23, 2025

What changes were proposed in this pull request?

This is an extension of #47611. It's impossible to translate every catalyst expression that returns boolean type into a v2 `Predicate`, because the return type of a catalyst expression can be dynamic; for example, we can't make the v2 `Cast` extend `Predicate` only when it returns boolean type.

This PR adds a new type of v2 `Predicate`: `BOOLEAN_EXPRESSION`. It's a simple wrapper over any expression that returns boolean type. By doing so, Spark can push down any catalyst expression that returns boolean type as a predicate.
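For illustration, here is a minimal sketch of the idea (not the exact code added by this PR; the `wrapAsV2Predicate` helper name is made up for this example). Once a catalyst expression of boolean type has been translated into a v2 expression, it can be wrapped in a `Predicate` named `BOOLEAN_EXPRESSION` via the public `Predicate(String, Expression[])` constructor:

```scala
import org.apache.spark.sql.connector.expressions.{Expression => V2Expression}
import org.apache.spark.sql.connector.expressions.filter.Predicate

// Sketch only: wrap an already-translated v2 expression of boolean type into the
// new generic BOOLEAN_EXPRESSION predicate so it can be pushed down as a filter.
def wrapAsV2Predicate(translated: V2Expression, isBooleanType: Boolean): Option[Predicate] =
  translated match {
    case p: Predicate        => Some(p)  // already a proper predicate, no wrapping needed
    case e if isBooleanType  => Some(new Predicate("BOOLEAN_EXPRESSION", Array(e)))
    case _                   => None     // not a boolean expression, cannot be used as a predicate
  }
```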

Why are the changes needed?

To push down more v2 predicates.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

updated test cases in PushablePredicateSuite

Was this patch authored or co-authored using generative AI tooling?

no

@gengliangwang
Member

@cloud-fan thanks for making the PR.
FYI @aokolnychyi and I discussed this offline. We are thinking about adding a new method or trait for expressions that can be constant-folded without context:

  • example of constant folding without context: `1 > 0`, `cast(null as boolean)`
  • example of constant folding with context: `current_date()`, `current_catalog()`, `cast('2020-01-01 00:00:00' as timestamp)` (requires the time zone from the context)

We can always evaluate the constant-folding-without-context expressions before passing them to V2. WDYT?
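A minimal sketch of that direction, assuming Catalyst's existing `foldable` flag as the starting point (the actual proposal would need a dedicated method or trait to exclude context-dependent expressions such as `current_date()`, which are also foldable once the session context is bound):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}

// Sketch only: evaluate a constant expression that needs no external context
// (e.g. 1 > 0, cast(null as boolean)) into a literal before V2 translation.
// A real implementation would also have to filter out context-dependent
// expressions like current_date(), which this naive check does not do.
def foldIfConstant(e: Expression): Expression =
  if (e.foldable) Literal.create(e.eval(), e.dataType) else e
```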

@HyukjinKwon
Member

Oh just read the comment. I will leave it to you @gengliangwang and @cloud-fan

@cloud-fan
Contributor Author

@gengliangwang I don't think #51282 can guarantee that every boolean catalyst expression can be translated into a v2 `Predicate` properly. `Cast` and the null boolean literal are just examples. What do you think?

if (isPredicate) {
val translated = build()
val translated0 = build()
val conf = SQLConf.get
Contributor

Holding on to the SQLConf here means we can't pick up real-time changes to the SQLConf.

Contributor Author

This is within a single method, so I don't think we want to be that dynamic. And in practice this should run within the same session, so it won't change.

val translated0 = build()
val conf = SQLConf.get
val alwaysCreateV2Predicate = conf.getConf(SQLConf.DATA_SOURCE_ALWAYS_CREATE_V2_PREDICATE)
val translated = if (alwaysCreateV2Predicate && isPredicate && e.dataType == BooleanType) {
Contributor

We can remove isPredicate here.


  public Predicate(String name, Expression[] children) {
    super(name, children);
    if ("BOOLEAN_EXPRESSION".equals(name)) {
Contributor

I think we should declare "BOOLEAN_EXPRESSION" as a final constant instead of hardcoding the string.

Contributor Author

All other function names are hardcoded and we just match the string literal in the code.
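For context, a hypothetical connector-side consumer would dispatch on the name string the same way, so `BOOLEAN_EXPRESSION` is treated like any other hardcoded predicate name (sketch only; `name()`, `children()` and `describe()` are from the v2 expression API):

```scala
import org.apache.spark.sql.connector.expressions.filter.Predicate

// Sketch of how a connector might dispatch on predicate names; BOOLEAN_EXPRESSION
// is just another name string to match on, like "=" or "IS_NULL".
def describePredicate(p: Predicate): String = p.name() match {
  case "BOOLEAN_EXPRESSION" => s"boolean expression: ${p.children()(0).describe()}"
  case "IS_NULL"            => s"${p.children()(0).describe()} IS NULL"
  case other                => s"unsupported predicate: $other"
}
```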

@beliefer beliefer left a comment

LGTM except a minor comment.

@cloud-fan
Contributor Author

thanks for the review, merging to master!

@cloud-fan cloud-fan closed this in 6f54429 Jun 30, 2025
withSQLConf(SQLConf.DATA_SOURCE_DONT_ASSERT_ON_PREDICATE.key -> "false") {
intercept[java.lang.AssertionError] {
PushablePredicate.unapply(Literal.create("string"))
test("non-trivial boolean expression") {
Member

Seems like it fails when ANSI is off:

[info] - non-trivial boolean expression *** FAILED *** (21 milliseconds)
[info]   pushable.isDefined was false (PushablePredicateSuite.scala:73)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$13(PushablePredicateSuite.scala:73)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf(SQLConfHelper.scala:56)
[info]   at org.apache.spark.sql.catalyst.SQLConfHelper.withSQLConf$(SQLConfHelper.scala:38)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(PushablePredicateSuite.scala:28)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:253)
[info]   at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:251)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.withSQLConf(PushablePredicateSuite.scala:28)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$12(PushablePredicateSuite.scala:69)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$12$adapted(PushablePredicateSuite.scala:66)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$11(PushablePredicateSuite.scala:66)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$11$adapted(PushablePredicateSuite.scala:65)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.apache.spark.sql.connector.PushablePredicateSuite.$anonfun$new$10(PushablePredicateSuite.scala:65)
[info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
[info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
[info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
[info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
[info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
[info]   at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
[info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
[info]   at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
[info]   at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
[info]   at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info]   at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info]   at scala.collection.immutable.List.foreach(List.scala:334)
[info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info]   at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info]   at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info]   at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info]   at org.scalatest.Suite.run(Suite.scala:1114)

https://github.com/apache/spark/actions/runs/15987607479/job/45094925023

Contributor Author

ah, let me force-enable ANSI for this suite

Member

Okay, it can only be pushed when ANSI is on. Let me just enable ANSI for this test case:

    case Cast(child, dataType, _, evalMode)
        if evalMode == EvalMode.ANSI || Cast.canUpCast(child.dataType, dataType) =>
      generateExpression(child).map(v => new V2Cast(v, child.dataType, dataType))
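For reference, a rough sketch of what forcing ANSI for the failing case could look like inside PushablePredicateSuite, assuming the suite's existing `withSQLConf` helper; `nonTrivialBooleanExpr` is a placeholder for the expression under test, not a name from the actual patch:

```scala
// Sketch only: with ANSI on, the Cast above is translated to a v2 Cast and the
// resulting boolean expression can be wrapped as a pushable predicate.
withSQLConf(SQLConf.ANSI_ENABLED.key -> "true") {
  val pushable = PushablePredicate.unapply(nonTrivialBooleanExpr)
  assert(pushable.isDefined)
}
```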

HyukjinKwon pushed a commit that referenced this pull request Jul 1, 2025
### What changes were proposed in this pull request?

This is a followup of #51247 to fix tests in the non-ANSI build.

### Why are the changes needed?

fix test

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #51330

Closes #51329 from cloud-fan/follow.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
asl3 pushed a commit to asl3/spark that referenced this pull request Jul 14, 2025
asl3 pushed a commit to asl3/spark that referenced this pull request Jul 14, 2025
haoyangeng-db pushed a commit to haoyangeng-db/apache-spark that referenced this pull request Jul 22, 2025
haoyangeng-db pushed a commit to haoyangeng-db/apache-spark that referenced this pull request Jul 22, 2025