
Commit ea4ed5d

Omar-Saleh and Omar Elhadidy authored
[DELTA] Add e2e tests for catalog owned managed tables (#4414)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR expands the existing Python test script with more e2e tests to cover more use cases for catalog owned managed tables.

## How was this patch tested?

E2E tests using OSS Delta, Spark, and UC.

## Does this PR introduce _any_ user-facing changes?

---------

Co-authored-by: Omar Elhadidy <[email protected]>
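For anyone reproducing the test run: the mandatory inputs and the invocation below are taken directly from the updated script's docstring (visible in the diff further down). Fill in the `___` placeholders for your environment; the `--packages` list is truncated at the edge of the diff context shown here:

```bash
export CATALOG_TOKEN=___
export CATALOG_URI=___
export CATALOG_NAME=___
export SCHEMA=___
export MANAGED_CC_TABLE=___
export MANAGED_NON_CC_TABLE=___

./run-integration-tests.py --use-local --unity-catalog-commit-coordinator-integration-tests \
    --packages ...
```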
1 parent 86a2a55 commit ea4ed5d

File tree

python/delta/integration_tests/unity-catalog-commit-coordinator-integration-tests.py
python/mypy.ini

2 files changed: +219 −41 lines changed

python/delta/integration_tests/unity-catalog-commit-coordinator-integration-tests.py

Lines changed: 216 additions & 41 deletions
```diff
@@ -18,20 +18,27 @@
 import sys
 import threading
 import json
+import unittest
 
-from pyspark.sql import SparkSession
-import time
+import py4j.protocol
+from pyspark.sql import SparkSession, DataFrame
+import datetime
 import uuid
 
+from pyspark.sql.functions import lit
+from pyspark.sql.types import IntegerType, StructType, StructField
+from pyspark.testing import assertDataFrameEqual
+
 """
 Run this script in root dir of repository:
 
 ===== Mandatory input from user =====
-export CATALOG_NAME=___
-export CATALOG_URI=___
 export CATALOG_TOKEN=___
-export TABLE_NAME=___
+export CATALOG_URI=___
+export CATALOG_NAME=___
 export SCHEMA=___
+export MANAGED_CC_TABLE=___
+export MANAGED_NON_CC_TABLE=___
 
 ./run-integration-tests.py --use-local --unity-catalog-commit-coordinator-integration-tests \
 --packages \
@@ -41,8 +48,10 @@
 CATALOG_NAME = os.environ.get("CATALOG_NAME")
 CATALOG_TOKEN = os.environ.get("CATALOG_TOKEN")
 CATALOG_URI = os.environ.get("CATALOG_URI")
-TABLE_NAME = os.environ.get("TABLE_NAME")
+MANAGED_CC_TABLE = os.environ.get("MANAGED_CC_TABLE")
 SCHEMA = os.environ.get("SCHEMA")
+MANAGED_NON_CC_TABLE = os.environ.get("MANAGED_NON_CC_TABLE")
+
 
 spark = SparkSession \
     .builder \
@@ -54,54 +63,220 @@
     .config(f"spark.sql.catalog.{CATALOG_NAME}.token", CATALOG_TOKEN) \
     .config(f"spark.sql.catalog.{CATALOG_NAME}.uri", CATALOG_URI) \
     .config(f"spark.sql.defaultCatalog", CATALOG_NAME) \
+    .config("spark.databricks.delta.replaceWhere.constraintCheck.enabled", True) \
     .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
     .config("spark.databricks.delta.commitcoordinator.unity-catalog.impl",
             "org.delta.catalog.UCCoordinatedCommitClient") \
     .getOrCreate()
 
-expected_error_tag = "UNITY_CATALOG_EXTERNAL_COORDINATED_COMMITS_REQUEST_DENIED"
+MANAGED_CATALOG_OWNED_TABLE_FULL_NAME = f"{CATALOG_NAME}.{SCHEMA}.{MANAGED_CC_TABLE}"
+MANAGED_NON_CATALOG_OWNED_TABLE_FULL_NAME = f"{CATALOG_NAME}.{SCHEMA}.{MANAGED_NON_CC_TABLE}"
+
+
+class UnityCatalogManagedTableTestSuite(unittest.TestCase):
+    setup_df = spark.createDataFrame([(1, ), (2, ), (3, )],
+                                     schema=StructType([StructField("id", IntegerType(), True)]))
+
+    def setUp(self) -> None:
+        self.setup_df.write.mode("overwrite").insertInto(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME)
+
+    # Helper methods
+    def read(self, table_name: str) -> DataFrame:
+        return spark.read.table(table_name)
+
+    def read_with_timestamp(self, timestamp: str, table_name: str) -> DataFrame:
+        return spark.read.option("timestampAsOf", timestamp).table(table_name)
+
+    def read_with_cdf_timestamp(self, timestamp: str, table_name: str) -> DataFrame:
+        return spark.read.option('readChangeFeed', 'true').option(
+            "startingTimestamp", timestamp).table(table_name)
+
+    def read_with_cdf_version(self, version: int, table_name: str) -> DataFrame:
+        return spark.read.option('readChangeFeed', 'true').option(
+            "startingVersion", version).table(table_name)
+
+    def create_df_with_rows(self, list_of_rows: list) -> DataFrame:
+        return spark.createDataFrame(list_of_rows,
+                                     schema=StructType([StructField("id", IntegerType(), True)]))
+
+    def get_table_history(self, table_name: str) -> DataFrame:
+        return spark.sql(f"DESCRIBE HISTORY {table_name};")
+
+    def append(self, table_name: str) -> None:
+        single_col_df = spark.createDataFrame(
+            [(4, ), (5, )], schema=StructType([StructField("id", IntegerType(), True)]))
+        single_col_df.writeTo(table_name).append()
+
+    # DML Operations #
+    def test_update(self) -> None:
+        spark.sql(f"UPDATE {MANAGED_CATALOG_OWNED_TABLE_FULL_NAME} SET id=4 WHERE id=1")
+        updated_tbl = self.read(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).toDF("id")
+        assertDataFrameEqual(updated_tbl, self.create_df_with_rows([(4, ), (2, ), (3, )]))
+
+    def test_delete(self) -> None:
+        spark.sql(f"DELETE FROM {MANAGED_CATALOG_OWNED_TABLE_FULL_NAME} where id=1")
+        updated_tbl = self.read(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).toDF("id")
+        assertDataFrameEqual(updated_tbl, self.create_df_with_rows([(2, ), (3, )]))
+
+    def test_merge(self) -> None:
+        spark.sql(f"MERGE INTO {MANAGED_CATALOG_OWNED_TABLE_FULL_NAME} AS target "
+                  f"USING (VALUES 2, 3, 4, 5 AS src(id)) AS src "
+                  f"ON src.id = target.id WHEN NOT MATCHED THEN INSERT *;")
+        updated_tbl = self.read(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).toDF("id")
+        assertDataFrameEqual(updated_tbl,
+                             self.create_df_with_rows([(1, ), (2, ), (3, ), (4, ), (5, )]))
+
+    # Utility Functions #
+    def test_optimize(self) -> None:
+        spark.sql(f"OPTIMIZE {MANAGED_CATALOG_OWNED_TABLE_FULL_NAME}")
+        updated_tbl = self.read(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).toDF("id")
+        assertDataFrameEqual(updated_tbl, self.create_df_with_rows([(1, ), (2, ), (3, )]))
+
+    # DESCRIBE HISTORY is currently unsupported on catalog owned tables.
+    def test_history(self) -> None:
+        try:
+            self.get_table_history(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).collect()
+        except py4j.protocol.Py4JJavaError as error:
+            assert("Path based access is not supported for Catalog-Owned table" in str(error))
+
+    def test_time_travel_read(self) -> None:
+        current_timestamp = str(datetime.datetime.now())
+        self.append(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME)
+        updated_tbl = self.read_with_timestamp(current_timestamp,
+                                               MANAGED_CATALOG_OWNED_TABLE_FULL_NAME)
+        assertDataFrameEqual(updated_tbl, self.setup_df)
+
+    # Restore is currently unsupported on catalog owned tables.
+    def test_restore(self) -> None:
+        try:
+            spark.sql(f"RESTORE TABLE {MANAGED_CATALOG_OWNED_TABLE_FULL_NAME} TO VERSION AS OF 0")
+        except py4j.protocol.Py4JJavaError as error:
+            assert("A table's Delta metadata can only be changed from a cluster or warehouse"
+                   in str(error))
+
+    # CDC (Timestamps, Versions) are currently unsupported for Catalog owned tables.
+    def test_change_data_feed_with_timestamp(self) -> None:
+        timestamp = str(datetime.datetime.now())
+        self.append(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME)
+        try:
+            self.read_with_cdf_timestamp(
+                timestamp, MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).select("id", "_change_type")
+        except py4j.protocol.Py4JJavaError as error:
+            assert("Path based access is not supported for Catalog-Owned table" in str(error))
+
+    def test_change_data_feed_with_version(self) -> None:
+        self.append(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME)
+        try:
+            self.read_with_cdf_version(
+                0,
+                MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).select("id", "_change_type")
+        except py4j.protocol.Py4JJavaError as error:
+            assert("Path based access is not supported for Catalog-Owned table" in str(error))
+
+    # Dataframe Writer V1 Tests #
+    def test_insert_into_append(self) -> None:
+        single_col_df = spark.createDataFrame([(4, ), (5, )], schema=["id"])
+        single_col_df.write.mode("append").insertInto(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME)
+        updated_tbl = self.read(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).toDF("id")
+        assertDataFrameEqual(updated_tbl,
+                             self.create_df_with_rows([(1, ), (2, ), (3, ), (4, ), (5, )]))
+
+    def test_insert_into_overwrite(self) -> None:
+        single_col_df = spark.createDataFrame([(5, )], schema=["id"])
+        single_col_df.write.mode("overwrite").insertInto(
+            MANAGED_CATALOG_OWNED_TABLE_FULL_NAME, True)
+        updated_tbl = self.read(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).toDF("id")
+        assertDataFrameEqual(updated_tbl, self.create_df_with_rows([(5, )]))
+
+    def test_insert_into_overwrite_replace_where(self) -> None:
+        single_col_df = spark.createDataFrame([(5, )], schema=["id"])
+        single_col_df.write.mode("overwrite").option("replaceWhere", "id > 1").insertInto(
+            f"{MANAGED_CATALOG_OWNED_TABLE_FULL_NAME}", True)
+        updated_tbl = self.read(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).toDF("id")
+        assertDataFrameEqual(updated_tbl, self.create_df_with_rows([(1, ), (5, )]))
+
+    def test_insert_into_overwrite_partition_overwrite(self) -> None:
+        single_col_df = spark.createDataFrame([(5,)], schema=["id"])
+        single_col_df.write.mode("overwrite").option(
+            "partitionOverwriteMode", "dynamic").insertInto(
+            MANAGED_CATALOG_OWNED_TABLE_FULL_NAME, True)
+        updated_tbl = self.read(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).toDF("id")
+        assertDataFrameEqual(updated_tbl, self.create_df_with_rows([(5,)]))
 
+    def test_save_as_table_append_existing_table(self) -> None:
+        single_col_df = spark.createDataFrame(
+            [(4, ), (5, )], schema=StructType([StructField("id", IntegerType(), True)]))
+        single_col_df.write.format("delta").mode("append").saveAsTable(
+            MANAGED_CATALOG_OWNED_TABLE_FULL_NAME)
+        updated_tbl = self.read(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).toDF("id")
+        assertDataFrameEqual(updated_tbl,
+                             self.create_df_with_rows([(1, ), (2, ), (3, ), (4, ), (5, )]))
 
-def create() -> None:
-    try:
-        spark.sql(f"CREATE TABLE {SCHEMA}.{TABLE_NAME} (a INT)")
-    except Exception:
-        print("[UNSUPPORTED] Creating managed table using UC commit coordinator isn't allowed")
+    # Setting mode to append should work, however cc tables do not allow path based access.
+    def test_save_append_using_path(self) -> None:
+        single_col_df = spark.createDataFrame([(4, ), (5, )])
+        # Fetch managed table path and attempt to side-step UC
+        # and directly update table using path based access.
+        tbl_path = spark.sql(
+            f"DESCRIBE formatted {MANAGED_CATALOG_OWNED_TABLE_FULL_NAME}").collect()[5].data_type
+        try:
+            single_col_df.write.format("delta").save(mode="append", path=tbl_path)
+        except Exception as error:
+            assert("Forbidden (Service: Amazon S3; Status Code: 403; "
+                   "Error Code: 403 Forbidden;" in str(error))
+        updated_tbl = self.read(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).toDF("id")
+        assertDataFrameEqual(updated_tbl, self.setup_df)
 
+    # DataFrame V2 Tests #
+    def test_append(self) -> None:
+        self.append(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME)
+        updated_tbl = self.read(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).toDF("id")
+        assertDataFrameEqual(updated_tbl,
+                             self.create_df_with_rows([(1, ), (2, ), (3, ), (4, ), (5, )]))
 
-def insert() -> None:
-    try:
-        spark.sql(f"INSERT INTO {SCHEMA}.{TABLE_NAME} VALUES (1), (2)")
-    except Exception as error:
-        assert(expected_error_tag in str(error))
-        print("[UNSUPPORTED] Writing to managed table using UC commit coordinator isn't allowed")
+    def test_overwrite(self) -> None:
+        single_col_df = spark.createDataFrame(
+            [(5,)], schema=StructType([StructField("id", IntegerType(), True)]))
+        single_col_df.writeTo(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).overwrite(lit(True))
+        updated_tbl = self.read(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).toDF("id")
+        assertDataFrameEqual(updated_tbl, self.create_df_with_rows([(5,)]))
 
+    def test_overwrite_partitions(self) -> None:
+        single_col_df = spark.createDataFrame(
+            [(5,)], schema=StructType([StructField("id", IntegerType(), True)]))
+        single_col_df.writeTo(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).overwritePartitions()
+        updated_tbl = self.read(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).toDF("id")
+        assertDataFrameEqual(updated_tbl, self.create_df_with_rows([(5,)]))
 
-def update() -> None:
-    try:
-        spark.sql(f"UPDATE {SCHEMA}.{TABLE_NAME} SET a=4")
-    except Exception as error:
-        assert(expected_error_tag in str(error))
-        print("[UNSUPPORTED] Updating managed table using UC commit coordinator isn't allowed")
+    # CREATE TABLE is currently not supported by UC.
+    def test_create(self) -> None:
+        single_col_df = spark.createDataFrame(
+            [(5,)], schema=StructType([StructField("id", IntegerType(), True)]))
+        try:
+            single_col_df.writeTo(f"{CATALOG_NAME}.{SCHEMA}.created_table").create()
+        except py4j.protocol.Py4JJavaError as error:
+            assert("io.unitycatalog.spark.UCProxy.createTable" in str(error))
 
+    # Writing to tables that are not catalog owned is not supported.
+    def test_write_to_managed_table_without_catalog_owned(self) -> None:
+        try:
+            self.append(MANAGED_NON_CATALOG_OWNED_TABLE_FULL_NAME)
+        except py4j.protocol.Py4JJavaError as error:
+            assert("[TASK_WRITE_FAILED] Task failed while writing rows to s3" in str(error))
 
-def delete() -> None:
-    try:
-        spark.sql(f"DELETE FROM {SCHEMA}.{TABLE_NAME} where a=1")
-    except Exception as error:
-        assert(expected_error_tag in str(error))
-        print("[UNSUPPORTED] Deleting from managed table using UC commit coordinator isn't allowed")
+    def test_read_from_managed_table_without_catalog_owned(self) -> None:
+        self.read(MANAGED_NON_CATALOG_OWNED_TABLE_FULL_NAME)
 
+    def test_write_to_managed_catalog_owned_table(self) -> None:
+        self.append(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME)
+        updated_tbl = self.read(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).toDF("id")
+        assertDataFrameEqual(updated_tbl,
+                             self.create_df_with_rows([(1, ), (2, ), (3, ), (4, ), (5, )]))
 
-def read() -> None:
-    try:
-        res = spark.sql(f"SELECT * FROM {SCHEMA}.{TABLE_NAME}")
-    except Exception as error:
-        assert(expected_error_tag in str(error))
-        print("[UNSUPPORTED] Reading from managed table using UC commit coordinator isn't allowed")
+    def test_read_from_managed_catalog_owned_table(self) -> None:
+        self.read(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME)
+        updated_tbl = self.read(MANAGED_CATALOG_OWNED_TABLE_FULL_NAME).toDF("id")
+        assertDataFrameEqual(updated_tbl, self.setup_df)
 
-create()
-insert()
-update()
-read()
-delete()
+if __name__ == "__main__":
+    unittest.main()
```
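One note on the assertion style used throughout the new suite: `pyspark.testing.assertDataFrameEqual` (available since PySpark 3.5) compares rows without regard to order by default, which is why each test can simply re-read the table and compare it against a locally constructed DataFrame. A minimal standalone sketch of the pattern, using a plain local session rather than the UC-backed one above:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StructField, StructType
from pyspark.testing import assertDataFrameEqual

spark = SparkSession.builder.getOrCreate()
schema = StructType([StructField("id", IntegerType(), True)])

# Stand-in for "re-read the table after an operation".
actual = spark.createDataFrame([(3, ), (2, )], schema=schema)
# Expected rows, built the same way the suite's create_df_with_rows helper does.
expected = spark.createDataFrame([(2, ), (3, )], schema=schema)

# Passes despite the differing row order: assertDataFrameEqual defaults to
# checkRowOrder=False, so only the multiset of rows (and the schema) must match.
assertDataFrameEqual(actual, expected)
```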

python/mypy.ini

Lines changed: 3 additions & 0 deletions
```diff
@@ -33,3 +33,6 @@ ignore_missing_imports = True
 
 [mypy-google.*]
 ignore_missing_imports = True
+
+[mypy-py4j.*]
+ignore_missing_imports = True
```
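Why this stanza is likely needed (an inference, not stated in the PR): the test script now imports `py4j.protocol`, and py4j does not ship type stubs that mypy can resolve, so without an override mypy's import check would flag it. The addition reuses mypy's standard per-module override pattern, sketched here with a hypothetical package name:

```ini
# mypy per-module override ("somepkg" is hypothetical, for illustration only):
# suppress missing-import/stub errors for every module under the package.
[mypy-somepkg.*]
ignore_missing_imports = True
```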
