
Conversation

@attilapiros (Contributor) commented Jul 29, 2025

What changes were proposed in this pull request?

Fix race conditions in CREATE TABLE and CREATE FUNCTION when IF NOT EXISTS is given.

Why are the changes needed?

Even when "CREATE FUNCTION IF NOT EXISTS" is used in parallel can fail with the following exception:

2025-07-25 01:22:21,731 [AA-Rule-ThreadPoolExec-2] ERROR ***** - An error occured :
org.apache.spark.sql.AnalysisException: Function default.SparkTestUDF already exists; line 1 pos 6734
        at org.apache.spark.sql.errors.QueryCompilationErrors$.functionAlreadyExistsError(QueryCompilationErrors.scala:654)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.registerFunction(SessionCatalog.scala:1487)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.resolvePersistentFunctionInternal(SessionCatalog.scala:1719)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.resolvePersistentFunction(SessionCatalog.scala:1675)
        at ...

Regarding CREATE TABLE:

scala> import scala.collection.parallel.CollectionConverters._
import scala.collection.parallel.CollectionConverters._

scala> (1 to 5).toList.par.foreach(_ => spark.sql("create table if not exists spark52988(a int)"))
     |
25/08/11 15:47:18 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
25/08/11 15:47:18 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore [email protected]
25/08/11 15:47:18 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
25/08/11 15:47:19 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
25/08/11 15:47:19 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `default`.`spark52988` because it already exists.
Choose a different name, drop or replace the existing object, or add the IF NOT EXISTS clause to tolerate pre-existing objects. SQLSTATE: 42P07
  at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$createTable$1(HiveExternalCatalog.scala:226)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:105)
  at org.apache.spark.sql.hive.HiveExternalCatalog.createTable(HiveExternalCatalog.scala:218)
  at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.createTable(ExternalCatalogWithListener.scala:94)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createTable(SessionCatalog.scala:422)
  at org.apache.spark.sql.execution.command.CreateDataSourceTableCommand.run(createDataSourceTables.scala:123)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:79)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:77)....
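
For reference, the failure mode is a classic check-then-act race: the existence check and the actual create are separate metastore calls, so several parallel sessions can all pass the check before any of them creates the object. A self-contained toy sketch of the same pattern (not Spark code; all names below are made up for illustration):

```
import java.util.concurrent.ConcurrentHashMap

// Toy illustration of the underlying check-then-act race (not Spark code):
// several threads all observe "not exists" before any of them creates, so the
// losers fail with "already exists" even though the caller asked for
// IF NOT EXISTS semantics.
object CheckThenActRace extends App {
  private val tables = ConcurrentHashMap.newKeySet[String]()

  def createTableIfNotExistsRacy(name: String): Unit = {
    if (!tables.contains(name)) {   // check: every thread may pass this
      Thread.sleep(10)              // widen the race window for the demo
      if (!tables.add(name)) {      // act: add() returns false if it was added meanwhile
        throw new IllegalStateException(s"Table $name already exists")
      }
    }
  }

  val threads = (1 to 5).map(_ => new Thread(() => createTableIfNotExistsRacy("spark52988")))
  threads.foreach(_.start())
  threads.foreach(_.join())
}
```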

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Manually.

CREATE FUNCTION after this change:

scala> (1 to 100).foreach { j => (1 to 25).toList.par.foreach(_ => spark.sql(s"create function if not exists f$j(i int) returns int return i * i")) }
25/08/12 10:34:20 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
25/08/12 10:34:20 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
25/08/12 10:34:20 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Jul 29, 2025
@attilapiros (Contributor, Author)

cc @dongjoon-hyun

@dongjoon-hyun (Member) left a comment

Thank you for pinging me, @attilapiros.

  • Do you think your test code can be a part of test coverage?
  • createUserDefinedFunction seems to be missed if we want to cover all function create/delete/alter.
  • And, is this enough? For example, functionExists is okay because it's read-only?

cc @cloud-fan, @peter-toth, @yaooqinn, @LuciferYang, too

@attilapiros (Contributor, Author)

  1. Do you think your test code can be a part of test coverage?

I do not think so. This change is fairly simple; with my test I just wanted to illustrate how easy it is to reproduce the issue.

  2. createUserDefinedFunction seems to be missed if we want to cover all function create/delete/alter.

It is not needed, as createUserDefinedFunction just delegates to createFunction (actually it only calls createFunction), which will be synchronized after this change.

  3. And, is this enough? For example, functionExists is okay because it's read-only?

For create/drop/alter it is not needed: because those methods are synchronized, the intrinsic lock is already held by the thread executing the code, so when functionExists is called from them we already hold the lock.
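
For illustration only (a toy sketch, not the actual SessionCatalog code): JVM intrinsic locks are reentrant, so a synchronized create/drop/alter method can call a synchronized read such as functionExists on the same object without deadlocking or needing an extra lock.

```
// Toy sketch (not SessionCatalog): intrinsic locks are reentrant, so a
// synchronized create method can call a synchronized read like functionExists
// on the same object while already holding the monitor.
class ToyCatalog {
  private val functions = scala.collection.mutable.Set.empty[String]

  def functionExists(name: String): Boolean = synchronized {
    functions.contains(name)
  }

  def createFunction(name: String, ignoreIfExists: Boolean): Unit = synchronized {
    // Re-acquiring the monitor we already hold is fine: it is reentrant.
    if (functionExists(name)) {
      if (!ignoreIfExists) {
        throw new IllegalStateException(s"Function $name already exists")
      }
    } else {
      functions += name
    }
  }
}
```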

dongjoon-hyun previously approved these changes Jul 29, 2025
@cloud-fan (Contributor)

Instead of locking, can we do something like:

try {
  externalCatalog.createFunction(db, newFuncDefinition)
} catch {
  case e if ignoreIfExists =>
}

@yaooqinn (Member)

(1 to 5).toList.par.foreach(_ => spark.sql("create table if not exists spark52988(a int)"))

This does not seem to be a function-specific issue.

@attilapiros (Contributor, Author) commented Jul 31, 2025

Instead of locking, can we do something like:

try {
  externalCatalog.createFunction(db, newFuncDefinition)
} catch {
  case e if ignoreIfExists =>
}

@cloud-fan

Yes, that would also work for "CREATE FUNCTION IF NOT EXISTS", but isn't locking the safer / more correct solution, especially since the SessionCatalog claims to be thread-safe:

https://github.com/attilapiros/spark/blame/554d67817e44498cca9d1a211d8bdc4a69dc9d0e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala#L66

Or is there some performance concern behind your suggestion?

@cloud-fan (Contributor)

Yea, locking is not good for high concurrency, and in fact the locking is just best effort, as we can't prevent other Spark applications/clients from creating/dropping tables at the same time.

@dongjoon-hyun dismissed their stale review August 1, 2025 01:48

Dismissed per the above discussion.

attilapiros changed the title [SPARK-52988][SQL] Fix race conditions in SessionCatalog's metastore function handling → [SPARK-52988][SQL] Fix race conditions at create table and create function when IF NOT EXISTS is used, Aug 11, 2025
attilapiros changed the title [SPARK-52988][SQL] Fix race conditions at create table and create function when IF NOT EXISTS is used → [SPARK-52988][SQL] Fix race conditions at CREATE TABLE and FUNCTION when IF NOT EXISTS is used, Aug 11, 2025
@attilapiros (Contributor, Author)

cc @dongjoon-hyun, @cloud-fan

I have updated the PR by catching the exceptions.
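
For readers following the thread, the shape of the catch-based approach looks roughly like the sketch below. This is only an illustration under assumptions: the helper names are made up, and the exact exception types caught (TableAlreadyExistsException / FunctionAlreadyExistsException) are inferred from the stack traces above, not copied from the PR diff.

```
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTable, ExternalCatalog}

// Sketch only, not the exact PR diff: attempt the create and tolerate the
// "already exists" error when IF NOT EXISTS was given, instead of relying on a
// separate existence check that can race with other sessions/clients.
object CreateIfNotExistsSketch {
  def createFunctionTolerant(
      externalCatalog: ExternalCatalog,
      db: String,
      funcDefinition: CatalogFunction,
      ignoreIfExists: Boolean): Unit = {
    try {
      externalCatalog.createFunction(db, funcDefinition)
    } catch {
      // Another session or client created the function in the meantime;
      // with IF NOT EXISTS this is not an error.
      case _: FunctionAlreadyExistsException if ignoreIfExists =>
    }
  }

  def createTableTolerant(
      externalCatalog: ExternalCatalog,
      tableDefinition: CatalogTable,
      ignoreIfExists: Boolean): Unit = {
    try {
      externalCatalog.createTable(tableDefinition, ignoreIfExists = false)
    } catch {
      case _: TableAlreadyExistsException if ignoreIfExists =>
    }
  }
}
```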

@dongjoon-hyun (Member)

Thank you, @attilapiros, @cloud-fan, and @yaooqinn.

mzhang pushed a commit to mzhang/spark that referenced this pull request Aug 21, 2025
…hen IF NOT EXISTS is used

Closes apache#51696 from attilapiros/SPARK-52988.

Authored-by: attilapiros <[email protected]>
Signed-off-by: attilapiros <[email protected]>