Skip to content

Commit e183ed2

Browse files
rcjverhoef and claude committed
feat: support storage-credentials in REST catalog LoadTableResult
The Iceberg REST spec's LoadTableResult includes a storage-credentials field for vended credentials (prefix-scoped temporary STS tokens). PyIceberg was only reading the config field and silently dropping storage-credentials, so vended credentials never reached the FileIO.

Per the spec: "Clients must first check whether the respective credentials exist in the storage-credentials field before checking the config for credentials."

This adds:
- storage_credentials field to TableResponse
- Longest-prefix credential resolution (mirroring Java's S3FileIO)
- Merging resolved credentials into FileIO with highest precedence

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 09de790 commit e183ed2

File tree

2 files changed

+116
-2
lines changed

2 files changed

+116
-2
lines changed

pyiceberg/catalog/rest/__init__.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
PlanSubmitted,
4141
PlanTableScanRequest,
4242
ScanTasks,
43+
StorageCredential,
4344
)
4445
from pyiceberg.exceptions import (
4546
AuthorizationExpiredError,
@@ -261,6 +262,7 @@ class TableResponse(IcebergBaseModel):
261262
metadata_location: str | None = Field(alias="metadata-location", default=None)
262263
metadata: TableMetadata
263264
config: Properties = Field(default_factory=dict)
265+
storage_credentials: list[StorageCredential] = Field(alias="storage-credentials", default_factory=list)
264266

265267

266268
class CreateTableRequest(IcebergBaseModel):
@@ -396,6 +398,26 @@ def _create_session(self) -> Session:
396398

397399
return session
398400

401+
@staticmethod
def _resolve_storage_credentials(storage_credentials: list[StorageCredential], location: str | None) -> Properties:
    """Return the config of the storage credential whose prefix best matches ``location``.

    A credential is a candidate when ``location`` starts with its prefix; among the
    candidates the one with the longest prefix is chosen. This mirrors the Java
    implementation in S3FileIO.clientForStoragePath(), which iterates over storage
    credential prefixes and selects the one with the longest match.

    See: https://github.com/apache/iceberg/blob/main/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java

    Args:
        storage_credentials: Credentials vended by the catalog, each with a prefix and a config.
        location: The location to match against the credential prefixes, or None.

    Returns:
        The config of the best-matching credential, or an empty dict when there are
        no credentials, no location, or no prefix matches.
    """
    if not (storage_credentials and location):
        return {}

    candidates = [credential for credential in storage_credentials if location.startswith(credential.prefix)]
    if not candidates:
        return {}

    # max() returns the first maximal element, so equally long prefixes resolve
    # to the credential that appears earliest in the list.
    return max(candidates, key=lambda credential: len(credential.prefix)).config
420+
399421
def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | None = None) -> FileIO:
400422
merged_properties = {**self.properties, **properties}
401423
if self._auth_manager:
@@ -734,24 +756,34 @@ def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylin
734756
session.mount(self.uri, SigV4Adapter(**self.properties))
735757

736758
def _response_to_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> Table:
    """Build a ``Table`` from a REST catalog table response.

    The FileIO properties are layered, later entries winning: table metadata
    properties, then the response ``config``, then the resolved storage credentials.

    Args:
        identifier_tuple: Identifier of the table.
        table_response: Parsed response from the catalog.

    Returns:
        A ``Table`` bound to this catalog with a FileIO configured from the response.
    """
    # Per Iceberg spec: storage-credentials take precedence over config.
    # metadata_location is optional on the response; fall back to the table
    # location so vended credentials are not silently dropped when it is absent
    # (the resolver returns {} for a missing location).
    credential_location = table_response.metadata_location or table_response.metadata.location
    credential_config = self._resolve_storage_credentials(table_response.storage_credentials, credential_location)
    return Table(
        identifier=identifier_tuple,
        metadata_location=table_response.metadata_location,  # type: ignore
        metadata=table_response.metadata,
        io=self._load_file_io(
            {**table_response.metadata.properties, **table_response.config, **credential_config},
            table_response.metadata_location,
        ),
        catalog=self,
        config=table_response.config,
    )
747774

748775
def _response_to_staged_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> StagedTable:
    """Build a ``StagedTable`` from a REST catalog table response.

    The FileIO properties are layered, later entries winning: table metadata
    properties, then the response ``config``, then the resolved storage credentials.

    Args:
        identifier_tuple: Identifier of the staged table.
        table_response: Parsed response from the catalog.

    Returns:
        A ``StagedTable`` bound to this catalog with a FileIO configured from the response.
    """
    # Per Iceberg spec: storage-credentials take precedence over config.
    # metadata_location is optional on the response; fall back to the table
    # location so vended credentials are not silently dropped when it is absent
    # (the resolver returns {} for a missing location).
    credential_location = table_response.metadata_location or table_response.metadata.location
    credential_config = self._resolve_storage_credentials(table_response.storage_credentials, credential_location)
    return StagedTable(
        identifier=identifier_tuple,
        metadata_location=table_response.metadata_location,  # type: ignore
        metadata=table_response.metadata,
        io=self._load_file_io(
            {**table_response.metadata.properties, **table_response.config, **credential_config},
            table_response.metadata_location,
        ),
        catalog=self,
    )

tests/catalog/test_rest.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2390,3 +2390,85 @@ def test_endpoint_parsing_from_string_with_valid_http_method() -> None:
23902390
def test_endpoint_parsing_from_string_with_invalid_http_method() -> None:
    """Parsing an endpoint string with an unknown HTTP verb raises ValueError."""
    endpoint_string = "INVALID /v1/resource"
    with pytest.raises(ValueError, match="not a valid HttpMethod"):
        Endpoint.from_string(endpoint_string)
2393+
2394+
2395+
def test_resolve_storage_credentials_longest_prefix_wins() -> None:
    """When several prefixes match the location, the longest one supplies the config."""
    from pyiceberg.catalog.rest.scan_planning import StorageCredential

    location = "s3://warehouse/database/table/metadata/00001.json"
    broad = StorageCredential(prefix="s3://warehouse/", config={"s3.access-key-id": "short-prefix-key"})
    narrow = StorageCredential(prefix="s3://warehouse/database/table", config={"s3.access-key-id": "long-prefix-key"})

    resolved = RestCatalog._resolve_storage_credentials([broad, narrow], location)

    assert resolved == {"s3.access-key-id": "long-prefix-key"}
2404+
2405+
2406+
def test_resolve_storage_credentials_no_match() -> None:
    """A credential whose prefix does not match the location yields an empty config."""
    from pyiceberg.catalog.rest.scan_planning import StorageCredential

    location = "s3://warehouse/database/table/metadata/00001.json"
    unrelated = StorageCredential(prefix="s3://other-bucket/", config={"s3.access-key-id": "no-match"})

    resolved = RestCatalog._resolve_storage_credentials([unrelated], location)

    assert resolved == {}
2414+
2415+
2416+
def test_resolve_storage_credentials_empty() -> None:
    """An empty credential list resolves to an empty config, with or without a location."""
    for location in ("s3://warehouse/foo", None):
        assert RestCatalog._resolve_storage_credentials([], location) == {}
2419+
2420+
2421+
def test_load_table_with_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1: dict[str, Any]) -> None:
    """Vended storage credentials in the load-table response end up on the table's FileIO."""
    metadata_location = "s3://warehouse/database/table/metadata/00001.metadata.json"
    config_credentials = {
        "s3.access-key-id": "from-config",
        "s3.secret-access-key": "from-config-secret",
    }
    vended_credentials = {
        "s3.access-key-id": "vended-key",
        "s3.secret-access-key": "vended-secret",
        "s3.session-token": "vended-token",
    }
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/fokko/tables/table",
        json={
            "metadata-location": metadata_location,
            "metadata": example_table_metadata_with_snapshot_v1,
            "config": config_credentials,
            "storage-credentials": [
                {
                    "prefix": "s3://warehouse/database/table",
                    "config": vended_credentials,
                }
            ],
        },
        status_code=200,
        request_headers=TEST_HEADERS,
    )

    table = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).load_table(("fokko", "table"))

    # Storage credentials should override config values
    for key, expected in vended_credentials.items():
        assert table.io.properties[key] == expected
2453+
2454+
2455+
def test_load_table_without_storage_credentials(
    rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]
) -> None:
    """Loading a table from the plain fixture response yields the expected table.

    NOTE(review): the fixture is presumed to carry no ``storage-credentials``
    entry; confirm against its definition.
    """
    response_json = example_table_metadata_with_snapshot_v1_rest_json
    rest_mock.get(
        f"{TEST_URI}v1/namespaces/fokko/tables/table",
        json=response_json,
        status_code=200,
        request_headers=TEST_HEADERS,
    )
    catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)

    loaded = catalog.load_table(("fokko", "table"))

    reference = Table(
        identifier=("fokko", "table"),
        metadata_location=response_json["metadata-location"],
        metadata=TableMetadataV1(**response_json["metadata"]),
        io=load_file_io(),
        catalog=catalog,
    )
    assert loaded.metadata.model_dump() == reference.metadata.model_dump()
    assert loaded == reference

0 commit comments

Comments
 (0)