Skip to content

Commit 8d86634

Browse files
committed
Patch transaction commit logic for concurrent appends
1 parent c084793 commit 8d86634

2 files changed

Lines changed: 60 additions & 9 deletions

File tree

pyiceberg/table/__init__.py

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import warnings
2222
from abc import ABC, abstractmethod
2323
from dataclasses import dataclass
24-
from functools import cached_property
24+
from functools import cached_property, wraps
2525
from itertools import chain
2626
from types import TracebackType
2727
from typing import (
@@ -41,7 +41,9 @@
4141

4242
from pydantic import Field
4343
from sortedcontainers import SortedList
44+
from tenacity import RetryCallState, retry, retry_if_exception_type, stop_after_attempt, wait_random_exponential
4445

46+
from pyiceberg.exceptions import CommitFailedException
4547
import pyiceberg.expressions.parser as parser
4648
from pyiceberg.expressions import (
4749
AlwaysTrue,
@@ -74,6 +76,12 @@
7476
from pyiceberg.schema import Schema
7577
from pyiceberg.table.inspect import InspectTable
7678
from pyiceberg.table.metadata import (
79+
COMMIT_MAX_RETRY_WAIT_MS,
80+
COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
81+
COMMIT_MIN_RETRY_WAIT_MS,
82+
COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
83+
COMMIT_NUM_RETRIES,
84+
COMMIT_NUM_RETRIES_DEFAULT,
7785
INITIAL_SEQUENCE_NUMBER,
7886
TableMetadata,
7987
)
@@ -89,6 +97,7 @@
8997
from pyiceberg.table.update import (
9098
AddPartitionSpecUpdate,
9199
AddSchemaUpdate,
100+
AddSnapshotUpdate,
92101
AddSortOrderUpdate,
93102
AssertCreate,
94103
AssertRefSnapshotId,
@@ -1059,9 +1068,51 @@ def refs(self) -> Dict[str, SnapshotRef]:
10591068
return self.metadata.refs
10601069

10611070
def _do_commit(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequirement, ...]) -> None:
    """Commit ``updates`` through the catalog, retrying when the commit conflicts.

    The commit is retried whenever the catalog raises ``CommitFailedException``.
    Between attempts the table is refreshed and the pending updates and
    requirements are re-based onto the refreshed metadata. Back-off bounds and
    the retry budget are read from the table properties
    (``COMMIT_MIN_RETRY_WAIT_MS``, ``COMMIT_MAX_RETRY_WAIT_MS``,
    ``COMMIT_NUM_RETRIES``), falling back to their ``*_DEFAULT`` values.

    Args:
        updates: Table updates to send to the catalog.
        requirements: Requirements the catalog must validate before applying
            the updates.

    Raises:
        CommitFailedException: If the commit still conflicts after the
            configured number of retries.
    """

    def _rebase_on_latest(_retry_state: RetryCallState) -> None:
        # Runs after every failed attempt: reload the table, then point the
        # pending snapshot(s) and ref assertions at the refreshed state.
        nonlocal updates, requirements
        self.refresh()
        current_snapshot_id = self.metadata.current_snapshot_id
        next_seq_num = self.metadata.next_sequence_number()

        # NOTE(review): only the snapshot's parent id and sequence number are
        # rewritten here; nothing else in the snapshot is regenerated —
        # confirm that is sufficient for concurrent appends.
        rebased_updates = []
        for update in updates:
            if isinstance(update, AddSnapshotUpdate):
                rebased_snapshot = update.snapshot.model_copy(
                    update={
                        # The key must match the model field name exactly:
                        # ``model_copy(update=...)`` does not validate keys, so
                        # a misspelled key silently leaves the parent unchanged.
                        "parent_snapshot_id": current_snapshot_id,
                        "sequence_number": next_seq_num,
                    }
                )
                update = update.model_copy(update={"snapshot": rebased_snapshot})
            rebased_updates.append(update)
        updates = tuple(rebased_updates)

        # NOTE(review): every AssertRefSnapshotId is re-pointed at the current
        # snapshot regardless of which ref it names — confirm for other refs.
        requirements = tuple(
            req.model_copy(update={"snapshot_id": current_snapshot_id}) if isinstance(req, AssertRefSnapshotId) else req
            for req in requirements
        )

    properties = self.metadata.properties
    min_wait_ms = int(properties.get(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT))
    max_wait_ms = int(properties.get(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT))
    num_retries = int(properties.get(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))

    @wraps(self._do_commit)
    @retry(
        # Table properties are in milliseconds; tenacity waits in seconds.
        wait=wait_random_exponential(min=min_wait_ms / 1000, max=max_wait_ms / 1000),
        # stop_after_attempt counts attempts, so allow the first attempt plus
        # ``num_retries`` retries.
        stop=stop_after_attempt(num_retries + 1),
        retry=retry_if_exception_type(CommitFailedException),
        after=_rebase_on_latest,
        # Surface the original CommitFailedException instead of wrapping it
        # in tenacity.RetryError once the retries are exhausted.
        reraise=True,
    )
    def _do_commit_inner() -> None:
        response = self.catalog.commit_table(self, requirements, updates)
        self.metadata = response.metadata
        self.metadata_location = response.metadata_location

    return _do_commit_inner()
10651116

10661117
def __eq__(self, other: Any) -> bool:
10671118
"""Return the equality of two instances of the Table class."""

pyiceberg/table/update/__init__.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,28 +74,28 @@ def __init__(self, transaction: Transaction) -> None:
7474
# Stub for the update's commit payload; ``commit`` unpacks the returned
# UpdatesAndRequirements into ``self._transaction._apply(...)``.
def _commit(self) -> UpdatesAndRequirements: ...
7575

7676
def commit(self) -> None:
    """Apply this update to its transaction, retrying when the commit conflicts.

    The updates and requirements produced by ``_commit`` are applied to the
    transaction. If that raises ``CommitFailedException`` the attempt is
    retried with randomized exponential back-off, and
    ``_cleanup_commit_failure`` runs after every failed attempt. Back-off
    bounds and the retry budget are read from the table properties
    (``COMMIT_MIN_RETRY_WAIT_MS``, ``COMMIT_MAX_RETRY_WAIT_MS``,
    ``COMMIT_NUM_RETRIES``), falling back to their ``*_DEFAULT`` values.

    Raises:
        CommitFailedException: If the commit still conflicts after the
            configured number of retries.
    """
    properties = self._transaction.table_metadata.properties
    min_wait_ms = int(properties.get(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT))
    max_wait_ms = int(properties.get(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT))
    num_retries = int(properties.get(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))

    @wraps(self.commit)
    @retry(
        # Table properties are in milliseconds; tenacity waits in seconds.
        wait=wait_random_exponential(min=min_wait_ms / 1000, max=max_wait_ms / 1000),
        # stop_after_attempt counts attempts, so allow the first attempt plus
        # ``num_retries`` retries.
        stop=stop_after_attempt(num_retries + 1),
        retry=retry_if_exception_type(CommitFailedException),
        after=self._cleanup_commit_failure,
        # Surface the original CommitFailedException instead of wrapping it
        # in tenacity.RetryError once the retries are exhausted.
        reraise=True,
    )
    def commit_inner() -> None:
        self._transaction._apply(*self._commit())

    return commit_inner()
9696

9797
def _cleanup_commit_failure(self, state: RetryCallState) -> None:
    """Refresh the transaction's table after a failed commit attempt.

    Passed as the ``after`` hook of the retry wrapper built in ``commit``.
    ``state`` is accepted to satisfy that hook's call signature and is not
    used.
    """
    table = self._transaction._table
    table.refresh()
9999

100100
def __exit__(self, _: Any, value: Any, traceback: Any) -> None:
101101
"""Close and commit the change."""

0 commit comments

Comments
 (0)