Skip to content

Commit 5db31ae

Browse files
committed
[SPARK-49700][CONNECT][SQL] Unified Scala Interface for Connect and Classic
### What changes were proposed in this pull request? This PR makes the shared SQL (JVM) interface the primary interface for Scala/JVM based Dataframe programming. The implementations are moved to the `classic` and `connect` sub packages. The connect client had to be moved to the sql/connect/common package because serialization requires the captured client classes to be on the classpath when deserialized on the server. ### Why are the changes needed? This is the final step in creating a unified Scala interface for both Classic and Connect. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes apache#48818 from hvanhovell/SPARK-49700. Authored-by: Herman van Hovell <[email protected]> Signed-off-by: Herman van Hovell <[email protected]>
1 parent a03c4cb commit 5db31ae

File tree

399 files changed

+1851
-1730
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

399 files changed

+1851
-1730
lines changed

common/utils/src/main/resources/error/error-conditions.json

+5
Original file line numberDiff line numberDiff line change
@@ -5277,6 +5277,11 @@
52775277
"Resilient Distributed Datasets (RDDs)."
52785278
]
52795279
},
5280+
"REGISTER_UDAF" : {
5281+
"message" : [
5282+
"Registering User Defined Aggregate Functions (UDAFs)."
5283+
]
5284+
},
52805285
"SESSION_BASE_RELATION_TO_DATAFRAME" : {
52815286
"message" : [
52825287
"Invoking SparkSession 'baseRelationToDataFrame'. This is server side developer API"

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/application/ConnectRepl.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ import ammonite.util.Util.newLine
3131

3232
import org.apache.spark.SparkBuildInfo.spark_version
3333
import org.apache.spark.annotation.DeveloperApi
34-
import org.apache.spark.sql.SparkSession
35-
import org.apache.spark.sql.SparkSession.withLocalConnectServer
34+
import org.apache.spark.sql.connect.SparkSession
35+
import org.apache.spark.sql.connect.SparkSession.withLocalConnectServer
3636
import org.apache.spark.sql.connect.client.{SparkConnectClient, SparkConnectClientParser}
3737

3838
/**

connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala

-168
This file was deleted.

connector/connect/client/jvm/src/test/java/org/apache/spark/sql/JavaEncoderSuite.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import static org.apache.spark.sql.functions.*;
2929
import static org.apache.spark.sql.RowFactory.create;
3030
import org.apache.spark.api.java.function.MapFunction;
31-
import org.apache.spark.sql.test.SparkConnectServerUtils;
31+
import org.apache.spark.sql.connect.test.SparkConnectServerUtils;
3232
import org.apache.spark.sql.types.StructType;
3333

3434
/**

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DataFrameSubquerySuite.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package org.apache.spark.sql
1919

2020
import org.apache.spark.SparkRuntimeException
21+
import org.apache.spark.sql.connect.test.{QueryTest, RemoteSparkSession}
2122
import org.apache.spark.sql.functions._
22-
import org.apache.spark.sql.test.{QueryTest, RemoteSparkSession}
2323

2424
class DataFrameSubquerySuite extends QueryTest with RemoteSparkSession {
2525
import testImplicits._

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/DataFrameTableValuedFunctionsSuite.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717

1818
package org.apache.spark.sql
1919

20+
import org.apache.spark.sql.connect.test.{QueryTest, RemoteSparkSession}
2021
import org.apache.spark.sql.functions._
21-
import org.apache.spark.sql.test.{QueryTest, RemoteSparkSession}
2222

2323
class DataFrameTableValuedFunctionsSuite extends QueryTest with RemoteSparkSession {
2424
import testImplicits._

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,13 @@ import org.apache.spark.sql.avro.{functions => avroFn}
3838
import org.apache.spark.sql.catalyst.ScalaReflection
3939
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
4040
import org.apache.spark.sql.catalyst.util.CollationFactory
41+
import org.apache.spark.sql.connect.{DataFrame, Dataset, SparkSession}
4142
import org.apache.spark.sql.connect.ConnectConversions._
4243
import org.apache.spark.sql.connect.client.SparkConnectClient
44+
import org.apache.spark.sql.connect.test.{ConnectFunSuite, IntegrationTestUtils}
4345
import org.apache.spark.sql.expressions.Window
4446
import org.apache.spark.sql.functions.lit
4547
import org.apache.spark.sql.protobuf.{functions => pbFn}
46-
import org.apache.spark.sql.test.{ConnectFunSuite, IntegrationTestUtils}
4748
import org.apache.spark.sql.types._
4849
import org.apache.spark.unsafe.types.CalendarInterval
4950
import org.apache.spark.util.SparkFileUtils

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SQLExpressionsSuite.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.sql
1919

20-
import org.apache.spark.sql.test.{QueryTest, RemoteSparkSession}
20+
import org.apache.spark.sql.connect.test.{QueryTest, RemoteSparkSession}
2121
import org.apache.spark.unsafe.types.VariantVal
2222

2323
class SQLExpressionsSuite extends QueryTest with RemoteSparkSession {

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import scala.util.Properties
2525
import org.apache.commons.io.output.ByteArrayOutputStream
2626
import org.scalatest.BeforeAndAfterEach
2727

28-
import org.apache.spark.sql.test.{ConnectFunSuite, IntegrationTestUtils, RemoteSparkSession}
28+
import org.apache.spark.sql.connect.test.{ConnectFunSuite, IntegrationTestUtils, RemoteSparkSession}
2929
import org.apache.spark.tags.AmmoniteTest
3030
import org.apache.spark.util.IvyTestUtils
3131
import org.apache.spark.util.MavenUtils.MavenCoordinate

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CatalogSuite.scala renamed to connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CatalogSuite.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.sql
18+
package org.apache.spark.sql.connect
1919

2020
import java.io.{File, FilenameFilter}
2121

2222
import org.apache.commons.io.FileUtils
2323

2424
import org.apache.spark.SparkException
25-
import org.apache.spark.sql.test.{ConnectFunSuite, RemoteSparkSession, SQLHelper}
25+
import org.apache.spark.sql.AnalysisException
26+
import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession, SQLHelper}
2627
import org.apache.spark.sql.types.{DoubleType, LongType, StructType}
2728
import org.apache.spark.storage.StorageLevel
2829

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/CheckpointSuite.scala renamed to connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/CheckpointSuite.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.spark.sql
17+
package org.apache.spark.sql.connect
1818

1919
import java.io.{ByteArrayOutputStream, PrintStream}
2020

@@ -26,7 +26,7 @@ import org.scalatest.exceptions.TestFailedDueToTimeoutException
2626

2727
import org.apache.spark.SparkException
2828
import org.apache.spark.connect.proto
29-
import org.apache.spark.sql.test.{ConnectFunSuite, RemoteSparkSession, SQLHelper}
29+
import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession, SQLHelper}
3030
import org.apache.spark.storage.StorageLevel
3131

3232
class CheckpointSuite extends ConnectFunSuite with RemoteSparkSession with SQLHelper {

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala renamed to connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientDataFrameStatSuite.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.sql
18+
package org.apache.spark.sql.connect
1919

2020
import java.util.Random
2121

2222
import org.scalatest.matchers.must.Matchers._
2323

2424
import org.apache.spark.SparkIllegalArgumentException
25-
import org.apache.spark.sql.test.{ConnectFunSuite, RemoteSparkSession}
25+
import org.apache.spark.sql.AnalysisException
26+
import org.apache.spark.sql.connect.test.{ConnectFunSuite, RemoteSparkSession}
2627

2728
class ClientDataFrameStatSuite extends ConnectFunSuite with RemoteSparkSession {
2829
private def toLetter(i: Int): String = (i + 97).toChar.toString

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDatasetSuite.scala renamed to connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientDatasetSuite.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.spark.sql
17+
package org.apache.spark.sql.connect
1818

1919
import java.util.Properties
2020
import java.util.concurrent.TimeUnit
@@ -25,9 +25,10 @@ import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
2525
import org.scalatest.BeforeAndAfterEach
2626

2727
import org.apache.spark.connect.proto
28+
import org.apache.spark.sql.Column
2829
import org.apache.spark.sql.connect.client.{DummySparkConnectService, SparkConnectClient}
30+
import org.apache.spark.sql.connect.test.ConnectFunSuite
2931
import org.apache.spark.sql.functions._
30-
import org.apache.spark.sql.test.ConnectFunSuite
3132
import org.apache.spark.util.SparkSerDeUtils
3233

3334
// Add sample tests.

connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala renamed to connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala

+5-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.spark.sql
17+
package org.apache.spark.sql.connect
1818

1919
import java.io.{ByteArrayOutputStream, PrintStream}
2020
import java.nio.file.Files
@@ -34,15 +34,17 @@ import org.scalatest.PrivateMethodTester
3434
import org.apache.spark.{SparkArithmeticException, SparkException, SparkUpgradeException}
3535
import org.apache.spark.SparkBuildInfo.{spark_version => SPARK_VERSION}
3636
import org.apache.spark.internal.config.ConfigBuilder
37+
import org.apache.spark.sql.{functions, AnalysisException, Observation, Row, SaveMode}
3738
import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, TableAlreadyExistsException, TempTableAlreadyExistsException}
3839
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
3940
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
4041
import org.apache.spark.sql.catalyst.parser.ParseException
42+
import org.apache.spark.sql.connect.ConnectConversions._
4143
import org.apache.spark.sql.connect.client.{RetryPolicy, SparkConnectClient, SparkResult}
44+
import org.apache.spark.sql.connect.test.{ConnectFunSuite, IntegrationTestUtils, RemoteSparkSession, SQLHelper}
45+
import org.apache.spark.sql.connect.test.SparkConnectServerUtils.port
4246
import org.apache.spark.sql.functions._
4347
import org.apache.spark.sql.internal.SqlApiConf
44-
import org.apache.spark.sql.test.{ConnectFunSuite, IntegrationTestUtils, RemoteSparkSession, SQLHelper}
45-
import org.apache.spark.sql.test.SparkConnectServerUtils.port
4648
import org.apache.spark.sql.types._
4749
import org.apache.spark.util.SparkThreadUtils
4850

Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.spark.sql.internal
17+
package org.apache.spark.sql.connect
1818

1919
import org.apache.spark.SparkException
2020
import org.apache.spark.connect.proto
@@ -23,9 +23,10 @@ import org.apache.spark.sql.{Column, Encoder}
2323
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveIntEncoder, PrimitiveLongEncoder}
2424
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin}
2525
import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, ProtoDataTypes}
26+
import org.apache.spark.sql.connect.test.ConnectFunSuite
2627
import org.apache.spark.sql.expressions.{Aggregator, SparkUserDefinedFunction, UserDefinedAggregator}
27-
import org.apache.spark.sql.test.ConnectFunSuite
28-
import org.apache.spark.sql.types.{BinaryType, DataType, DoubleType, LongType, MetadataBuilder, ShortType, StringType, StructType}
28+
import org.apache.spark.sql.internal._
29+
import org.apache.spark.sql.types._
2930

3031
/**
3132
* Test suite for [[ColumnNode]] to [[proto.Expression]] conversions.
@@ -471,8 +472,8 @@ class ColumnNodeToProtoConverterSuite extends ConnectFunSuite {
471472
}
472473
}
473474

474-
private[internal] case class Nope(override val origin: Origin = CurrentOrigin.get)
475+
private[connect] case class Nope(override val origin: Origin = CurrentOrigin.get)
475476
extends ColumnNode {
476477
override def sql: String = "nope"
477-
override private[internal] def children: Seq[ColumnNodeLike] = Seq.empty
478+
override def children: Seq[ColumnNodeLike] = Seq.empty
478479
}

0 commit comments

Comments (0)