Skip to content

Commit c5c0993

Browse files
rambleraptor, sungwy, and Fokko
authored
Add ViewMetadata read support and create_view to REST Catalog (#2154)
<!-- Thanks for opening a pull request! -->
<!-- In the case this PR will resolve an issue, please replace ${GITHUB_ISSUE_ID} below with the actual GitHub issue id. -->
<!-- Closes #${GITHUB_ISSUE_ID} -->

Part of #818

This adds `create_view` to the REST Catalog. As part of this, it also defines the `View` and `ViewMetadata` classes.

# Rationale for this change

PyIceberg's REST Catalog doesn't support `create_view`. Furthermore, PyIceberg doesn't support views at all. This is the first part in getting views supported.

# Are these changes tested?

Unit tests are included.

# Are there any user-facing changes?

Added Iceberg REST Catalog support for `create_view`.

<!-- In the case of user-facing changes, please add the changelog label. -->

---------

Co-authored-by: Sung Yun <107272191+sungwy@users.noreply.github.com>
Co-authored-by: Fokko Driesprong <fokko@apache.org>
1 parent 2ca06f5 commit c5c0993

File tree

15 files changed

+493
-13
lines changed

15 files changed

+493
-13
lines changed

pyiceberg/catalog/__init__.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@
6868
)
6969
from pyiceberg.utils.config import Config, merge_config
7070
from pyiceberg.utils.properties import property_as_bool
71+
from pyiceberg.view import View
72+
from pyiceberg.view.metadata import ViewVersion
7173

7274
if TYPE_CHECKING:
7375
import pyarrow as pa
@@ -683,6 +685,31 @@ def drop_view(self, identifier: str | Identifier) -> None:
683685
NoSuchViewError: If a view with the given name does not exist.
684686
"""
685687

688+
@abstractmethod
def create_view(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    view_version: ViewVersion,
    location: str | None = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Create a view.

    Args:
        identifier (str | Identifier): View identifier.
        schema (Schema | pa.Schema): View's schema.
        view_version (ViewVersion): The initial version of the view. This is a
            ViewVersion object describing the view, not a format version number.
        location (str | None): Location for the view. Optional Argument.
        properties (Properties): View properties that can be a string based dictionary.

    Returns:
        View: the created view instance.

    Raises:
        ViewAlreadyExistsError: If a view with the name already exists.
    """
712+
686713
@staticmethod
687714
def identifier_to_tuple(identifier: str | Identifier) -> Identifier:
688715
"""Parse an identifier to a tuple.
@@ -903,6 +930,16 @@ def purge_table(self, identifier: str | Identifier) -> None:
903930
delete_files(io, prev_metadata_files, PREVIOUS_METADATA)
904931
delete_files(io, {table.metadata_location}, METADATA)
905932

933+
def create_view(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    view_version: ViewVersion,
    location: str | None = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Create a view (not implemented here).

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
942+
906943
def _create_staged_table(
907944
self,
908945
identifier: str | Identifier,

pyiceberg/catalog/bigquery_metastore.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
from __future__ import annotations
18+
1719
import json
18-
from typing import TYPE_CHECKING, Any, Union
20+
from typing import TYPE_CHECKING, Any
1921

2022
from google.api_core.exceptions import NotFound
2123
from google.cloud.bigquery import Client, Dataset, DatasetReference, TableReference
@@ -101,7 +103,7 @@ def __init__(self, name: str, **properties: str):
101103
def create_table(
102104
self,
103105
identifier: str | Identifier,
104-
schema: Union[Schema, "pa.Schema"],
106+
schema: Schema | pa.Schema,
105107
location: str | None = None,
106108
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
107109
sort_order: SortOrder = UNSORTED_SORT_ORDER,
@@ -272,7 +274,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str) -
272274
"""Register a new table using existing metadata.
273275
274276
Args:
275-
identifier (Union[str, Identifier]): Table identifier for the table
277+
identifier (str | Identifier): Table identifier for the table
276278
metadata_location (str): The location to the metadata
277279
278280
Returns:

pyiceberg/catalog/dynamodb.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@
5959
)
6060
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
6161
from pyiceberg.utils.properties import get_first_property_value
62+
from pyiceberg.view import View
63+
from pyiceberg.view.metadata import ViewVersion
6264

6365
if TYPE_CHECKING:
6466
import pyarrow as pa
@@ -537,6 +539,16 @@ def update_namespace_properties(
537539

538540
return properties_update_summary
539541

542+
def create_view(
    self,
    identifier: str | Identifier,
    schema: Union[Schema, "pa.Schema"],
    view_version: ViewVersion,
    location: str | None = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Create a view (not implemented for this catalog).

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
551+
540552
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
541553
raise NotImplementedError
542554

pyiceberg/catalog/glue.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@
8585
UUIDType,
8686
)
8787
from pyiceberg.utils.properties import get_first_property_value, property_as_bool
88+
from pyiceberg.view import View
89+
from pyiceberg.view.metadata import ViewVersion
8890

8991
if TYPE_CHECKING:
9092
import pyarrow as pa
@@ -809,6 +811,16 @@ def update_namespace_properties(
809811

810812
return properties_update_summary
811813

814+
def create_view(
    self,
    identifier: str | Identifier,
    schema: Union[Schema, "pa.Schema"],
    view_version: ViewVersion,
    location: str | None = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Create a view (not implemented for this catalog).

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
823+
812824
def list_views(self, namespace: str | Identifier) -> list[Identifier]:
813825
raise NotImplementedError
814826

pyiceberg/catalog/hive.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@
110110
UUIDType,
111111
)
112112
from pyiceberg.utils.properties import property_as_bool, property_as_float
113+
from pyiceberg.view import View
114+
from pyiceberg.view.metadata import ViewVersion
113115

114116
if TYPE_CHECKING:
115117
import pyarrow as pa
@@ -434,6 +436,16 @@ def create_table(
434436

435437
return self._convert_hive_into_iceberg(hive_table)
436438

439+
def create_view(
    self,
    identifier: str | Identifier,
    schema: Union[Schema, "pa.Schema"],
    view_version: ViewVersion,
    location: str | None = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Create a view (not implemented for this catalog).

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
448+
437449
def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
438450
"""Register a new table using existing metadata.
439451

pyiceberg/catalog/noop.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
from __future__ import annotations
18+
1719
from typing import (
1820
TYPE_CHECKING,
19-
Union,
2021
)
2122

2223
from pyiceberg.catalog import Catalog, PropertiesUpdateSummary
@@ -33,6 +34,8 @@
3334
TableUpdate,
3435
)
3536
from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
37+
from pyiceberg.view import View
38+
from pyiceberg.view.metadata import ViewVersion
3639

3740
if TYPE_CHECKING:
3841
import pyarrow as pa
@@ -42,7 +45,7 @@ class NoopCatalog(Catalog):
4245
def create_table(
4346
self,
4447
identifier: str | Identifier,
45-
schema: Union[Schema, "pa.Schema"],
48+
schema: Schema | pa.Schema,
4649
location: str | None = None,
4750
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
4851
sort_order: SortOrder = UNSORTED_SORT_ORDER,
@@ -53,7 +56,7 @@ def create_table(
5356
def create_table_transaction(
5457
self,
5558
identifier: str | Identifier,
56-
schema: Union[Schema, "pa.Schema"],
59+
schema: Schema | pa.Schema,
5760
location: str | None = None,
5861
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
5962
sort_order: SortOrder = UNSORTED_SORT_ORDER,
@@ -130,3 +133,13 @@ def namespace_exists(self, namespace: str | Identifier) -> bool:
130133

131134
def drop_view(self, identifier: str | Identifier) -> None:
132135
raise NotImplementedError
136+
137+
def create_view(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    view_version: ViewVersion,
    location: str | None = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Create a view (not implemented for this catalog).

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

pyiceberg/catalog/rest/__init__.py

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,13 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
from __future__ import annotations
18+
1719
from collections import deque
1820
from enum import Enum
1921
from typing import (
2022
TYPE_CHECKING,
2123
Any,
22-
Union,
2324
)
2425
from urllib.parse import quote, unquote
2526

@@ -55,6 +56,7 @@
5556
NoSuchViewError,
5657
TableAlreadyExistsError,
5758
UnauthorizedError,
59+
ViewAlreadyExistsError,
5860
)
5961
from pyiceberg.io import (
6062
AWS_ACCESS_KEY_ID,
@@ -87,6 +89,8 @@
8789
from pyiceberg.types import transform_dict_value_to_str
8890
from pyiceberg.utils.deprecated import deprecation_message
8991
from pyiceberg.utils.properties import get_first_property_value, get_header_properties, property_as_bool, property_as_int
92+
from pyiceberg.view import View
93+
from pyiceberg.view.metadata import ViewMetadata, ViewVersion
9094

9195
if TYPE_CHECKING:
9296
import pyarrow as pa
@@ -123,7 +127,7 @@ def __str__(self) -> str:
123127
return f"{self.http_method.value} {self.path}"
124128

125129
@classmethod
126-
def from_string(cls, endpoint: str) -> "Endpoint":
130+
def from_string(cls, endpoint: str) -> Endpoint:
127131
elements = endpoint.strip().split(None, 1)
128132
if len(elements) != 2:
129133
raise ValueError(f"Invalid endpoint (must consist of two elements separated by a single space): {endpoint}")
@@ -148,6 +152,7 @@ class Endpoints:
148152
get_token: str = "oauth/tokens"
149153
rename_table: str = "tables/rename"
150154
list_views: str = "namespaces/{namespace}/views"
155+
create_view: str = "namespaces/{namespace}/views"
151156
drop_view: str = "namespaces/{namespace}/views/{view}"
152157
view_exists: str = "namespaces/{namespace}/views/{view}"
153158
plan_table_scan: str = "namespaces/{namespace}/tables/{table}/plan"
@@ -275,6 +280,12 @@ class TableResponse(IcebergBaseModel):
275280
storage_credentials: list[StorageCredential] = Field(alias="storage-credentials", default_factory=list)
276281

277282

283+
class ViewResponse(IcebergBaseModel):
    """Model for a view payload returned by the REST catalog.

    The REST catalog's `create_view` validates the HTTP response text against
    this model before building a `View` from its `metadata`.
    """

    # Read from the "metadata-location" key; None when the key is absent.
    metadata_location: str | None = Field(default=None, alias="metadata-location")
    # Required: the view metadata.
    metadata: ViewMetadata
    # Optional configuration map; a fresh empty dict when omitted.
    config: Properties = Field(default_factory=dict)
287+
288+
278289
class CreateTableRequest(IcebergBaseModel):
279290
name: str = Field()
280291
location: str | None = Field()
@@ -290,6 +301,18 @@ def transform_properties_dict_value_to_str(cls, properties: Properties) -> dict[
290301
return transform_dict_value_to_str(properties)
291302

292303

304+
class CreateViewRequest(IcebergBaseModel):
    """Request body sent by the REST catalog when creating a view.

    `view_schema` and `view_version` carry the aliases "schema" and
    "view-version" respectively.
    """

    # Name of the view.
    name: str = Field()
    # Declared without a default, so a value (possibly None) must be supplied.
    location: str | None = Field()
    view_schema: Schema = Field(alias="schema")
    view_version: ViewVersion = Field(alias="view-version")
    properties: Properties = Field(default_factory=dict)

    @field_validator("properties", mode="before")
    def transform_properties_dict_value_to_str(cls, properties: Properties) -> dict[str, str]:
        """Pass the incoming properties through `transform_dict_value_to_str` before validation."""
        return transform_dict_value_to_str(properties)
314+
315+
293316
class RegisterTableRequest(IcebergBaseModel):
294317
name: str
295318
metadata_location: str = Field(..., alias="metadata-location")
@@ -800,6 +823,12 @@ def _response_to_staged_table(self, identifier_tuple: tuple[str, ...], table_res
800823
catalog=self,
801824
)
802825

826+
def _response_to_view(self, identifier_tuple: tuple[str, ...], view_response: ViewResponse) -> View:
    """Build a `View` from a parsed REST response.

    Args:
        identifier_tuple: Identifier of the view, as a tuple of name parts.
        view_response: Parsed response whose `metadata` becomes the view's metadata.

    Returns:
        View: A view holding the given identifier and the response metadata.
    """
    return View(identifier=identifier_tuple, metadata=view_response.metadata)
831+
803832
def _refresh_token(self) -> None:
804833
# Reactive token refresh is atypical - we should proactively refresh tokens in a separate thread
805834
# instead of retrying on Auth Exceptions. Keeping refresh behavior for the LegacyOAuth2AuthManager
@@ -819,7 +848,7 @@ def _config_headers(self, session: Session) -> None:
819848
def _create_table(
820849
self,
821850
identifier: str | Identifier,
822-
schema: Union[Schema, "pa.Schema"],
851+
schema: Schema | pa.Schema,
823852
location: str | None = None,
824853
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
825854
sort_order: SortOrder = UNSORTED_SORT_ORDER,
@@ -862,7 +891,7 @@ def _create_table(
862891
def create_table(
863892
self,
864893
identifier: str | Identifier,
865-
schema: Union[Schema, "pa.Schema"],
894+
schema: Schema | pa.Schema,
866895
location: str | None = None,
867896
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
868897
sort_order: SortOrder = UNSORTED_SORT_ORDER,
@@ -883,7 +912,7 @@ def create_table(
883912
def create_table_transaction(
884913
self,
885914
identifier: str | Identifier,
886-
schema: Union[Schema, "pa.Schema"],
915+
schema: Schema | pa.Schema,
887916
location: str | None = None,
888917
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
889918
sort_order: SortOrder = UNSORTED_SORT_ORDER,
@@ -901,6 +930,44 @@ def create_table_transaction(
901930
staged_table = self._response_to_staged_table(self.identifier_to_tuple(identifier), table_response)
902931
return CreateTableTransaction(staged_table)
903932

933+
@retry(**_RETRY_ARGS)
def create_view(
    self,
    identifier: str | Identifier,
    schema: Schema | pa.Schema,
    view_version: ViewVersion,
    location: str | None = None,
    properties: Properties = EMPTY_DICT,
) -> View:
    """Create a view by POSTing a `CreateViewRequest` to the REST catalog.

    Args:
        identifier (str | Identifier): View identifier.
        schema (Schema | pa.Schema): View's schema; it is passed through
            `_convert_schema_if_needed` and then given fresh schema IDs.
        view_version (ViewVersion): View version sent as "view-version".
        location (str | None): Location for the view. Trailing "/" characters
            are stripped when a non-empty value is given.
        properties (Properties): View properties.

    Returns:
        View: The view built from the catalog's response.

    Raises:
        ViewAlreadyExistsError: Registered as the error for HTTP 409 when the
            response status is handed to `_handle_non_200_response`.
    """
    view_schema = assign_fresh_schema_ids(self._convert_schema_if_needed(schema))
    path_parts = self._split_identifier_for_path(identifier, IdentifierKind.VIEW)

    # Falsy locations (None or "") are forwarded untouched.
    normalized_location = location.rstrip("/") if location else location

    payload = (
        CreateViewRequest(
            name=path_parts["view"],
            location=normalized_location,
            view_schema=view_schema,
            view_version=view_version,
            properties=properties,
        )
        .model_dump_json()
        .encode(UTF8)
    )

    response = self._session.post(
        self.url(Endpoints.create_view, namespace=path_parts["namespace"]),
        data=payload,
    )
    try:
        response.raise_for_status()
    except HTTPError as exc:
        _handle_non_200_response(exc, {409: ViewAlreadyExistsError})

    parsed = ViewResponse.model_validate_json(response.text)
    return self._response_to_view(self.identifier_to_tuple(identifier), parsed)
970+
904971
@retry(**_RETRY_ARGS)
905972
def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table:
906973
"""Register a new table using existing metadata.

0 commit comments

Comments
 (0)