Skip to content

Commit 32a6d5f

Browse files
committed
Call _expire from prepare instead of flush
1 parent 43fc1d3 commit 32a6d5f

File tree

1 file changed

+5
-6
lines changed

1 file changed

+5
-6
lines changed

quixstreams/state/rocksdb/timestamped.py

+5-6
Original file line numberDiff line numberDiff line change
@@ -152,18 +152,17 @@ def set_for_timestamp(
152152
)
153153
self._set_min_eligible_timestamp(prefix, min_eligible_timestamp)
154154

155-
def _flush(self, changelog_offset: Optional[int]) -> None:
155+
@validate_transaction_status(PartitionTransactionStatus.STARTED)
156+
def prepare(self, processed_offsets: Optional[dict[str, int]] = None) -> None:
156157
"""
157-
Flushes the transaction.
158-
159158
This method first calls `_expire()` to remove outdated entries based on
160159
their timestamps and retention periods, then calls the parent class's
161-
`_flush()` method to persist changes.
160+
`prepare()` to prepare the transaction for flush.
162161
163-
:param changelog_offset: Optional offset for the changelog.
162+
:param processed_offsets: the dict with <topic: offset> of the latest processed message
164163
"""
165164
self._expire()
166-
super()._flush(changelog_offset=changelog_offset)
165+
super().prepare(processed_offsets=processed_offsets)
167166

168167
def _expire(self) -> None:
169168
"""

0 commit comments

Comments
 (0)