
Commit 16ce998
[connector] support spark catalog
Committed on Dec 24, 2024 · 1 parent: de8a015

14 files changed: +1831 −2 lines
 
CatalogExceptionUtils.java (moved from package com.alibaba.fluss.connector.flink.utils to com.alibaba.fluss.utils)

@@ -14,7 +14,7 @@
  * limitations under the License.
  */

-package com.alibaba.fluss.connector.flink.utils;
+package com.alibaba.fluss.utils;

 import com.alibaba.fluss.exception.DatabaseAlreadyExistException;
 import com.alibaba.fluss.exception.DatabaseNotEmptyException;

fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java (+1 −1)
@@ -22,12 +22,12 @@
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.connector.flink.lakehouse.LakeCatalog;
-import com.alibaba.fluss.connector.flink.utils.CatalogExceptionUtils;
 import com.alibaba.fluss.connector.flink.utils.FlinkConversions;
 import com.alibaba.fluss.exception.FlussRuntimeException;
 import com.alibaba.fluss.metadata.TableDescriptor;
 import com.alibaba.fluss.metadata.TableInfo;
 import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.utils.CatalogExceptionUtils;
 import com.alibaba.fluss.utils.ExceptionUtils;
 import com.alibaba.fluss.utils.IOUtils;

fluss-connectors/fluss-connector-spark/fluss-connector-spark-3.3/pom.xml (new file, +83 lines)

<?xml version="1.0" encoding="UTF-8"?>
<!--
 Copyright (c) 2024 Alibaba Group Holding Ltd.

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.alibaba.fluss</groupId>
        <artifactId>fluss-connector-spark</artifactId>
        <version>0.6-SNAPSHOT</version>
    </parent>

    <artifactId>fluss-connector-spark-3.3</artifactId>

    <name>Fluss : Connector : Spark : 3.3</name>

    <properties>
        <spark.version>3.3.3</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba.fluss</groupId>
            <artifactId>fluss-connector-spark-common</artifactId>
            <version>${project.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <id>shade-fluss</id>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <includes combine.children="append">
                                    <include>com.alibaba.fluss:fluss-connector-spark-common</include>
                                    <include>com.alibaba.fluss:fluss-client</include>
                                </includes>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/pom.xml (new file, +99 lines)

<?xml version="1.0" encoding="UTF-8"?>
<!--
 Copyright (c) 2024 Alibaba Group Holding Ltd.

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.alibaba.fluss</groupId>
        <artifactId>fluss-connector-spark</artifactId>
        <version>0.6-SNAPSHOT</version>
    </parent>

    <artifactId>fluss-connector-spark-common</artifactId>

    <name>Fluss : Connector : Spark : Common</name>

    <properties>
        <scala.common.version>2.12.15</scala.common.version>
        <spark.common.version>3.5.3</spark.common.version>
    </properties>

    <dependencies>
        <!-- Fluss dependency -->
        <dependency>
            <groupId>com.alibaba.fluss</groupId>
            <artifactId>fluss-client</artifactId>
            <version>${project.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.common.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.common.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.common.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.common.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j2-impl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>${curator.version}</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba.fluss</groupId>
            <artifactId>fluss-server</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
            <type>test-jar</type>
        </dependency>

        <dependency>
            <groupId>com.alibaba.fluss</groupId>
            <artifactId>fluss-test-utils</artifactId>
            <version>${project.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/SparkConnectorOptions.java (new file, +146 lines)

/*
 * Copyright (c) 2024 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.connector.spark;

import com.alibaba.fluss.config.ConfigOption;
import com.alibaba.fluss.config.FlussConfigUtils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import static com.alibaba.fluss.config.ConfigBuilder.key;

/** Options for the Spark connector. */
public class SparkConnectorOptions {

    public static final ConfigOption<Integer> BUCKET_NUMBER =
            key("bucket.num")
                    .intType()
                    .noDefaultValue()
                    .withDescription("The number of buckets of a Fluss table.");

    public static final ConfigOption<String> BUCKET_KEY =
            key("bucket.key")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Specifies the distribution policy of the Fluss table. "
                                    + "Data will be distributed to each bucket according to the hash value of the bucket key. "
                                    + "If you specify multiple fields, use ',' as the delimiter. "
                                    + "If the table has a primary key, you currently cannot specify a bucket key: "
                                    + "the bucket keys will always be the primary key. "
                                    + "If the table has no primary key, you can specify a bucket key; when no bucket key is specified, "
                                    + "the data will be distributed to the buckets randomly.");

    public static final ConfigOption<String> BOOTSTRAP_SERVERS =
            key("bootstrap.servers")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "A list of host/port pairs to use for establishing the initial connection to the Fluss cluster. "
                                    + "The list should be in the form host1:port1,host2:port2,....");

    public static final ConfigOption<String> PRIMARY_KEY =
            key("primary.key")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The primary key of the Fluss table, e.g. key1,key2,...");

    // --------------------------------------------------------------------------------------------
    // Lookup specific options
    // --------------------------------------------------------------------------------------------

    public static final ConfigOption<Boolean> LOOKUP_ASYNC =
            key("lookup.async")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Whether to use async lookup. Default is true.");

    // --------------------------------------------------------------------------------------------
    // Scan specific options
    // --------------------------------------------------------------------------------------------

    public static final ConfigOption<ScanStartupMode> SCAN_STARTUP_MODE =
            key("scan.startup.mode")
                    .enumType(ScanStartupMode.class)
                    .defaultValue(ScanStartupMode.INITIAL)
                    .withDescription(
                            "Optional startup mode for the Fluss source. Default is 'initial'.");

    public static final ConfigOption<String> SCAN_STARTUP_TIMESTAMP =
            key("scan.startup.timestamp")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Optional timestamp for the Fluss source, used when the startup mode is 'timestamp'. "
                                    + "The format is a millisecond timestamp or 'yyyy-MM-dd HH:mm:ss', "
                                    + "e.g. '1678883047356' or '2023-12-09 23:09:12'.");

    public static final ConfigOption<Duration> SCAN_PARTITION_DISCOVERY_INTERVAL =
            key("scan.partition.discovery.interval")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(10))
                    .withDescription(
                            "The interval at which the Fluss source discovers new partitions "
                                    + "of a partitioned table while scanning. "
                                    + "A non-positive value disables partition discovery.");

    // --------------------------------------------------------------------------------------------
    // table storage specific options
    // --------------------------------------------------------------------------------------------

    public static final List<ConfigOption<?>> TABLE_OPTIONS =
            new ArrayList<>(FlussConfigUtils.TABLE_OPTIONS.values());

    // --------------------------------------------------------------------------------------------
    // client specific options
    // --------------------------------------------------------------------------------------------

    public static final List<ConfigOption<?>> CLIENT_OPTIONS =
            new ArrayList<>(FlussConfigUtils.CLIENT_OPTIONS.values());

    // ------------------------------------------------------------------------------------------

    /** Startup mode for the Fluss scanner, see {@link #SCAN_STARTUP_MODE}. */
    public enum ScanStartupMode {
        INITIAL(
                "initial",
                "Performs an initial snapshot of the table upon first startup, "
                        + "and continues to read the latest changelog with an exactly-once guarantee. "
                        + "If the table to read is a log table, the initial snapshot means "
                        + "reading from the earliest log offset. If the table to read is a primary key table, "
                        + "the initial snapshot means reading the latest snapshot, which "
                        + "materializes all changes on the table."),
        EARLIEST("earliest", "Start reading logs from the earliest offset."),
        LATEST("latest", "Start reading logs from the latest offset."),
        TIMESTAMP("timestamp", "Start reading logs from a user-supplied timestamp.");

        private final String value;
        private final String description;

        ScanStartupMode(String value, String description) {
            this.value = value;
            this.description = description;
        }

        @Override
        public String toString() {
            return value;
        }
    }
}
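
Editor's note: the options above reach the connector through Spark's catalog configuration. Spark instantiates the class registered under "spark.sql.catalog.<name>" and passes every "spark.sql.catalog.<name>.*" entry to SparkCatalog#initialize (next file) as a CaseInsensitiveStringMap. A minimal sketch of wiring this up; the catalog name "fluss", the local master, and the bootstrap address are assumptions for illustration:

import org.apache.spark.sql.SparkSession;

public class FlussCatalogQuickstart {
    public static void main(String[] args) {
        // Register the Fluss catalog under the name "fluss". Spark forwards all
        // "spark.sql.catalog.fluss.*" entries to SparkCatalog#initialize, where
        // bootstrap.servers is read via SparkConnectorOptions.BOOTSTRAP_SERVERS.
        SparkSession spark =
                SparkSession.builder()
                        .master("local[*]") // assumption: a local test session
                        .config(
                                "spark.sql.catalog.fluss",
                                "com.alibaba.fluss.connector.spark.catalog.SparkCatalog")
                        // assumed address of a locally running Fluss cluster
                        .config("spark.sql.catalog.fluss.bootstrap.servers", "localhost:9123")
                        .getOrCreate();

        // Lists Fluss databases through SparkCatalog#listNamespaces.
        spark.sql("SHOW NAMESPACES IN fluss").show();
    }
}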
fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/catalog/SparkCatalog.java (new file, +365 lines)

/*
 * Copyright (c) 2024 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.connector.spark.catalog;

import com.alibaba.fluss.client.Connection;
import com.alibaba.fluss.client.ConnectionFactory;
import com.alibaba.fluss.client.admin.Admin;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.connector.spark.SparkConnectorOptions;
import com.alibaba.fluss.connector.spark.exception.CatalogException;
import com.alibaba.fluss.connector.spark.table.SparkTable;
import com.alibaba.fluss.connector.spark.utils.SparkConversions;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TableInfo;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.utils.CatalogExceptionUtils;
import com.alibaba.fluss.utils.ExceptionUtils;
import com.alibaba.fluss.utils.IOUtils;
import com.alibaba.fluss.utils.Preconditions;

import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
import org.apache.spark.sql.connector.catalog.FunctionCatalog;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.StagedTable;
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

import java.io.Closeable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** A Spark Catalog for Fluss. */
public class SparkCatalog
        implements StagingTableCatalog,
                SupportsNamespaces,
                FunctionCatalog,
                TableCatalog,
                Closeable {

    private static final String[] DEFAULT_NAMESPACE = new String[] {"fluss"};

    private String bootstrapServers;
    private String catalogName;
    private Connection connection;
    private Admin admin;

    @Override
    public void initialize(String name, CaseInsensitiveStringMap options) {
        this.catalogName = name;
        this.bootstrapServers = options.get(SparkConnectorOptions.BOOTSTRAP_SERVERS.key());
        Configuration flussConfigs = new Configuration();
        flussConfigs.setString(ConfigOptions.BOOTSTRAP_SERVERS.key(), bootstrapServers);
        connection = ConnectionFactory.createConnection(flussConfigs);
        admin = connection.getAdmin();
    }

    @Override
    public String[] defaultNamespace() {
        return DEFAULT_NAMESPACE;
    }

    @Override
    public String name() {
        return this.catalogName;
    }

    @Override
    public boolean namespaceExists(String[] namespace) {
        isValidateNamespace(namespace);
        try {
            return admin.databaseExists(namespace[0]).get();
        } catch (Exception e) {
            throw new CatalogException(
                    String.format(
                            "Failed to check if database %s exists in %s",
                            Arrays.toString(namespace), name()),
                    e);
        }
    }

    @Override
    public String[][] listNamespaces() {
        try {
            List<String> databases = admin.listDatabases().get();
            String[][] namespaces = new String[databases.size()][];

            for (int i = 0; i < databases.size(); ++i) {
                namespaces[i] = new String[] {databases.get(i)};
            }

            return namespaces;
        } catch (Exception e) {
            throw new CatalogException(
                    String.format("Failed to list all databases in %s", name()), e);
        }
    }

    @Override
    public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
        if (namespace.length == 0) {
            return listNamespaces();
        } else {
            isValidateNamespace(namespace);
            if (namespaceExists(namespace)) {
                return new String[0][];
            }
            throw new NoSuchNamespaceException(namespace);
        }
    }

    @Override
    public Map<String, String> loadNamespaceMetadata(String[] namespace)
            throws NoSuchNamespaceException {
        isValidateNamespace(namespace);
        if (namespaceExists(namespace)) {
            return Collections.emptyMap();
        }
        throw new NoSuchNamespaceException(namespace);
    }

    @Override
    public void createNamespace(String[] namespace, Map<String, String> metadata)
            throws NamespaceAlreadyExistsException {
        isValidateNamespace(namespace);
        try {
            admin.createDatabase(namespace[0], false).get();
        } catch (Exception e) {
            Throwable t = ExceptionUtils.stripExecutionException(e);
            if (CatalogExceptionUtils.isDatabaseAlreadyExist(t)) {
                throw new NamespaceAlreadyExistsException(namespace);
            } else {
                throw new CatalogException(
                        String.format(
                                "Failed to create database %s in %s",
                                Arrays.toString(namespace), name()),
                        t);
            }
        }
    }

    @Override
    public void alterNamespace(String[] namespace, NamespaceChange... changes)
            throws NoSuchNamespaceException {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean dropNamespace(String[] namespace, boolean cascade)
            throws NoSuchNamespaceException, NonEmptyNamespaceException {
        isValidateNamespace(namespace);
        try {
            admin.deleteDatabase(namespace[0], false, cascade).get();
            return true;
        } catch (Exception e) {
            Throwable t = ExceptionUtils.stripExecutionException(e);
            if (CatalogExceptionUtils.isDatabaseNotExist(t)) {
                throw new NoSuchNamespaceException(namespace);
            } else if (CatalogExceptionUtils.isDatabaseNotEmpty(t)) {
                throw new NonEmptyNamespaceException(namespace);
            } else {
                throw new CatalogException(
                        String.format(
                                "Failed to drop database %s in %s",
                                Arrays.toString(namespace), name()),
                        t);
            }
        }
    }

    @Override
    public StagedTable stageCreate(
            Identifier ident,
            StructType schema,
            Transform[] partitions,
            Map<String, String> properties)
            throws TableAlreadyExistsException, NoSuchNamespaceException {
        throw new UnsupportedOperationException();
    }

    @Override
    public StagedTable stageReplace(
            Identifier ident,
            StructType schema,
            Transform[] partitions,
            Map<String, String> properties)
            throws NoSuchNamespaceException, NoSuchTableException {
        throw new UnsupportedOperationException();
    }

    @Override
    public StagedTable stageCreateOrReplace(
            Identifier ident,
            StructType schema,
            Transform[] partitions,
            Map<String, String> properties)
            throws NoSuchNamespaceException {
        throw new UnsupportedOperationException();
    }

    @Override
    public Table loadTable(Identifier ident, String version) throws NoSuchTableException {
        throw new UnsupportedOperationException();
    }

    @Override
    public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void invalidateTable(Identifier ident) {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean tableExists(Identifier ident) {
        try {
            return admin.tableExists(toTablePath(ident)).get();
        } catch (Exception e) {
            throw new CatalogException(
                    String.format("Failed to check if table %s exists in %s", ident, name()),
                    ExceptionUtils.stripExecutionException(e));
        }
    }

    @Override
    public boolean purgeTable(Identifier ident) throws UnsupportedOperationException {
        throw new UnsupportedOperationException();
    }

    @Override
    public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
        isValidateNamespace(namespace);
        try {
            List<String> tables = admin.listTables(namespace[0]).get();
            Identifier[] identifiers = new Identifier[tables.size()];
            for (int i = 0; i < tables.size(); i++) {
                identifiers[i] = Identifier.of(namespace, tables.get(i));
            }
            return identifiers;
        } catch (Exception e) {
            Throwable t = ExceptionUtils.stripExecutionException(e);
            if (CatalogExceptionUtils.isDatabaseNotExist(t)) {
                throw new NoSuchNamespaceException(namespace);
            }
            throw new CatalogException(
                    String.format(
                            "Failed to list all tables in database %s in %s",
                            Arrays.toString(namespace), name()),
                    t);
        }
    }

    @Override
    public Table loadTable(Identifier ident) throws NoSuchTableException {
        try {
            TableInfo tableInfo = admin.getTable(toTablePath(ident)).get();
            return new SparkTable(tableInfo.getTablePath(), tableInfo.getTableDescriptor());
        } catch (Exception e) {
            Throwable t = ExceptionUtils.stripExecutionException(e);
            if (CatalogExceptionUtils.isTableNotExist(t)) {
                throw new NoSuchTableException(ident);
            } else {
                throw new CatalogException(
                        String.format("Failed to get table %s in %s", ident, name()), t);
            }
        }
    }

    @Override
    public Table createTable(
            Identifier ident,
            StructType schema,
            Transform[] partitions,
            Map<String, String> properties)
            throws TableAlreadyExistsException, NoSuchNamespaceException {
        try {
            TableDescriptor tableDescriptor =
                    SparkConversions.toFlussTable(schema, partitions, properties);
            TablePath tablePath = toTablePath(ident);
            admin.createTable(tablePath, tableDescriptor, false).get();
            return new SparkTable(tablePath, tableDescriptor);
        } catch (Exception e) {
            Throwable t = ExceptionUtils.stripExecutionException(e);
            if (CatalogExceptionUtils.isDatabaseNotExist(t)) {
                throw new NoSuchNamespaceException(ident.namespace());
            } else if (CatalogExceptionUtils.isTableAlreadyExist(t)) {
                throw new TableAlreadyExistsException(ident);
            } else {
                throw new CatalogException(
                        String.format("Failed to create table %s in %s", ident, name()), t);
            }
        }
    }

    @Override
    public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean dropTable(Identifier ident) {
        try {
            admin.deleteTable(toTablePath(ident), false).get();
            return true;
        } catch (Exception e) {
            Throwable t = ExceptionUtils.stripExecutionException(e);
            throw new CatalogException(
                    String.format("Failed to drop table %s in %s", ident, name()), t);
        }
    }

    @Override
    public void renameTable(Identifier oldIdent, Identifier newIdent)
            throws NoSuchTableException, TableAlreadyExistsException {
        throw new UnsupportedOperationException();
    }

    @Override
    public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
        throw new UnsupportedOperationException();
    }

    @Override
    public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void close() {
        IOUtils.closeQuietly(admin, "fluss-admin");
        IOUtils.closeQuietly(connection, "fluss-connection");
    }

    // A namespace is valid for this catalog only if it has exactly one level (the database).
    private void isValidateNamespace(String[] namespace) {
        Preconditions.checkArgument(
                namespace.length == 1, "Namespace %s is not valid", Arrays.toString(namespace));
    }

    private TablePath toTablePath(Identifier ident) {
        isValidateNamespace(ident.namespace());
        return TablePath.of(ident.namespace()[0], ident.name());
    }
}
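
Editor's note: with the catalog registered, the metadata paths above map directly onto Spark SQL DDL. A hedged sketch (the database, table, and option values are illustrative assumptions); TBLPROPERTIES arrive in the properties map of createTable, where SparkConversions.toFlussTable picks up 'primary.key' and 'bucket.num':

// Assumes the SparkSession from the earlier sketch, with this catalog
// registered under the name "fluss".
spark.sql("CREATE NAMESPACE IF NOT EXISTS fluss.my_db"); // SparkCatalog#createNamespace

// 'primary.key' and 'bucket.num' are plain table properties here; Spark hands
// them to SparkCatalog#createTable, which converts them via toFlussTable.
spark.sql(
        "CREATE TABLE fluss.my_db.orders ("
                + "  order_id BIGINT NOT NULL,"
                + "  order_name STRING"
                + ") TBLPROPERTIES ("
                + "  'primary.key' = 'order_id',"
                + "  'bucket.num' = '3')");

spark.sql("SHOW TABLES IN fluss.my_db").show(); // SparkCatalog#listTables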
fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/exception/CatalogException.java (new file, +33 lines)

/*
 * Copyright (c) 2024 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.connector.spark.exception;

/** The exception thrown when a Spark catalog operation fails. */
public class CatalogException extends RuntimeException {

    public CatalogException(String message) {
        super(message);
    }

    public CatalogException(Throwable cause) {
        super(cause);
    }

    public CatalogException(String message, Throwable cause) {
        super(message, cause);
    }
}
fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/table/SparkTable.java (new file, +76 lines)

/*
 * Copyright (c) 2024 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.connector.spark.table;

import com.alibaba.fluss.connector.spark.utils.SparkConversions;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TablePath;

import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.StructType;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** A Spark {@link Table} backed by a Fluss table. */
public class SparkTable implements Table {

    private final TablePath tablePath;
    private final TableDescriptor tableDescriptor;
    private final StructType sparkSchema;
    private final Transform[] transforms;
    private final Map<String, String> properties;

    public SparkTable(TablePath tablePath, TableDescriptor tableDescriptor) {
        this.tablePath = tablePath;
        this.tableDescriptor = tableDescriptor;
        this.sparkSchema = SparkConversions.toSparkSchema(tableDescriptor.getSchema());
        this.transforms = SparkConversions.toSparkTransforms(tableDescriptor.getPartitionKeys());
        this.properties = new HashMap<>();
        this.properties.putAll(tableDescriptor.getProperties());
        this.properties.putAll(tableDescriptor.getCustomProperties());
    }

    @Override
    public Transform[] partitioning() {
        return transforms;
    }

    @Override
    public Map<String, String> properties() {
        return properties;
    }

    @Override
    public String name() {
        return tablePath.getTableName();
    }

    @Override
    public StructType schema() {
        return sparkSchema;
    }

    @Override
    public Set<TableCapability> capabilities() {
        // No read/write capabilities yet: the table is metadata-only for now.
        return Collections.emptySet();
    }
}
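
Editor's note: capabilities() returns an empty set, so this commit wires up metadata only; Spark accepts catalog DDL against these tables but rejects scans and writes during analysis. A hypothetical illustration, continuing the session from the earlier sketches:

// Metadata works: DESCRIBE goes through SparkCatalog#loadTable / SparkTable#schema.
spark.sql("DESCRIBE TABLE fluss.my_db.orders").show();

// Reading does not (yet): SparkTable declares no TableCapability such as BATCH_READ,
// so a query like the following fails analysis instead of scanning data.
// spark.sql("SELECT * FROM fluss.my_db.orders").show();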
fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/utils/FlussTypeToSparkType.java (new file, +148 lines)

/*
 * Copyright (c) 2024 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.connector.spark.utils;

import com.alibaba.fluss.types.ArrayType;
import com.alibaba.fluss.types.BigIntType;
import com.alibaba.fluss.types.BinaryType;
import com.alibaba.fluss.types.BooleanType;
import com.alibaba.fluss.types.BytesType;
import com.alibaba.fluss.types.CharType;
import com.alibaba.fluss.types.DataTypeVisitor;
import com.alibaba.fluss.types.DateType;
import com.alibaba.fluss.types.DecimalType;
import com.alibaba.fluss.types.DoubleType;
import com.alibaba.fluss.types.FloatType;
import com.alibaba.fluss.types.IntType;
import com.alibaba.fluss.types.LocalZonedTimestampType;
import com.alibaba.fluss.types.MapType;
import com.alibaba.fluss.types.RowType;
import com.alibaba.fluss.types.SmallIntType;
import com.alibaba.fluss.types.StringType;
import com.alibaba.fluss.types.TimeType;
import com.alibaba.fluss.types.TimestampType;
import com.alibaba.fluss.types.TinyIntType;

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

/** Convert Fluss's {@link com.alibaba.fluss.types.DataType} to Spark's {@link DataType}. */
public class FlussTypeToSparkType implements DataTypeVisitor<DataType> {

    static final FlussTypeToSparkType INSTANCE = new FlussTypeToSparkType();

    @Override
    public DataType visit(CharType charType) {
        return new org.apache.spark.sql.types.CharType(charType.getLength());
    }

    @Override
    public DataType visit(StringType stringType) {
        return DataTypes.StringType;
    }

    @Override
    public DataType visit(BooleanType booleanType) {
        return DataTypes.BooleanType;
    }

    @Override
    public DataType visit(BinaryType binaryType) {
        return DataTypes.BinaryType;
    }

    @Override
    public DataType visit(BytesType bytesType) {
        return DataTypes.BinaryType;
    }

    @Override
    public DataType visit(DecimalType decimalType) {
        return DataTypes.createDecimalType(decimalType.getPrecision(), decimalType.getScale());
    }

    @Override
    public DataType visit(TinyIntType tinyIntType) {
        return DataTypes.ByteType;
    }

    @Override
    public DataType visit(SmallIntType smallIntType) {
        return DataTypes.ShortType;
    }

    @Override
    public DataType visit(IntType intType) {
        return DataTypes.IntegerType;
    }

    @Override
    public DataType visit(BigIntType bigIntType) {
        return DataTypes.LongType;
    }

    @Override
    public DataType visit(FloatType floatType) {
        return DataTypes.FloatType;
    }

    @Override
    public DataType visit(DoubleType doubleType) {
        return DataTypes.DoubleType;
    }

    @Override
    public DataType visit(DateType dateType) {
        return DataTypes.DateType;
    }

    @Override
    public DataType visit(TimeType timeType) {
        // Spark 3.3 does not support a TIME type; use long to represent it.
        return DataTypes.LongType;
    }

    @Override
    public DataType visit(TimestampType timestampType) {
        // Spark 3.3 does not support TIMESTAMP without time zone; use long to represent it.
        return DataTypes.LongType;
    }

    @Override
    public DataType visit(LocalZonedTimestampType localZonedTimestampType) {
        // with local time zone
        return DataTypes.TimestampType;
    }

    @Override
    public DataType visit(ArrayType arrayType) {
        // TODO: support ArrayType
        throw new UnsupportedOperationException("ArrayType is not supported yet");
    }

    @Override
    public DataType visit(MapType mapType) {
        // TODO: support MapType
        throw new UnsupportedOperationException("MapType is not supported yet");
    }

    @Override
    public DataType visit(RowType rowType) {
        // TODO: support RowType
        throw new UnsupportedOperationException("RowType is not supported yet");
    }
}
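
Editor's note: the visitor is reached through SparkConversions.toSparkType (next file). A small sketch of the resulting mappings, including the lossy TIME/TIMESTAMP cases noted in the comments above; the printed results in the trailing comments follow from the visitor logic:

import com.alibaba.fluss.connector.spark.utils.SparkConversions;
import com.alibaba.fluss.types.DataTypes;

public class TypeMappingSketch {
    public static void main(String[] args) {
        // One-to-one mappings.
        System.out.println(SparkConversions.toSparkType(DataTypes.STRING()));       // StringType
        System.out.println(SparkConversions.toSparkType(DataTypes.DECIMAL(10, 2))); // DecimalType(10,2)

        // Lossy mappings: Spark 3.3 has no TIME or TIMESTAMP-without-time-zone,
        // so both map to LongType.
        System.out.println(SparkConversions.toSparkType(DataTypes.TIME()));      // LongType
        System.out.println(SparkConversions.toSparkType(DataTypes.TIMESTAMP())); // LongType
    }
}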
fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/main/java/com/alibaba/fluss/connector/spark/utils/SparkConversions.java (new file, +225 lines)

/*
 * Copyright (c) 2024 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.connector.spark.utils;

import com.alibaba.fluss.config.ConfigOption;
import com.alibaba.fluss.connector.spark.SparkConnectorOptions;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.types.BigIntType;
import com.alibaba.fluss.types.BooleanType;
import com.alibaba.fluss.types.BytesType;
import com.alibaba.fluss.types.CharType;
import com.alibaba.fluss.types.DataType;
import com.alibaba.fluss.types.DateType;
import com.alibaba.fluss.types.DecimalType;
import com.alibaba.fluss.types.DoubleType;
import com.alibaba.fluss.types.FloatType;
import com.alibaba.fluss.types.IntType;
import com.alibaba.fluss.types.LocalZonedTimestampType;
import com.alibaba.fluss.types.SmallIntType;
import com.alibaba.fluss.types.StringType;
import com.alibaba.fluss.types.TinyIntType;

import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static com.alibaba.fluss.connector.spark.SparkConnectorOptions.BUCKET_KEY;
import static com.alibaba.fluss.connector.spark.SparkConnectorOptions.BUCKET_NUMBER;
import static com.alibaba.fluss.connector.spark.SparkConnectorOptions.PRIMARY_KEY;

/** Utils for conversion between Spark and Fluss. */
public class SparkConversions {

    /** Convert Spark's table to Fluss's table. */
    public static TableDescriptor toFlussTable(
            StructType sparkSchema, Transform[] partitions, Map<String, String> properties) {
        // schema
        Schema.Builder schemaBuilder = Schema.newBuilder();

        if (properties.containsKey(PRIMARY_KEY.key())) {
            List<String> primaryKey =
                    Arrays.stream(properties.get(PRIMARY_KEY.key()).split(","))
                            .map(String::trim)
                            .collect(Collectors.toList());
            schemaBuilder.primaryKey(primaryKey);
        }

        Schema schema =
                schemaBuilder
                        .fromColumns(
                                Arrays.stream(sparkSchema.fields())
                                        .map(
                                                field ->
                                                        new Schema.Column(
                                                                field.name(),
                                                                SparkConversions.toFlussType(field),
                                                                field.getComment()
                                                                        .getOrElse(() -> null)))
                                        .collect(Collectors.toList()))
                        .build();

        // partition keys
        List<String> partitionKeys =
                Arrays.stream(partitions)
                        .map(partition -> partition.references()[0].describe())
                        .collect(Collectors.toList());

        // bucket keys
        List<String> bucketKey;
        if (properties.containsKey(BUCKET_KEY.key())) {
            bucketKey =
                    Arrays.stream(properties.get(BUCKET_KEY.key()).split(","))
                            .map(String::trim)
                            .collect(Collectors.toList());
        } else {
            // default to the primary keys minus the partition keys
            bucketKey =
                    schema.getPrimaryKey()
                            .map(
                                    pk -> {
                                        List<String> bucketKeys =
                                                new ArrayList<>(pk.getColumnNames());
                                        bucketKeys.removeAll(partitionKeys);
                                        return bucketKeys;
                                    })
                            .orElse(Collections.emptyList());
        }
        Integer bucketNum = null;
        if (properties.containsKey(BUCKET_NUMBER.key())) {
            bucketNum = Integer.parseInt(properties.get(BUCKET_NUMBER.key()));
        }

        // process properties
        Map<String, String> flussTableProperties =
                convertSparkOptionsToFlussTableProperties(properties);

        // comment
        String comment = properties.get("comment");

        // TODO: process watermark
        return TableDescriptor.builder()
                .schema(schema)
                .partitionedBy(partitionKeys)
                .distributedBy(bucketNum, bucketKey)
                .comment(comment)
                .properties(flussTableProperties)
                .customProperties(properties)
                .build();
    }

    /** Convert Fluss's schema to Spark's schema. */
    public static StructType toSparkSchema(Schema flussSchema) {
        StructField[] fields = new StructField[flussSchema.getColumns().size()];
        for (int i = 0; i < flussSchema.getColumns().size(); i++) {
            fields[i] = toSparkStructField(flussSchema.getColumns().get(i));
        }
        return new StructType(fields);
    }

    /** Convert Fluss's partition keys to Spark's transforms. */
    public static Transform[] toSparkTransforms(List<String> partitionKeys) {
        if (partitionKeys == null || partitionKeys.isEmpty()) {
            return new Transform[0];
        }
        Transform[] transforms = new Transform[partitionKeys.size()];
        for (int i = 0; i < partitionKeys.size(); i++) {
            transforms[i] = Expressions.identity(partitionKeys.get(i));
        }
        return transforms;
    }

    /** Convert Fluss's column to Spark's field. */
    public static StructField toSparkStructField(Schema.Column flussColumn) {
        StructField field =
                new StructField(
                        flussColumn.getName(),
                        toSparkType(flussColumn.getDataType()),
                        flussColumn.getDataType().isNullable(),
                        Metadata.empty());
        return flussColumn.getComment().isPresent()
                ? field.withComment(flussColumn.getComment().get())
                : field;
    }

    /** Convert Fluss's type to Spark's type. */
    public static org.apache.spark.sql.types.DataType toSparkType(DataType flussDataType) {
        return flussDataType.accept(FlussTypeToSparkType.INSTANCE);
    }

    /** Convert Spark's type to Fluss's type. */
    public static DataType toFlussType(StructField sparkField) {
        org.apache.spark.sql.types.DataType sparkType = sparkField.dataType();
        boolean isNullable = sparkField.nullable();
        if (sparkType instanceof org.apache.spark.sql.types.CharType) {
            return new CharType(
                    isNullable, ((org.apache.spark.sql.types.CharType) sparkType).length());
        } else if (sparkType instanceof org.apache.spark.sql.types.StringType) {
            return new StringType(isNullable);
        } else if (sparkType instanceof org.apache.spark.sql.types.BooleanType) {
            return new BooleanType(isNullable);
        } else if (sparkType instanceof org.apache.spark.sql.types.BinaryType) {
            return new BytesType(isNullable);
        } else if (sparkType instanceof org.apache.spark.sql.types.DecimalType) {
            return new DecimalType(
                    isNullable,
                    ((org.apache.spark.sql.types.DecimalType) sparkType).precision(),
                    ((org.apache.spark.sql.types.DecimalType) sparkType).scale());
        } else if (sparkType instanceof org.apache.spark.sql.types.ByteType) {
            return new TinyIntType(isNullable);
        } else if (sparkType instanceof org.apache.spark.sql.types.ShortType) {
            return new SmallIntType(isNullable);
        } else if (sparkType instanceof org.apache.spark.sql.types.IntegerType) {
            return new IntType(isNullable);
        } else if (sparkType instanceof org.apache.spark.sql.types.LongType) {
            return new BigIntType(isNullable);
        } else if (sparkType instanceof org.apache.spark.sql.types.FloatType) {
            return new FloatType(isNullable);
        } else if (sparkType instanceof org.apache.spark.sql.types.DoubleType) {
            return new DoubleType(isNullable);
        } else if (sparkType instanceof org.apache.spark.sql.types.DateType) {
            return new DateType(isNullable);
        } else if (sparkType instanceof org.apache.spark.sql.types.TimestampType) {
            return new LocalZonedTimestampType(isNullable, 9);
        } else {
            // TODO: support more data types
            throw new UnsupportedOperationException("Unsupported data type: " + sparkType);
        }
    }

    private static Map<String, String> convertSparkOptionsToFlussTableProperties(
            Map<String, String> options) {
        Map<String, String> properties = new HashMap<>();
        for (ConfigOption<?> option : SparkConnectorOptions.TABLE_OPTIONS) {
            if (options.containsKey(option.key())) {
                properties.put(option.key(), options.get(option.key()));
            }
        }
        return properties;
    }
}
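
Editor's note: one behavior worth calling out in toFlussTable: when no bucket.key option is supplied, the bucket key defaults to the primary key columns minus the partition keys. A sketch under assumed column names (shop_id, dt):

import com.alibaba.fluss.connector.spark.utils.SparkConversions;
import com.alibaba.fluss.metadata.TableDescriptor;

import org.apache.spark.sql.connector.expressions.Expressions;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.HashMap;
import java.util.Map;

public class BucketKeyDerivationSketch {
    public static void main(String[] args) {
        // Primary key (shop_id, dt), partitioned by dt => derived bucket key [shop_id].
        StructType schema =
                new StructType(
                        new StructField[] {
                            new StructField("shop_id", DataTypes.LongType, false, Metadata.empty()),
                            new StructField("dt", DataTypes.StringType, false, Metadata.empty())
                        });
        Transform[] partitions = new Transform[] {Expressions.identity("dt")};
        Map<String, String> properties = new HashMap<>();
        properties.put("primary.key", "shop_id,dt");

        TableDescriptor descriptor = SparkConversions.toFlussTable(schema, partitions, properties);
        System.out.println(descriptor); // expect tableDistribution={bucketKeys=[shop_id] ...}
    }
}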

fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/test/java/com/alibaba/fluss/connector/spark/catalog/SparkCatalogITCase.java (new file, +410 lines; large diff not rendered)
fluss-connectors/fluss-connector-spark/fluss-connector-spark-common/src/test/java/com/alibaba/fluss/connector/spark/utils/SparkConversionsTest.java (new file, +202 lines)

/*
 * Copyright (c) 2024 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.connector.spark.utils;

import com.alibaba.fluss.connector.spark.SparkConnectorOptions;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.types.DataTypes;

import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link SparkConversions}. */
public class SparkConversionsTest {

    @Test
    void testTypeConversion() {
        // fluss columns
        List<Schema.Column> flussColumns =
                Arrays.asList(
                        new Schema.Column("a", DataTypes.BOOLEAN().copy(false), null),
                        new Schema.Column("b", DataTypes.TINYINT().copy(false), null),
                        new Schema.Column("c", DataTypes.SMALLINT(), "comment1"),
                        new Schema.Column("d", DataTypes.INT(), "comment2"),
                        new Schema.Column("e", DataTypes.BIGINT(), null),
                        new Schema.Column("f", DataTypes.FLOAT(), null),
                        new Schema.Column("g", DataTypes.DOUBLE(), null),
                        new Schema.Column("h", DataTypes.CHAR(1), null),
                        new Schema.Column("i", DataTypes.STRING(), null),
                        new Schema.Column("j", DataTypes.DECIMAL(10, 2), null),
                        new Schema.Column("k", DataTypes.BYTES(), null),
                        new Schema.Column("l", DataTypes.DATE(), null),
                        new Schema.Column("m", DataTypes.TIMESTAMP_LTZ(9), null));

        // spark columns
        List<StructField> sparkColumns =
                Arrays.asList(
                        new StructField(
                                "a",
                                org.apache.spark.sql.types.DataTypes.BooleanType,
                                false,
                                Metadata.empty()),
                        new StructField(
                                "b",
                                org.apache.spark.sql.types.DataTypes.ByteType,
                                false,
                                Metadata.empty()),
                        new StructField(
                                        "c",
                                        org.apache.spark.sql.types.DataTypes.ShortType,
                                        true,
                                        Metadata.empty())
                                .withComment("comment1"),
                        new StructField(
                                        "d",
                                        org.apache.spark.sql.types.DataTypes.IntegerType,
                                        true,
                                        Metadata.empty())
                                .withComment("comment2"),
                        new StructField(
                                "e",
                                org.apache.spark.sql.types.DataTypes.LongType,
                                true,
                                Metadata.empty()),
                        new StructField(
                                "f",
                                org.apache.spark.sql.types.DataTypes.FloatType,
                                true,
                                Metadata.empty()),
                        new StructField(
                                "g",
                                org.apache.spark.sql.types.DataTypes.DoubleType,
                                true,
                                Metadata.empty()),
                        new StructField(
                                "h",
                                new org.apache.spark.sql.types.CharType(1),
                                true,
                                Metadata.empty()),
                        new StructField(
                                "i",
                                org.apache.spark.sql.types.DataTypes.StringType,
                                true,
                                Metadata.empty()),
                        new StructField(
                                "j",
                                org.apache.spark.sql.types.DataTypes.createDecimalType(10, 2),
                                true,
                                Metadata.empty()),
                        new StructField(
                                "k",
                                org.apache.spark.sql.types.DataTypes.BinaryType,
                                true,
                                Metadata.empty()),
                        new StructField(
                                "l",
                                org.apache.spark.sql.types.DataTypes.DateType,
                                true,
                                Metadata.empty()),
                        new StructField(
                                "m",
                                org.apache.spark.sql.types.DataTypes.TimestampType,
                                true,
                                Metadata.empty()));

        // test from fluss columns to spark columns
        List<StructField> actualSparkColumns = new ArrayList<>();
        for (Schema.Column flussColumn : flussColumns) {
            actualSparkColumns.add(SparkConversions.toSparkStructField(flussColumn));
        }
        assertThat(actualSparkColumns).isEqualTo(sparkColumns);

        // test from spark columns to fluss columns
        List<Schema.Column> actualFlussColumns = new ArrayList<>();
        for (StructField sparkColumn : sparkColumns) {
            actualFlussColumns.add(
                    new Schema.Column(
                            sparkColumn.name(),
                            SparkConversions.toFlussType(sparkColumn),
                            sparkColumn.getComment().getOrElse(() -> null)));
        }
        assertThat(actualFlussColumns).isEqualTo(flussColumns);

        // test TIME and TIMESTAMP type
        assertThat(SparkConversions.toSparkType(DataTypes.TIME()))
                .isEqualTo(org.apache.spark.sql.types.DataTypes.LongType);
        assertThat(SparkConversions.toSparkType(DataTypes.TIMESTAMP()))
                .isEqualTo(org.apache.spark.sql.types.DataTypes.LongType);
    }

    @Test
    void testTableConversion() {
        StructField[] sparkColumns =
                new StructField[] {
                    new StructField(
                            "order_id",
                            org.apache.spark.sql.types.DataTypes.LongType,
                            false,
                            Metadata.empty()),
                    new StructField(
                            "order_name",
                            org.apache.spark.sql.types.DataTypes.StringType,
                            true,
                            Metadata.empty())
                };

        // test convert spark table to fluss table
        StructType structType = new StructType(sparkColumns);
        Transform[] transforms = new Transform[0];
        Map<String, String> properties = new HashMap<>();
        properties.put(SparkConnectorOptions.PRIMARY_KEY.key(), "order_id");
        properties.put("comment", "test comment");
        properties.put("k1", "v1");
        properties.put("k2", "v2");
        TableDescriptor flussTable =
                SparkConversions.toFlussTable(structType, transforms, properties);

        String expectFlussTableString =
                "TableDescriptor{schema=("
                        + "order_id BIGINT NOT NULL,"
                        + "order_name STRING,"
                        + "CONSTRAINT PK_order_id PRIMARY KEY (order_id)"
                        + "), comment='test comment', partitionKeys=[], "
                        + "tableDistribution={bucketKeys=[order_id] bucketCount=null}, "
                        + "properties={}, "
                        + "customProperties={comment=test comment, primary.key=order_id, k1=v1, k2=v2}"
                        + "}";
        assertThat(flussTable.toString()).isEqualTo(expectFlussTableString);

        // test convert fluss table to spark table
        StructType convertedSparkSchema = SparkConversions.toSparkSchema(flussTable.getSchema());
        Transform[] convertedTransforms =
                SparkConversions.toSparkTransforms(flussTable.getPartitionKeys());
        assertThat(convertedSparkSchema.fields()).isEqualTo(sparkColumns);
        assertThat(convertedTransforms).isEqualTo(transforms);
    }
}
fluss-connectors/fluss-connector-spark/pom.xml (new file, +41 lines)

<?xml version="1.0" encoding="UTF-8"?>
<!--
 Copyright (c) 2024 Alibaba Group Holding Ltd.

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.alibaba.fluss</groupId>
        <artifactId>fluss-connectors</artifactId>
        <version>0.6-SNAPSHOT</version>
    </parent>

    <artifactId>fluss-connector-spark</artifactId>
    <packaging>pom</packaging>

    <name>Fluss : Connector : Spark</name>

    <properties>
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <modules>
        <module>fluss-connector-spark-common</module>
        <module>fluss-connector-spark-3.3</module>
    </modules>
</project>

fluss-connectors/pom.xml (+1)

@@ -31,6 +31,7 @@

     <modules>
         <module>fluss-connector-flink</module>
+        <module>fluss-connector-spark</module>
     </modules>

     <!-- override these root dependencies as 'provided', so they don't end up
