Skip to content

Commit 43fc1d3

Browse files
committed
Call prepare on PartitionTransaction context manager exit
1 parent 4e7f736 commit 43fc1d3

File tree

2 files changed

+3
-2
lines changed

2 files changed

+3
-2
lines changed

quixstreams/sources/base/source.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ def flush(self, timeout: Optional[float] = None) -> None:
528528
:raises CheckpointProducerTimeout: if any message fails to produce before the timeout
529529
"""
530530
if self._store_transaction:
531-
self._store_transaction.prepare(None)
531+
self._store_transaction.prepare()
532532
self._store_transaction.flush()
533533
self._store_transaction = None
534534
self._store_state = None

quixstreams/state/base/transaction.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ def exists(self, key: K, prefix: bytes, cf_name: str = "default") -> bool:
471471
return self._partition.exists(key_serialized, cf_name=cf_name)
472472

473473
@validate_transaction_status(PartitionTransactionStatus.STARTED)
474-
def prepare(self, processed_offsets: Optional[dict[str, int]]):
474+
def prepare(self, processed_offsets: Optional[dict[str, int]] = None) -> None:
475475
"""
476476
Produce changelog messages to the changelog topic for all changes accumulated
477477
in this transaction and prepare transaction to flush its state to the state
@@ -581,4 +581,5 @@ def __enter__(self):
581581

582582
def __exit__(self, exc_type, exc_val, exc_tb):
583583
if exc_val is None and not self.failed:
584+
self.prepare()
584585
self.flush()

0 commit comments

Comments
 (0)