@@ -17,6 +17,8 @@

package org.apache.spark.sql.connector.catalog;

import java.io.Closeable;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -42,7 +44,7 @@
 * @since 3.0.0
 */
@Evolving
public interface CatalogPlugin {
public interface CatalogPlugin extends Closeable {
  /**
   * Called to initialize configuration.
   * <p>
@@ -74,4 +76,7 @@ public interface CatalogPlugin {
  default String[] defaultNamespace() {
    return new String[0];
  }

  @Override
  default void close() {}
}
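With CatalogPlugin now extending java.io.Closeable (and providing a no-op default close()), a connector that holds external resources can release them when the owning session tears its catalogs down. A minimal Scala sketch of such a plugin, assuming a hypothetical client resource; none of these names come from this change:

import java.io.Closeable

import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical catalog that opens a resource in initialize() and frees it in close().
class MetastoreBackedCatalog extends CatalogPlugin {
  private var _name: String = _
  private var client: Closeable = _ // stand-in for a real metastore or JDBC client

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    _name = name
    client = () => () // placeholder resource; a real plugin would open a connection here
  }

  override def name(): String = _name

  // Invoked by CatalogManager.close() when the owning session state is closed.
  override def close(): Unit = {
    if (client != null) client.close()
  }
}

Such a plugin is registered as usual via spark.sql.catalog.<name>=<class name>; the only new piece is that close() now has a well-defined point at which it is called.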
@@ -265,6 +265,7 @@ object ResolvedIdentifier {
object FakeSystemCatalog extends CatalogPlugin {
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}
  override def name(): String = "system"
  override def close(): Unit = {}
}

/**
@@ -273,4 +274,5 @@ object FakeSystemCatalog extends CatalogPlugin {
object FakeLocalCatalog extends CatalogPlugin {
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}
  override def name(): String = "local"
  override def close(): Unit = {}
}
@@ -17,6 +17,8 @@

package org.apache.spark.sql.connector.catalog

import java.io.Closeable

import scala.collection.mutable

import org.apache.spark.internal.Logging
@@ -40,7 +42,7 @@ import org.apache.spark.sql.internal.SQLConf
private[sql]
class CatalogManager(
    defaultSessionCatalog: CatalogPlugin,
    val v1SessionCatalog: SessionCatalog) extends SQLConfHelper with Logging {
    val v1SessionCatalog: SessionCatalog) extends SQLConfHelper with Logging with Closeable {
  import CatalogManager.SESSION_CATALOG_NAME
  import CatalogV2Util._

@@ -57,6 +59,21 @@ class CatalogManager(
    }
  }

  override def close(): Unit = synchronized {
    val allCatalogs = (catalogs.values.toSet + defaultSessionCatalog).toSeq
    allCatalogs.foreach {
      case c: Closeable =>
        try {
          c.close()
        } catch {
          case e: Throwable =>
            logWarning(s"Failed to close catalog of class ${c.getClass.getName}", e)
        }
      case _ =>
    }
    catalogs.clear()
  }

  def isCatalogRegistered(name: String): Boolean = {
    try {
      catalog(name)
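The new CatalogManager.close() deduplicates the loaded catalogs plus the default session catalog, closes every Closeable one, and clears the registry; a failure in one catalog's close() is only logged, so the remaining catalogs are still closed. A hedged sketch of an extra suite-style check for that property, in the spirit of the CatalogManagerSuite changes below (ThrowingCatalog is hypothetical; CloseableCatalog, FakeV2SessionCatalog and createSessionCatalog() are the helpers used there, and the test body would live inside that suite):

// Hypothetical catalog whose close() always fails.
class ThrowingCatalog extends DummyCatalog {
  override def close(): Unit = throw new IllegalStateException("boom")
}

test("CatalogManager.close keeps going when one catalog fails to close") {
  val catalogManager = new CatalogManager(FakeV2SessionCatalog, createSessionCatalog())
  withSQLConf("spark.sql.catalog.bad" -> classOf[ThrowingCatalog].getName,
    "spark.sql.catalog.good" -> classOf[CloseableCatalog].getName) {
    catalogManager.catalog("bad")
    val good = catalogManager.catalog("good").asInstanceOf[CloseableCatalog]
    catalogManager.close() // the failure from "bad" is logged and swallowed
    assert(good.isClosed)
  }
}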
@@ -127,6 +127,18 @@ class CatalogManagerSuite extends SparkFunSuite with SQLHelper {
      }
    }
  }

test("CatalogManager.close should close all closeable catalogs") {
val catalogManager = new CatalogManager(FakeV2SessionCatalog, createSessionCatalog())
withSQLConf("spark.sql.catalog.dummy" -> classOf[DummyCatalog].getName,
"spark.sql.catalog.closeable" -> classOf[CloseableCatalog].getName) {
catalogManager.setCurrentCatalog("dummy")
val closeable = catalogManager.catalog("closeable").asInstanceOf[CloseableCatalog]
assert(!closeable.isClosed)
catalogManager.close()
assert(closeable.isClosed)
}
}
}

class DummyCatalog extends CatalogPlugin {
@@ -136,4 +148,13 @@ class DummyCatalog extends CatalogPlugin {
  private var _name: String = null
  override def name(): String = _name
  override def defaultNamespace(): Array[String] = Array("a", "b")
  override def close(): Unit = {}
}

class CloseableCatalog extends DummyCatalog with java.io.Closeable {
  private var closed = false
  override def close(): Unit = {
    closed = true
  }
  def isClosed: Boolean = closed
}
@@ -33,6 +33,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
private case class DummyCatalogPlugin(override val name: String) extends CatalogPlugin {

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = ()
  override def close(): Unit = {}
}

class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside {
@@ -380,6 +380,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio

    mlCache.clear()

    session.sessionState.close()
    session.cleanupPythonWorkerLogs()

    eventManager.postClosed()
@@ -31,6 +31,9 @@ object SparkConnectTestUtils {
      sessionId = UUID.randomUUID().toString,
      session = session)
    SparkConnectService.sessionManager.putSessionForTesting(ret)
    if (session != null) {
      ret.initializeSession()
    }
    ret
  }
}
@@ -40,10 +40,24 @@ import org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper.RunnerCl
import org.apache.spark.sql.pipelines.graph.{DataflowGraph, PipelineUpdateContextImpl}
import org.apache.spark.sql.pipelines.logging.PipelineEvent
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ArrayImplicits._

class SparkConnectSessionHolderSuite extends SharedSparkSession {

test("SessionHolder.close should close catalogs") {
val sessionHolder = SparkConnectTestUtils.createDummySessionHolder(spark)
val catalogName = "my_closeable_catalog"
sessionHolder.session.conf
.set(s"spark.sql.catalog.$catalogName", classOf[CloseableCatalog].getName)

val catalog = sessionHolder.session.sessionState.catalogManager.catalog(catalogName)
val closeableCatalog = catalog.asInstanceOf[CloseableCatalog]
assert(!closeableCatalog.isClosed)
sessionHolder.close()
assert(closeableCatalog.isClosed)
}

test("DataFrame cache: Successful put and get") {
val sessionHolder = SparkConnectTestUtils.createDummySessionHolder(spark)
import sessionHolder.session.implicits._
@@ -484,3 +498,21 @@
    assertPlanCache(sessionHolder, Some(expected))
  }
}

class CloseableCatalog
    extends org.apache.spark.sql.connector.catalog.CatalogPlugin
    with java.io.Closeable {
  private var _name: String = _
  private var closed = false

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    _name = name
  }

  override def name(): String = _name
  override def close(): Unit = {
    closed = true
  }

  def isClosed: Boolean = closed
}
@@ -55,6 +55,8 @@ class V2SessionCatalog(catalog: SessionCatalog)
  // This class is instantiated by Spark, so `initialize` method will not be called.
  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}

  override def close(): Unit = {}

  override def capabilities(): util.Set[TableCatalogCapability] = {
    Set(
      TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE
@@ -17,7 +17,7 @@

package org.apache.spark.sql.internal

import java.io.File
import java.io.{Closeable, File}
import java.net.URI

import org.apache.hadoop.conf.Configuration
@@ -91,7 +91,7 @@ private[sql] class SessionState(
    val columnarRules: Seq[ColumnarRule],
    val adaptiveRulesHolder: AdaptiveRulesHolder,
    val planNormalizationRules: Seq[Rule[LogicalPlan]],
    val artifactManagerBuilder: () => ArtifactManager) {
    val artifactManagerBuilder: () => ArtifactManager) extends Closeable {

  // The following fields are lazy to avoid creating the Hive client when creating SessionState.
  lazy val catalog: SessionCatalog = catalogBuilder()
@@ -110,6 +110,10 @@

  def catalogManager: CatalogManager = analyzer.catalogManager

  override def close(): Unit = {
    catalogManager.close()
  }

  def newHadoopConf(): Configuration = SessionState.newHadoopConf(
    sharedState.sparkContext.hadoopConfiguration,
    conf)