Commit 0ccaacf

xi-db authored and hvanhovell committed
[SPARK-54194][CONNECT] Spark Connect Proto Plan Compression
### What changes were proposed in this pull request?

Currently, Spark Connect enforces gRPC message limits on both the client and the server. These limits are largely meant to protect the server from potential OOMs by rejecting abnormally large messages. However, there are several cases where genuine messages exceed the limit and cause execution failures.

To improve Spark Connect stability, this PR compresses unresolved proto plans to mitigate the issue of oversized messages sent from the client to the server. Compression applies to ExecutePlan and AnalyzePlan, the only two methods that might hit the message limit. Oversized messages from the server to the client are a separate issue and out of scope here (that one is already fixed in #52271).

In the implementation:
* Zstandard is used to compress the proto plan, as it showed consistently high performance in our benchmark and strikes a good balance between compression ratio and speed.
* The config `spark.connect.maxPlanSize` is introduced to control the maximum size of a (decompressed) proto plan that can be executed in Spark Connect. It mainly guards against decompression bomb attacks.

(Scala client changes are being implemented in a follow-up PR.)

To reproduce the existing issue we are solving here, run this code on Spark Connect:

```
import random
import string

def random_letters(length: int) -> str:
    return ''.join(random.choices(string.ascii_letters, k=length))

num_unique_small_relations = 5
size_per_small_relation = 512 * 1024
small_dfs = [
    spark.createDataFrame([(random_letters(size_per_small_relation),)])
    for _ in range(num_unique_small_relations)
]
result_df = small_dfs[0]
for _ in range(512):
    result_df = result_df.unionByName(small_dfs[random.randint(0, len(small_dfs) - 1)])
result_df.collect()
```

It fails with a `StatusCode.RESOURCE_EXHAUSTED` error and the message `Sent message larger than max (269178955 vs. 134217728)`, because the client tried to send an oversized message to the server. Note: repeated small local relations are just one way of producing a large plan; plan size can also come from repeated subtrees created by plan transformations, serialized UDFs, external variables captured by UDFs, etc.

With the improvement introduced by this PR, the above code runs successfully and prints the expected result.

### Why are the changes needed?

It improves Spark Connect stability when executing and analyzing large plans.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests on both the server side and the client side.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52894 from xi-db/plan-compression.

Authored-by: Xi Lyu <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
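The compress-then-cap idea described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the PR's implementation: the PR uses Zstandard, while this sketch uses Python's stdlib `zlib` as a stand-in so it is self-contained, and `MAX_PLAN_SIZE`, `compress_plan`, and `decompress_plan` are hypothetical names standing in for `spark.connect.maxPlanSize` and the real client/server code paths.

```python
import zlib

# Hypothetical cap standing in for spark.connect.maxPlanSize.
MAX_PLAN_SIZE = 16 * 1024 * 1024  # 16 MiB

def compress_plan(plan_bytes: bytes) -> bytes:
    """Client side: compress the serialized proto plan before sending."""
    return zlib.compress(plan_bytes, level=6)

def decompress_plan(compressed: bytes, max_plan_size: int = MAX_PLAN_SIZE) -> bytes:
    """Server side: decompress with a hard cap on the output size,
    so a decompression bomb is rejected instead of inflated fully."""
    d = zlib.decompressobj()
    # Produce at most max_plan_size + 1 bytes; anything past the cap
    # (or leftover compressed input) means the plan is too large.
    out = d.decompress(compressed, max_plan_size + 1)
    if len(out) > max_plan_size or d.unconsumed_tail:
        raise ValueError(
            f"The plan size is larger than max ({len(out)}+ vs. {max_plan_size})"
        )
    return out

# Round trip: a highly repetitive "plan" compresses well and passes the cap.
plan = b"unionByName " * 100_000
wire = compress_plan(plan)
assert decompress_plan(wire) == plan
assert len(wire) < len(plan)
```

The key design point mirrored here is that the size check happens during decompression, bounded by `max_length`, rather than after fully inflating the payload.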
1 parent 2d16bae commit 0ccaacf

File tree

30 files changed: +889 −223 lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 22 additions & 0 deletions
```diff
@@ -906,6 +906,28 @@
     },
     "sqlState" : "56K00"
   },
+  "CONNECT_INVALID_PLAN" : {
+    "message" : [
+      "The Spark Connect plan is invalid."
+    ],
+    "subClass" : {
+      "CANNOT_PARSE" : {
+        "message" : [
+          "Cannot decompress or parse the input plan (<errorMsg>)",
+          "This may be caused by a corrupted compressed plan.",
+          "To disable plan compression, set 'spark.connect.session.planCompression.threshold' to -1."
+        ]
+      },
+      "PLAN_SIZE_LARGER_THAN_MAX" : {
+        "message" : [
+          "The plan size is larger than max (<planSize> vs. <maxPlanSize>)",
+          "This typically occurs when building very complex queries with many operations, large literals, or deeply nested expressions.",
+          "Consider splitting the query into smaller parts using temporary views for intermediate results or reducing the number of operations."
+        ]
+      }
+    },
+    "sqlState" : "56K00"
+  },
   "CONNECT_ML" : {
     "message" : [
       "Generic Spark Connect ML error."
```

dev/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ grpcio>=1.76.0
6565
grpcio-status>=1.76.0
6666
googleapis-common-protos>=1.71.0
6767
protobuf==6.33.0
68+
zstandard>=0.25.0
6869

6970
# Spark Connect python proto generation plugin (optional)
7071
mypy-protobuf==3.3.0

dev/spark-test-image/lint/Dockerfile

Lines changed: 1 addition & 0 deletions
```diff
@@ -84,6 +84,7 @@ RUN python3.11 -m pip install \
     'grpc-stubs==1.24.11' \
     'grpcio-status==1.76.0' \
     'grpcio==1.76.0' \
+    'zstandard==0.25.0' \
     'ipython' \
     'ipython_genutils' \
     'jinja2' \
```

dev/spark-test-image/numpy-213/Dockerfile

Lines changed: 1 addition & 1 deletion
```diff
@@ -71,7 +71,7 @@ RUN apt-get update && apt-get install -y \
 # Pin numpy==2.1.3
 ARG BASIC_PIP_PKGS="numpy==2.1.3 pyarrow>=22.0.0 six==1.16.0 pandas==2.2.3 scipy plotly<6.0.0 mlflow>=2.8.1 coverage matplotlib openpyxl memory-profiler>=0.61.0 scikit-learn>=1.3.2"
 # Python deps for Spark Connect
-ARG CONNECT_PIP_PKGS="grpcio==1.76.0 grpcio-status==1.76.0 protobuf==6.33.0 googleapis-common-protos==1.71.0 graphviz==0.20.3"
+ARG CONNECT_PIP_PKGS="grpcio==1.76.0 grpcio-status==1.76.0 protobuf==6.33.0 googleapis-common-protos==1.71.0 zstandard==0.25.0 graphviz==0.20.3"

 # Install Python 3.11 packages
 RUN curl -sS https://bootstrap.pypa.io/get-pip.py | python3.11
```

dev/spark-test-image/python-310/Dockerfile

Lines changed: 1 addition & 1 deletion
```diff
@@ -66,7 +66,7 @@ RUN apt-get update && apt-get install -y \

 ARG BASIC_PIP_PKGS="numpy pyarrow>=22.0.0 six==1.16.0 pandas==2.3.3 scipy plotly<6.0.0 mlflow>=2.8.1 coverage matplotlib openpyxl memory-profiler>=0.61.0 scikit-learn>=1.3.2"
 # Python deps for Spark Connect
-ARG CONNECT_PIP_PKGS="grpcio==1.76.0 grpcio-status==1.76.0 protobuf==6.33.0 googleapis-common-protos==1.71.0 graphviz==0.20.3"
+ARG CONNECT_PIP_PKGS="grpcio==1.76.0 grpcio-status==1.76.0 protobuf==6.33.0 googleapis-common-protos==1.71.0 zstandard==0.25.0 graphviz==0.20.3"

 # Install Python 3.10 packages
 RUN curl -sS https://bootstrap.pypa.io/get-pip.py | python3.10
```

dev/spark-test-image/python-311/Dockerfile

Lines changed: 1 addition & 1 deletion
```diff
@@ -70,7 +70,7 @@ RUN apt-get update && apt-get install -y \

 ARG BASIC_PIP_PKGS="numpy pyarrow>=22.0.0 six==1.16.0 pandas==2.3.3 scipy plotly<6.0.0 mlflow>=2.8.1 coverage matplotlib openpyxl memory-profiler>=0.61.0 scikit-learn>=1.3.2"
 # Python deps for Spark Connect
-ARG CONNECT_PIP_PKGS="grpcio==1.76.0 grpcio-status==1.76.0 protobuf==6.33.0 googleapis-common-protos==1.71.0 graphviz==0.20.3"
+ARG CONNECT_PIP_PKGS="grpcio==1.76.0 grpcio-status==1.76.0 protobuf==6.33.0 googleapis-common-protos==1.71.0 zstandard==0.25.0 graphviz==0.20.3"

 # Install Python 3.11 packages
 RUN curl -sS https://bootstrap.pypa.io/get-pip.py | python3.11
```

dev/spark-test-image/python-312/Dockerfile

Lines changed: 1 addition & 1 deletion
```diff
@@ -70,7 +70,7 @@ RUN apt-get update && apt-get install -y \

 ARG BASIC_PIP_PKGS="numpy pyarrow>=22.0.0 six==1.16.0 pandas==2.3.3 scipy plotly<6.0.0 mlflow>=2.8.1 coverage matplotlib openpyxl memory-profiler>=0.61.0 scikit-learn>=1.3.2"
 # Python deps for Spark Connect
-ARG CONNECT_PIP_PKGS="grpcio==1.76.0 grpcio-status==1.76.0 protobuf==6.33.0 googleapis-common-protos==1.71.0 graphviz==0.20.3"
+ARG CONNECT_PIP_PKGS="grpcio==1.76.0 grpcio-status==1.76.0 protobuf==6.33.0 googleapis-common-protos==1.71.0 zstandard==0.25.0 graphviz==0.20.3"

 # Install Python 3.12 packages
 RUN curl -sS https://bootstrap.pypa.io/get-pip.py | python3.12
```

dev/spark-test-image/python-313-nogil/Dockerfile

Lines changed: 1 addition & 1 deletion
```diff
@@ -69,7 +69,7 @@ RUN apt-get update && apt-get install -y \


 ARG BASIC_PIP_PKGS="numpy pyarrow>=22.0.0 six==1.16.0 pandas==2.3.3 scipy plotly<6.0.0 mlflow>=2.8.1 coverage matplotlib openpyxl memory-profiler>=0.61.0 scikit-learn>=1.3.2"
-ARG CONNECT_PIP_PKGS="grpcio==1.76.0 grpcio-status==1.76.0 protobuf==6.33.0 googleapis-common-protos==1.71.0 graphviz==0.20.3"
+ARG CONNECT_PIP_PKGS="grpcio==1.76.0 grpcio-status==1.76.0 protobuf==6.33.0 googleapis-common-protos==1.71.0 zstandard==0.25.0 graphviz==0.20.3"


 # Install Python 3.13 packages
```

dev/spark-test-image/python-313/Dockerfile

Lines changed: 1 addition & 1 deletion
```diff
@@ -70,7 +70,7 @@ RUN apt-get update && apt-get install -y \

 ARG BASIC_PIP_PKGS="numpy pyarrow>=22.0.0 six==1.16.0 pandas==2.3.3 scipy plotly<6.0.0 mlflow>=2.8.1 coverage matplotlib openpyxl memory-profiler>=0.61.0 scikit-learn>=1.3.2"
 # Python deps for Spark Connect
-ARG CONNECT_PIP_PKGS="grpcio==1.76.0 grpcio-status==1.76.0 protobuf==6.33.0 googleapis-common-protos==1.71.0 graphviz==0.20.3"
+ARG CONNECT_PIP_PKGS="grpcio==1.76.0 grpcio-status==1.76.0 protobuf==6.33.0 googleapis-common-protos==1.71.0 zstandard==0.25.0 graphviz==0.20.3"

 # Install Python 3.13 packages
 RUN curl -sS https://bootstrap.pypa.io/get-pip.py | python3.13
```

dev/spark-test-image/python-314/Dockerfile

Lines changed: 1 addition & 1 deletion
```diff
@@ -70,7 +70,7 @@ RUN apt-get update && apt-get install -y \

 ARG BASIC_PIP_PKGS="numpy pyarrow>=22.0.0 six==1.16.0 pandas==2.3.3 scipy plotly<6.0.0 mlflow>=2.8.1 coverage matplotlib openpyxl memory-profiler>=0.61.0 scikit-learn>=1.3.2"
 # Python deps for Spark Connect
-ARG CONNECT_PIP_PKGS="grpcio==1.76.0 grpcio-status==1.76.0 protobuf==6.33.0 googleapis-common-protos==1.71.0 graphviz==0.20.3"
+ARG CONNECT_PIP_PKGS="grpcio==1.76.0 grpcio-status==1.76.0 protobuf==6.33.0 googleapis-common-protos==1.71.0 zstandard==0.25.0 graphviz==0.20.3"

 # Install Python 3.14 packages
 RUN curl -sS https://bootstrap.pypa.io/get-pip.py | python3.14
```
