Skip to content

Commit 6d53e24

Browse files
Merge branch 'apache:main' into fix-3008-sigv4-retries
2 parents 078cf86 + 95f6273 commit 6d53e24

File tree

12 files changed

+506
-197
lines changed

12 files changed

+506
-197
lines changed

.github/workflows/python-ci-docs.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ jobs:
4141
python-version: 3.12
4242
- name: Install UV
4343
uses: astral-sh/setup-uv@v7
44-
- name: Install
45-
run: make docs-install
4644
- name: Build docs
4745
run: make docs-build
4846
- name: Run linters

.github/workflows/python-release-docs.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ jobs:
3636
python-version: ${{ matrix.python }}
3737
- name: Install UV
3838
uses: astral-sh/setup-uv@v7
39-
- name: Install docs
40-
run: make docs-install
4139
- name: Build docs
4240
run: make docs-build
4341
- name: Copy

Makefile

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@
1414
# KIND, either express or implied. See the License for the
1515
# specific language governing permissions and limitations
1616
# under the License.
17+
.PHONY: help install install-uv check-license lint \
18+
test test-integration test-integration-setup test-integration-exec test-integration-cleanup test-integration-rebuild \
19+
test-s3 test-adls test-gcs test-coverage coverage-report \
20+
docs-serve docs-build notebook notebook-infra \
21+
clean uv-lock uv-lock-check
22+
23+
.DEFAULT_GOAL := help
1724
# ========================
1825
# Configuration Variables
1926
# ========================
@@ -149,29 +156,23 @@ coverage-report: ## Combine and report coverage
149156

150157
##@ Documentation
151158

152-
docs-install: ## Install docs dependencies (included in default groups)
153-
uv sync $(PYTHON_ARG) --group docs
154-
155159
docs-serve: ## Serve local docs preview (hot reload)
156-
uv run $(PYTHON_ARG) mkdocs serve -f mkdocs/mkdocs.yml --livereload
160+
uv run $(PYTHON_ARG) --group docs mkdocs serve -f mkdocs/mkdocs.yml --livereload
157161

158162
docs-build: ## Build the static documentation site
159-
uv run $(PYTHON_ARG) mkdocs build -f mkdocs/mkdocs.yml --strict
163+
uv run $(PYTHON_ARG) --group docs mkdocs build -f mkdocs/mkdocs.yml --strict
160164

161165
# ========================
162166
# Experimentation
163167
# ========================
164168

165169
##@ Experimentation
166170

167-
notebook-install: ## Install notebook dependencies
168-
uv sync $(PYTHON_ARG) --all-extras --group notebook
169-
170-
notebook: notebook-install ## Launch notebook for experimentation
171-
uv run jupyter lab --notebook-dir=notebooks
171+
notebook: ## Launch notebook for experimentation
172+
uv run $(PYTHON_ARG) --all-extras --group notebook jupyter lab --notebook-dir=notebooks
172173

173-
notebook-infra: notebook-install test-integration-setup ## Launch notebook with integration test infra (Spark, Iceberg Rest Catalog, object storage, etc.)
174-
uv run jupyter lab --notebook-dir=notebooks
174+
notebook-infra: test-integration-setup ## Launch notebook with integration test infra (Spark, Iceberg Rest Catalog, object storage, etc.)
175+
uv run $(PYTHON_ARG) --all-extras --group notebook jupyter lab --notebook-dir=notebooks
175176

176177
# ===================
177178
# Project Maintenance
@@ -189,6 +190,8 @@ clean: ## Remove build artifacts and caches
189190
@find . -name "*.pyo" -exec echo Deleting {} \; -delete
190191
@echo "Cleaning up Jupyter notebook checkpoints..."
191192
@find . -name ".ipynb_checkpoints" -exec echo Deleting {} \; -exec rm -rf {} +
193+
@echo "Cleaning up coverage files..."
194+
@rm -rf .coverage .coverage.* htmlcov/ coverage.xml
192195
@echo "Cleanup complete."
193196

194197
uv-lock: ## Regenerate uv.lock file from pyproject.toml

mkdocs/README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,5 @@ The pyiceberg docs are stored in `docs/`.
2222
## Running docs locally
2323

2424
```sh
25-
make docs-install
2625
make docs-serve
2726
```

pyiceberg/io/fsspec.py

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
TYPE_CHECKING,
3030
Any,
3131
)
32-
from urllib.parse import urlparse
32+
from urllib.parse import ParseResult, urlparse
3333

3434
import requests
3535
from fsspec import AbstractFileSystem
@@ -244,7 +244,7 @@ def _gs(properties: Properties) -> AbstractFileSystem:
244244
)
245245

246246

247-
def _adls(properties: Properties) -> AbstractFileSystem:
247+
def _adls(properties: Properties, hostname: str | None = None) -> AbstractFileSystem:
248248
# https://fsspec.github.io/adlfs/api/
249249

250250
from adlfs import AzureBlobFileSystem
@@ -259,6 +259,10 @@ def _adls(properties: Properties) -> AbstractFileSystem:
259259
if ADLS_SAS_TOKEN not in properties:
260260
properties[ADLS_SAS_TOKEN] = sas_token
261261

262+
# Fallback: extract account_name from URI hostname (e.g. "account.dfs.core.windows.net" -> "account")
263+
if hostname and ADLS_ACCOUNT_NAME not in properties:
264+
properties[ADLS_ACCOUNT_NAME] = hostname.split(".")[0]
265+
262266
class StaticTokenCredential(AsyncTokenCredential):
263267
_DEFAULT_EXPIRY_SECONDS = 3600
264268

@@ -300,7 +304,7 @@ def _hf(properties: Properties) -> AbstractFileSystem:
300304
)
301305

302306

303-
SCHEME_TO_FS = {
307+
SCHEME_TO_FS: dict[str, Callable[..., AbstractFileSystem]] = {
304308
"": _file,
305309
"file": _file,
306310
"s3": _s3,
@@ -313,6 +317,8 @@ def _hf(properties: Properties) -> AbstractFileSystem:
313317
"hf": _hf,
314318
}
315319

320+
_ADLS_SCHEMES = frozenset({"abfs", "abfss", "wasb", "wasbs"})
321+
316322

317323
class FsspecInputFile(InputFile):
318324
"""An input file implementation for the FsspecFileIO.
@@ -414,8 +420,7 @@ class FsspecFileIO(FileIO):
414420
"""A FileIO implementation that uses fsspec."""
415421

416422
def __init__(self, properties: Properties):
417-
self._scheme_to_fs = {}
418-
self._scheme_to_fs.update(SCHEME_TO_FS)
423+
self._scheme_to_fs: dict[str, Callable[..., AbstractFileSystem]] = dict(SCHEME_TO_FS)
419424
self._thread_locals = threading.local()
420425
super().__init__(properties=properties)
421426

@@ -429,7 +434,7 @@ def new_input(self, location: str) -> FsspecInputFile:
429434
FsspecInputFile: An FsspecInputFile instance for the given location.
430435
"""
431436
uri = urlparse(location)
432-
fs = self.get_fs(uri.scheme)
437+
fs = self._get_fs_from_uri(uri)
433438
return FsspecInputFile(location=location, fs=fs)
434439

435440
def new_output(self, location: str) -> FsspecOutputFile:
@@ -442,7 +447,7 @@ def new_output(self, location: str) -> FsspecOutputFile:
442447
FsspecOutputFile: An FsspecOutputFile instance for the given location.
443448
"""
444449
uri = urlparse(location)
445-
fs = self.get_fs(uri.scheme)
450+
fs = self._get_fs_from_uri(uri)
446451
return FsspecOutputFile(location=location, fs=fs)
447452

448453
def delete(self, location: str | InputFile | OutputFile) -> None:
@@ -459,20 +464,30 @@ def delete(self, location: str | InputFile | OutputFile) -> None:
459464
str_location = location
460465

461466
uri = urlparse(str_location)
462-
fs = self.get_fs(uri.scheme)
467+
fs = self._get_fs_from_uri(uri)
463468
fs.rm(str_location)
464469

465-
def get_fs(self, scheme: str) -> AbstractFileSystem:
470+
def _get_fs_from_uri(self, uri: "ParseResult") -> AbstractFileSystem:
471+
"""Get a filesystem from a parsed URI, using hostname for ADLS account resolution."""
472+
if uri.scheme in _ADLS_SCHEMES:
473+
return self.get_fs(uri.scheme, uri.hostname)
474+
return self.get_fs(uri.scheme)
475+
476+
def get_fs(self, scheme: str, hostname: str | None = None) -> AbstractFileSystem:
466477
"""Get a filesystem for a specific scheme, cached per thread."""
467478
if not hasattr(self._thread_locals, "get_fs_cached"):
468479
self._thread_locals.get_fs_cached = lru_cache(self._get_fs)
469480

470-
return self._thread_locals.get_fs_cached(scheme)
481+
return self._thread_locals.get_fs_cached(scheme, hostname)
471482

472-
def _get_fs(self, scheme: str) -> AbstractFileSystem:
483+
def _get_fs(self, scheme: str, hostname: str | None = None) -> AbstractFileSystem:
473484
"""Get a filesystem for a specific scheme."""
474485
if scheme not in self._scheme_to_fs:
475486
raise ValueError(f"No registered filesystem for scheme: {scheme}")
487+
488+
if scheme in _ADLS_SCHEMES:
489+
return _adls(self.properties, hostname)
490+
476491
return self._scheme_to_fs[scheme](self.properties)
477492

478493
def __getstate__(self) -> dict[str, Any]:

pyiceberg/table/__init__.py

Lines changed: 20 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,12 @@
2626
from functools import cached_property
2727
from itertools import chain
2828
from types import TracebackType
29-
from typing import (
30-
TYPE_CHECKING,
31-
Any,
32-
TypeVar,
33-
)
29+
from typing import TYPE_CHECKING, Any, TypeVar
3430

3531
from pydantic import Field
3632

3733
import pyiceberg.expressions.parser as parser
38-
from pyiceberg.expressions import (
39-
AlwaysFalse,
40-
AlwaysTrue,
41-
And,
42-
BooleanExpression,
43-
EqualTo,
44-
IsNull,
45-
Or,
46-
Reference,
47-
)
34+
from pyiceberg.expressions import AlwaysFalse, AlwaysTrue, And, BooleanExpression, EqualTo, IsNull, Or, Reference
4835
from pyiceberg.expressions.visitors import (
4936
ResidualEvaluator,
5037
_InclusiveMetricsEvaluator,
@@ -54,36 +41,17 @@
5441
manifest_evaluator,
5542
)
5643
from pyiceberg.io import FileIO, load_file_io
57-
from pyiceberg.manifest import (
58-
DataFile,
59-
DataFileContent,
60-
ManifestContent,
61-
ManifestEntry,
62-
ManifestFile,
63-
)
64-
from pyiceberg.partitioning import (
65-
PARTITION_FIELD_ID_START,
66-
UNPARTITIONED_PARTITION_SPEC,
67-
PartitionKey,
68-
PartitionSpec,
69-
)
44+
from pyiceberg.manifest import DataFile, DataFileContent, ManifestContent, ManifestEntry, ManifestFile
45+
from pyiceberg.partitioning import PARTITION_FIELD_ID_START, UNPARTITIONED_PARTITION_SPEC, PartitionKey, PartitionSpec
7046
from pyiceberg.schema import Schema
7147
from pyiceberg.table.delete_file_index import DeleteFileIndex
7248
from pyiceberg.table.inspect import InspectTable
7349
from pyiceberg.table.locations import LocationProvider, load_location_provider
7450
from pyiceberg.table.maintenance import MaintenanceTable
75-
from pyiceberg.table.metadata import (
76-
INITIAL_SEQUENCE_NUMBER,
77-
TableMetadata,
78-
)
79-
from pyiceberg.table.name_mapping import (
80-
NameMapping,
81-
)
51+
from pyiceberg.table.metadata import INITIAL_SEQUENCE_NUMBER, TableMetadata
52+
from pyiceberg.table.name_mapping import NameMapping
8253
from pyiceberg.table.refs import MAIN_BRANCH, SnapshotRef
83-
from pyiceberg.table.snapshots import (
84-
Snapshot,
85-
SnapshotLogEntry,
86-
)
54+
from pyiceberg.table.snapshots import Snapshot, SnapshotLogEntry
8755
from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
8856
from pyiceberg.table.update import (
8957
AddPartitionSpecUpdate,
@@ -107,11 +75,7 @@
10775
update_table_metadata,
10876
)
10977
from pyiceberg.table.update.schema import UpdateSchema
110-
from pyiceberg.table.update.snapshot import (
111-
ManageSnapshots,
112-
UpdateSnapshot,
113-
_FastAppendFiles,
114-
)
78+
from pyiceberg.table.update.snapshot import ManageSnapshots, UpdateSnapshot, _FastAppendFiles
11579
from pyiceberg.table.update.sorting import UpdateSortOrder
11680
from pyiceberg.table.update.spec import UpdateSpec
11781
from pyiceberg.table.update.statistics import UpdateStatistics
@@ -126,9 +90,7 @@
12690
Record,
12791
TableVersion,
12892
)
129-
from pyiceberg.types import (
130-
strtobool,
131-
)
93+
from pyiceberg.types import strtobool
13294
from pyiceberg.utils.concurrent import ExecutorFactory
13395
from pyiceberg.utils.config import Config
13496
from pyiceberg.utils.properties import property_as_bool
@@ -144,11 +106,7 @@
144106
from pyiceberg_core.datafusion import IcebergDataFusionTable
145107

146108
from pyiceberg.catalog import Catalog
147-
from pyiceberg.catalog.rest.scan_planning import (
148-
RESTContentFile,
149-
RESTDeleteFile,
150-
RESTFileScanTask,
151-
)
109+
from pyiceberg.catalog.rest.scan_planning import RESTContentFile, RESTDeleteFile, RESTFileScanTask
152110

153111
ALWAYS_TRUE = AlwaysTrue()
154112
DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE = "downcast-ns-timestamp-to-us-on-write"
@@ -396,17 +354,19 @@ def _set_ref_snapshot(
396354

397355
return updates, requirements
398356

399-
def _build_partition_predicate(self, partition_records: set[Record]) -> BooleanExpression:
357+
def _build_partition_predicate(
358+
self, partition_records: set[Record], spec: PartitionSpec, schema: Schema
359+
) -> BooleanExpression:
400360
"""Build a filter predicate matching any of the input partition records.
401361
402362
Args:
403363
partition_records: A set of partition records to match
364+
spec: The partition spec whose fields define the partition columns to match
365+
schema: The schema used to resolve each partition field's source id to a column name
404366
Returns:
405367
A predicate matching any of the input partition records.
406368
"""
407-
partition_spec = self.table_metadata.spec()
408-
schema = self.table_metadata.schema()
409-
partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]
369+
partition_fields = [schema.find_field(field.source_id).name for field in spec.fields]
410370

411371
expr: BooleanExpression = AlwaysFalse()
412372
for partition_record in partition_records:
@@ -583,7 +543,9 @@ def dynamic_partition_overwrite(
583543
)
584544

585545
partitions_to_overwrite = {data_file.partition for data_file in data_files}
586-
delete_filter = self._build_partition_predicate(partition_records=partitions_to_overwrite)
546+
delete_filter = self._build_partition_predicate(
547+
partition_records=partitions_to_overwrite, spec=self.table_metadata.spec(), schema=self.table_metadata.schema()
548+
)
587549
self.delete(delete_filter=delete_filter, snapshot_properties=snapshot_properties, branch=branch)
588550

589551
with self._append_snapshot_producer(snapshot_properties, branch=branch) as append_files:
@@ -673,11 +635,7 @@ def delete(
673635
case_sensitive: A bool determine if the provided `delete_filter` is case-sensitive
674636
branch: Branch Reference to run the delete operation
675637
"""
676-
from pyiceberg.io.pyarrow import (
677-
ArrowScan,
678-
_dataframe_to_data_files,
679-
_expression_to_complementary_pyarrow,
680-
)
638+
from pyiceberg.io.pyarrow import ArrowScan, _dataframe_to_data_files, _expression_to_complementary_pyarrow
681639

682640
if (
683641
self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_DEFAULT)

pyiceberg/table/delete_file_index.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ def filter_by_seq(self, seq: int) -> list[DataFile]:
5454
start_idx = bisect_left(self._seqs, seq)
5555
return [delete_file for delete_file, _ in self._files[start_idx:]]
5656

57+
def referenced_delete_files(self) -> list[DataFile]:
58+
self._ensure_indexed()
59+
return [data_file for data_file, _ in self._files]
60+
5761

5862
def _has_path_bounds(delete_file: DataFile) -> bool:
5963
lower = delete_file.lower_bounds
@@ -140,3 +144,14 @@ def for_data_file(self, seq_num: int, data_file: DataFile, partition_key: Record
140144
deletes.update(path_deletes.filter_by_seq(seq_num))
141145

142146
return deletes
147+
148+
def referenced_delete_files(self) -> list[DataFile]:
149+
data_files: list[DataFile] = []
150+
151+
for deletes in self._by_partition.values():
152+
data_files.extend(deletes.referenced_delete_files())
153+
154+
for deletes in self._by_path.values():
155+
data_files.extend(deletes.referenced_delete_files())
156+
157+
return data_files

0 commit comments

Comments
 (0)