Skip to content

Conversation

@xi-db
Copy link
Contributor

@xi-db xi-db commented Nov 5, 2025

What changes were proposed in this pull request?

Currently, Spark Connect enforce gRPC message limits on both the client and the server. These limits are largely meant to protect the server from potential OOMs by rejecting abnormally large messages. However, there are several cases where genuine messages exceed the limit and cause execution failures.

To improve Spark Connect stability, this PR implements compressing unresolved proto plans to mitigate the issue of oversized messages from the client to the server. The compression applies to ExecutePlan and AnalyzePlan - the only two methods that might hit the message limit. The other issue of message limit from the server to the client is a different issue, and it’s out of the scope (that one is already fixed in #52271).

In the implementation,

  • Zstandard is leveraged to compress proto plan as it has consistent high performance in our benchmark and achieves a good balance between compression ratio and performance.
  • The config spark.connect.maxPlanSize is introduced to control the maximum size of a (decompressed) proto plan that can be executed in Spark Connect. It is mainly used to avoid decompression bomb attacks.

(Scala client changes are being implemented in a follow-up PR.)

To reproduce the existing issue we are solving here, run this code on Spark Connect:

import random
import string

def random_letters(length: int) -> str:
    return ''.join(random.choices(string.ascii_letters, k=length))

num_unique_small_relations = 5
size_per_small_relation = 512 * 1024
small_dfs = [spark.createDataFrame([(random_letters(size_per_small_relation),)],) for _ in range(num_unique_small_relations)]
result_df = small_dfs[0]
for _ in range(512):
    result_df = result_df.unionByName(small_dfs[random.randint(0, len(small_dfs) - 1)])
result_df.collect()

It fails with StatusCode.RESOURCE_EXHAUSTED error with messageSent message larger than max (269178955 vs. 134217728), because the client was trying to send a too large message to the server.

Note: repeated small local relations is just one way causing a large plan, the size of the plan can also be contributed by repeated subtrees of plan transformations, serialized UDFs, captured external variables by UDFs, etc.

With the improvement introduced by the PR, the above code runs successfully and prints the expected result.

Why are the changes needed?

It improves Spark Connect stability when executing and analyzing large plans.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New tests on both the server side and the client side.

Was this patch authored or co-authored using generative AI tooling?

No.

xi-db added 6 commits October 2, 2025 12:43
# Conflicts:
#	python/pyspark/sql/connect/client/core.py
#	python/pyspark/sql/connect/proto/base_pb2.py
#	sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
@xi-db xi-db changed the title [SPARK-54194] Spark Connect Proto Plan Compression [SPARK-54194][CONNECT] Spark Connect Proto Plan Compression Nov 5, 2025
Comment on lines 908 to 917
"subClass" : {
"CANNOT_PARSE" : {
"message" : [
"Cannot decompress or parse the input plan (<errorMsg>)"
]
},
"PLAN_SIZE_LARGER_THAN_MAX" : {
"message" : [
"The plan size is larger than max (<planSize> vs. <maxPlanSize>)"
]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add more info here on what the user facing resolution is.
For CANNOT_PARSE, it would be to disable compression and for the large plan size, add some information on when these situations may occur and offer an alternative (e.g split/reduce operations, utilise temp views for intermediate materialization etc)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I've updated the message to include the possible resolution.

@github-actions github-actions bot added the BUILD label Nov 8, 2025
xi-db added 7 commits November 8, 2025 10:21
# Conflicts:
#	dev/requirements.txt
#	dev/spark-test-image/numpy-213/Dockerfile
#	dev/spark-test-image/python-310/Dockerfile
#	dev/spark-test-image/python-311/Dockerfile
#	dev/spark-test-image/python-312/Dockerfile
#	dev/spark-test-image/python-313-nogil/Dockerfile
#	dev/spark-test-image/python-313/Dockerfile
#	dev/spark-test-image/python-314/Dockerfile
#	dev/spark-test-image/python-minimum/Dockerfile
#	dev/spark-test-image/python-ps-minimum/Dockerfile
Copy link
Contributor

@hvanhovell hvanhovell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @xi-db and @hvanhovell .

May I ask why we choose 0.23 here?

Since this is a new dependency, can we use the latest bug-fixed stable version, 0.25.0 (Sep 14, 2025), instead of the old one, 0.23.0 (Jul 14, 2024)?

zstandard>=0.23.0

Technically, 0.24.0 is the first version to support Python 3.14 officially.

There are many changes since 0.23.0.

@xi-db
Copy link
Contributor Author

xi-db commented Nov 11, 2025

Thanks @dongjoon-hyun , there is no specific reason why 0.23.0 is chosen. It's simply I didn't realize the new stable version was released a few months ago.

I've updated to use the latest stable version 0.25.0. cc @hvanhovell

@hvanhovell
Copy link
Contributor

Merging to master/4.1. Thanks!

asf-gitbox-commits pushed a commit that referenced this pull request Nov 11, 2025
### What changes were proposed in this pull request?

Currently, Spark Connect enforce gRPC message limits on both the client and the server. These limits are largely meant to protect the server from potential OOMs by rejecting abnormally large messages. However, there are several cases where genuine messages exceed the limit and cause execution failures.

To improve Spark Connect stability, this PR implements compressing unresolved proto plans to mitigate the issue of oversized messages from the client to the server. The compression applies to ExecutePlan and AnalyzePlan - the only two methods that might hit the message limit. The other issue of message limit from the server to the client is a different issue, and it’s out of the scope (that one is already fixed in #52271).

In the implementation,

* Zstandard is leveraged to compress proto plan as it has consistent high performance in our benchmark and achieves a good balance between compression ratio and performance.
* The config `spark.connect.maxPlanSize` is introduced to control the maximum size of a (decompressed) proto plan that can be executed in Spark Connect. It is mainly used to avoid decompression bomb attacks.

(Scala client changes are being implemented in a follow-up PR.)

To reproduce the existing issue we are solving here, run this code on Spark Connect:

```
import random
import string

def random_letters(length: int) -> str:
    return ''.join(random.choices(string.ascii_letters, k=length))

num_unique_small_relations = 5
size_per_small_relation = 512 * 1024
small_dfs = [spark.createDataFrame([(random_letters(size_per_small_relation),)],) for _ in range(num_unique_small_relations)]
result_df = small_dfs[0]
for _ in range(512):
    result_df = result_df.unionByName(small_dfs[random.randint(0, len(small_dfs) - 1)])
result_df.collect()
```

It fails with `StatusCode.RESOURCE_EXHAUSTED` error with message`Sent message larger than max (269178955 vs. 134217728)`, because the client was trying to send a too large message to the server.

Note: repeated small local relations is just one way causing a large plan, the size of the plan can also be contributed by repeated subtrees of plan transformations, serialized UDFs, captured external variables by UDFs, etc.

With the improvement introduced by the PR, the above code runs successfully and prints the expected result.

### Why are the changes needed?

It improves Spark Connect stability when executing and analyzing large plans.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests on both the server side and the client side.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52894 from xi-db/plan-compression.

Authored-by: Xi Lyu <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit 0ccaacf)
Signed-off-by: Herman van Hovell <[email protected]>
@zhengruifeng
Copy link
Contributor

@xi-db
Copy link
Contributor Author

xi-db commented Nov 12, 2025

Thanks @zhengruifeng , I've just added the doc change to the followup PR: #53003.

@dongjoon-hyun
Copy link
Member

It seems that we missed MacOS CI. I made a PR to recover the CI, @xi-db , @hvanhovell .

dongjoon-hyun added a commit that referenced this pull request Nov 13, 2025
### What changes were proposed in this pull request?

This PR aims to recover MacOS CIs by installing `zstandard==0.25.0`.

### Why are the changes needed?

After SPARK-54194, `zstandard` Python package is required for `Connect`.
- #52894

Currently, MacOS CIs are broken.
- https://github.com/apache/spark/actions/workflows/build_python_3.11_macos26.yml
- https://github.com/apache/spark/actions/workflows/build_python_3.11_macos.yml

```
Traceback (most recent call last):
  File "/Users/runner/work/spark/spark/python/pyspark/sql/connect/utils.py", line 105, in require_minimum_zstandard_version
    import zstandard  # noqa
    ^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'zstandard'
```

### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Pass the CIs and manual review because MacOS CIs are triggered daily CIs only.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53025 from dongjoon-hyun/SPARK-54326.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun added a commit that referenced this pull request Nov 13, 2025
### What changes were proposed in this pull request?

This PR aims to recover MacOS CIs by installing `zstandard==0.25.0`.

### Why are the changes needed?

After SPARK-54194, `zstandard` Python package is required for `Connect`.
- #52894

Currently, MacOS CIs are broken.
- https://github.com/apache/spark/actions/workflows/build_python_3.11_macos26.yml
- https://github.com/apache/spark/actions/workflows/build_python_3.11_macos.yml

```
Traceback (most recent call last):
  File "/Users/runner/work/spark/spark/python/pyspark/sql/connect/utils.py", line 105, in require_minimum_zstandard_version
    import zstandard  # noqa
    ^^^^^^^^^^^^^^^^
ModuleNotFoundError: No module named 'zstandard'
```

### Does this PR introduce _any_ user-facing change?

No behavior change.

### How was this patch tested?

Pass the CIs and manual review because MacOS CIs are triggered daily CIs only.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53025 from dongjoon-hyun/SPARK-54326.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 5c16a73)
Signed-off-by: Dongjoon Hyun <[email protected]>
@xi-db
Copy link
Contributor Author

xi-db commented Nov 13, 2025

Thank you @dongjoon-hyun !

@dongjoon-hyun
Copy link
Member

Hi, @xi-db , @hvanhovell , @zhengruifeng . This seems to break our Python Classic CIs. I made a follow-up. Could you review the PR?

dongjoon-hyun added a commit that referenced this pull request Nov 13, 2025
… conditionally

### What changes were proposed in this pull request?

This PR is a follow-up of the following to fix `connectutils.py` to import `pb2` conditionally.
- #52894

### Why are the changes needed?

Currently, Python CIs are broken like the following.
- https://github.com/apache/spark/actions/workflows/build_python_3.11_classic_only.yml
    - https://github.com/apache/spark/actions/runs/19316448951/job/55248810741
- https://github.com/apache/spark/actions/workflows/build_python_3.12.yml
    - https://github.com/apache/spark/actions/runs/19275741458/job/55212353468

```
  File "/__w/spark/spark/python/pyspark/testing/connectutils.py", line 26, in <module>
    import pyspark.sql.connect.proto as pb2
  File "/__w/spark/spark/python/pyspark/sql/connect/proto/__init__.py", line 18, in <module>
    from pyspark.sql.connect.proto.base_pb2_grpc import *
  File "/__w/spark/spark/python/pyspark/sql/connect/proto/base_pb2_grpc.py", line 19, in <module>
    import grpc
ModuleNotFoundError: No module named 'grpc'
```

### Does this PR introduce _any_ user-facing change?

No behavior change. We has been importing `pyspark.sql.connect` conditionally before #52894 .

### How was this patch tested?

Pass the CIs and manual test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53037 from dongjoon-hyun/SPARK-54194.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
dongjoon-hyun added a commit that referenced this pull request Nov 13, 2025
… conditionally

### What changes were proposed in this pull request?

This PR is a follow-up of the following to fix `connectutils.py` to import `pb2` conditionally.
- #52894

### Why are the changes needed?

Currently, Python CIs are broken like the following.
- https://github.com/apache/spark/actions/workflows/build_python_3.11_classic_only.yml
    - https://github.com/apache/spark/actions/runs/19316448951/job/55248810741
- https://github.com/apache/spark/actions/workflows/build_python_3.12.yml
    - https://github.com/apache/spark/actions/runs/19275741458/job/55212353468

```
  File "/__w/spark/spark/python/pyspark/testing/connectutils.py", line 26, in <module>
    import pyspark.sql.connect.proto as pb2
  File "/__w/spark/spark/python/pyspark/sql/connect/proto/__init__.py", line 18, in <module>
    from pyspark.sql.connect.proto.base_pb2_grpc import *
  File "/__w/spark/spark/python/pyspark/sql/connect/proto/base_pb2_grpc.py", line 19, in <module>
    import grpc
ModuleNotFoundError: No module named 'grpc'
```

### Does this PR introduce _any_ user-facing change?

No behavior change. We has been importing `pyspark.sql.connect` conditionally before #52894 .

### How was this patch tested?

Pass the CIs and manual test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53037 from dongjoon-hyun/SPARK-54194.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 63bcc87)
Signed-off-by: Dongjoon Hyun <[email protected]>
asf-gitbox-commits pushed a commit that referenced this pull request Nov 14, 2025
… - Scala Client

### What changes were proposed in this pull request?

In the previous PR #52894 of Spark Connect Proto Plan Compression, both Server-side and PySpark client changes were implemented.

In this PR, the corresponding Scala client changes are implemented, so plan compression are now supported on the Scala client as well.

To reproduce the existing issue we are solving here, run this code on Spark Connect Scala client:
```
import scala.util.Random
import org.apache.spark.sql.DataFrame
import spark.implicits._

def randomLetters(n: Int): String = {
  Iterator.continually(Random.nextPrintableChar())
    .filter(_.isLetter)
    .take(n)
    .mkString
}

val numUniqueSmallRelations = 5
val sizePerSmallRelation = 512 * 1024
val smallDfs: Seq[DataFrame] =
  (0 until numUniqueSmallRelations).map { _ =>
    Seq(randomLetters(sizePerSmallRelation)).toDF("value")
  }

var resultDf = smallDfs.head
for (_ <- 0 until 500) {
  val idx = Random.nextInt(smallDfs.length)
  resultDf = resultDf.unionByName(smallDfs(idx))
}

resultDf.collect()
```
It fails with RESOURCE_EXHAUSTED error with message `gRPC message exceeds maximum size 134217728: 269207219`, because the server is trying to send an ExecutePlanResponse of ~260MB to the client.

With the improvement introduced by the PR, the above code runs successfully and prints the expected result.

### Why are the changes needed?

It improves Spark Connect stability when handling large plans.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53003 from xi-db/plan-compression-scala-client.

Authored-by: Xi Lyu <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
asf-gitbox-commits pushed a commit that referenced this pull request Nov 14, 2025
… - Scala Client

### What changes were proposed in this pull request?

In the previous PR #52894 of Spark Connect Proto Plan Compression, both Server-side and PySpark client changes were implemented.

In this PR, the corresponding Scala client changes are implemented, so plan compression are now supported on the Scala client as well.

To reproduce the existing issue we are solving here, run this code on Spark Connect Scala client:
```
import scala.util.Random
import org.apache.spark.sql.DataFrame
import spark.implicits._

def randomLetters(n: Int): String = {
  Iterator.continually(Random.nextPrintableChar())
    .filter(_.isLetter)
    .take(n)
    .mkString
}

val numUniqueSmallRelations = 5
val sizePerSmallRelation = 512 * 1024
val smallDfs: Seq[DataFrame] =
  (0 until numUniqueSmallRelations).map { _ =>
    Seq(randomLetters(sizePerSmallRelation)).toDF("value")
  }

var resultDf = smallDfs.head
for (_ <- 0 until 500) {
  val idx = Random.nextInt(smallDfs.length)
  resultDf = resultDf.unionByName(smallDfs(idx))
}

resultDf.collect()
```
It fails with RESOURCE_EXHAUSTED error with message `gRPC message exceeds maximum size 134217728: 269207219`, because the server is trying to send an ExecutePlanResponse of ~260MB to the client.

With the improvement introduced by the PR, the above code runs successfully and prints the expected result.

### Why are the changes needed?

It improves Spark Connect stability when handling large plans.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53003 from xi-db/plan-compression-scala-client.

Authored-by: Xi Lyu <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit 6cb88c1)
Signed-off-by: Herman van Hovell <[email protected]>
Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @xi-db and @hvanhovell .

Could you confirm that Apache Spark 3.5 Spark Connect client still can talk with Apache Spark 4.1.0 Spark Connect Server by default without any additional setting?

.internal()
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(ConnectPlanCompressionAlgorithm.values.map(_.toString))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to unset this feature? It seems that we enforce to use ZSTD because it's the only one, @xi-db and @hvanhovell ?

object ConnectPlanCompressionAlgorithm extends Enumeration {
  val ZSTD = Value
}

@dongjoon-hyun
Copy link
Member

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants