[SPARK-54367][Connect] Propagate close() from SessionState to CatalogPlugins to prevent leak #53078
What changes were proposed in this pull request?
To fix this, I added the following changes (see the sketch after this list):

- The `CatalogPlugin` interface extends `java.io.Closeable`.
- A `close()` method in `CatalogManager` iterates through all registered catalogs and calls their `close()` methods.
- `SessionState` implements `Closeable` and calls `catalogManager.close()` from its `close()` method.
- `SessionHolder.close()` calls `session.sessionState.close()` when a Spark Connect session is stopped.

These changes create a clean lifecycle for catalogs: when a session ends, a `close()` call is propagated down the chain, allowing each catalog to release its resources.
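A minimal Scala sketch of the proposed chain. The class shapes are simplified and the method bodies are illustrative, not the exact patch; only the names `CatalogPlugin`, `CatalogManager`, `SessionState`, and `SessionHolder` come from Spark:

```scala
import java.io.Closeable
import scala.collection.mutable

// 1. CatalogPlugin extends Closeable; a no-op default keeps existing
//    catalog implementations source-compatible.
trait CatalogPlugin extends Closeable {
  def name(): String
  override def close(): Unit = () // default: nothing to release
}

// 2. CatalogManager closes every catalog it has registered.
class CatalogManager {
  private val catalogs = mutable.HashMap.empty[String, CatalogPlugin]
  def close(): Unit = catalogs.values.foreach { catalog =>
    try catalog.close()
    catch { case e: Exception => () } // log and keep closing the rest
  }
}

// 3. SessionState propagates close() to its CatalogManager.
class SessionState(val catalogManager: CatalogManager) extends Closeable {
  override def close(): Unit = catalogManager.close()
}

// 4. SessionHolder.close() in Spark Connect now also closes the SessionState.
class SessionHolder(sessionState: SessionState) {
  def close(): Unit = {
    // ...existing cleanup (artifacts, streaming queries)...
    sessionState.close()
  }
}
```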
Why are the changes needed?
The Spark Connect server leaks `SparkSession` objects each time a client connects and disconnects when Apache Iceberg is involved (see the [Apache Iceberg PR](https://github.com/apache/iceberg/pull/14590)).

The `SessionHolder.close()` method in Spark Connect is responsible for cleaning up a session. It performs some cleanup, such as artifacts and streaming queries, but it does not clean up the main `SessionState`. This is where the `CatalogManager` lives, which holds references to `RESTCatalog` and `S3FileIO`. Since the `SessionState` is never closed, these `Closeable` catalogs are never closed and their threads leak. A hypothetical sketch of such a resource-holding catalog follows.
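As an illustration of why this leaks, a catalog that owns a thread pool can only release it from `close()`. The class name and field below are invented for the example; Iceberg's real `RESTCatalog` holds an HTTP client and background threads in a similar way:

```scala
import java.io.Closeable
import java.util.concurrent.Executors

// Hypothetical REST-backed catalog: the pool's non-daemon thread keeps
// running until close() is called. Before this PR, nothing ever called
// close(), so one such thread (and the catalog it pins) leaked per session.
class RestBackedCatalog extends Closeable {
  private val refreshPool = Executors.newSingleThreadScheduledExecutor()
  override def close(): Unit = refreshPool.shutdownNow()
}
```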
Does this PR introduce any user-facing change?
N/A
How was this patch tested?
I have a local setup that easily reproduces this issue. Here are the setup details:
- REST catalog: Apache Polaris (created the basic Polaris entities via the getting-started example)
- Spark Connect server:
- Spark Connect client: installed the publicly released Apache Spark package via pip
- Testing config: lowered `spark.connect.session.manager.defaultSessionTimeout` from the default `60m` to `1m`

Testing:
- Connected a Spark Connect client and ran a query, then took a heap dump and checked the instance counts of `org.apache.spark.sql.classic.SparkSession` and `org.apache.spark.sql.internal.SessionState` (a sketch of the client-side steps follows this list).
- Disconnected and waited for the session timeout, which calls `close()` on the Spark session implicitly.
- Checked `org.apache.spark.sql.classic.SparkSession` and `org.apache.spark.sql.internal.SessionState` again and noticed the resources are not getting cleaned up.
- Checked `org.apache.spark.sql.classic.SparkSession` and `org.apache.spark.sql.internal.SessionState` once more. With the current code, instances of these classes (along with many others) are not cleaned up; heap usage also stays high and cannot be garbage collected.
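A sketch of the client-side reproduction in Scala, assuming the default Spark Connect endpoint on `localhost:15002` (the original test used the pip-installed Python client, but the steps are equivalent):

```scala
import org.apache.spark.sql.SparkSession

// Connect, run a trivial query, then disconnect. With the lowered 1m
// timeout, the server should fully release the session shortly after;
// a heap dump taken afterwards shows whether SparkSession/SessionState
// instances were actually collected.
object LeakRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .remote("sc://localhost:15002")
      .getOrCreate()
    spark.sql("SELECT 1").show()
    spark.stop() // client disconnects; server-side timeout drives cleanup
  }
}
```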
Was this patch authored or co-authored using generative AI tooling?
No