@@ -46,6 +46,14 @@ def __init__(
46
46
loads : LoadsFunc ,
47
47
changelog_producer : Optional [ChangelogProducer ] = None ,
48
48
) -> None :
49
+ """
50
+ Initializes a new `TimestampedPartitionTransaction`.
51
+
52
+ :param partition: The `TimestampedStorePartition` this transaction belongs to.
53
+ :param dumps: The serialization function for keys/values.
54
+ :param loads: The deserialization function for keys/values.
55
+ :param changelog_producer: Optional `ChangelogProducer` for recording changes.
56
+ """
49
57
super ().__init__ (
50
58
partition = partition ,
51
59
dumps = dumps ,
@@ -61,25 +69,23 @@ def __init__(
61
69
)
62
70
63
71
@validate_transaction_status (PartitionTransactionStatus .STARTED )
64
- def get_last (
65
- self ,
66
- timestamp : int ,
67
- prefix : Any ,
68
- ) -> Optional [Any ]:
72
+ def get_last (self , timestamp : int , prefix : Any ) -> Optional [Any ]:
69
73
"""Get the latest value for a prefix up to a given timestamp.
70
74
71
75
Searches both the transaction's update cache and the underlying RocksDB store
72
76
to find the value associated with the given `prefix` that has the highest
73
77
timestamp less than or equal to the provided `timestamp`.
74
78
75
- The search prioritizes values from the update cache if their timestamps are
76
- more recent than those found in the store.
79
+ The search considers both the update cache and the store. It returns the value
80
+ associated with the key that has the numerically largest timestamp less than
81
+ or equal to the provided `timestamp`. If multiple entries exist for the same
82
+ prefix across the cache and store within the valid time range, the one with
83
+ the highest timestamp is chosen.
77
84
78
85
:param timestamp: The upper bound timestamp (inclusive) in milliseconds.
79
86
:param prefix: The key prefix.
80
87
:return: The deserialized value if found, otherwise None.
81
88
"""
82
-
83
89
prefix = self ._ensure_bytes (prefix )
84
90
min_eligible_timestamp = self ._get_min_eligible_timestamp (prefix )
85
91
@@ -130,7 +136,9 @@ def set_for_timestamp(
130
136
to the parent `set` method. The parent method internally serializes these
131
137
into a combined key before storing the value in the update cache.
132
138
133
- Additionally, it triggers the expiration logic.
139
+ Additionally, it updates the minimum eligible timestamp for the given prefix
140
+ based on the `retention_ms`, which is used later during the flush process to
141
+ expire old data.
134
142
135
143
:param timestamp: Timestamp associated with the value in milliseconds.
136
144
:param value: The value to store.
@@ -144,7 +152,16 @@ def set_for_timestamp(
144
152
)
145
153
self ._set_min_eligible_timestamp (prefix , min_eligible_timestamp )
146
154
147
- def _flush (self , changelog_offset : Optional [int ]):
155
+ def _flush (self , changelog_offset : Optional [int ]) -> None :
156
+ """
157
+ Flushes the transaction.
158
+
159
+ This method first calls `_expire()` to remove outdated entries based on
160
+ their timestamps and retention periods, then calls the parent class's
161
+ `_flush()` method to persist changes.
162
+
163
+ :param changelog_offset: Optional offset for the changelog.
164
+ """
148
165
self ._expire ()
149
166
super ()._flush (changelog_offset = changelog_offset )
150
167
@@ -156,7 +173,6 @@ def _expire(self) -> None:
156
173
This applies to both the in-memory update cache and the underlying
157
174
RocksDB store within the current transaction.
158
175
"""
159
-
160
176
updates = self ._update_cache .get_updates ()
161
177
for prefix , cached in updates .items ():
162
178
min_eligible_timestamp = self ._get_min_eligible_timestamp (prefix )
@@ -185,12 +201,32 @@ def _serialize_key(self, key: Union[int, bytes], prefix: bytes) -> bytes:
185
201
raise ValueError (f"Invalid key type: { type (key )} " )
186
202
187
203
def _get_min_eligible_timestamp (self , prefix : bytes ) -> int :
204
+ """
205
+ Retrieves the minimum eligible timestamp for a given prefix.
206
+
207
+ It first checks an in-memory cache (`self._min_eligible_timestamps`).
208
+ If not found, it queries the underlying RocksDB store using `self.get()`.
209
+ Defaults to 0 if no timestamp is found.
210
+
211
+ :param prefix: The key prefix (bytes).
212
+ :return: The minimum eligible timestamp (int).
213
+ """
188
214
cache = self ._min_eligible_timestamps
189
215
return (
190
216
cache .timestamps .get (prefix ) or self .get (key = cache .key , prefix = prefix ) or 0
191
217
)
192
218
193
219
def _set_min_eligible_timestamp (self , prefix : bytes , timestamp : int ) -> None :
220
+ """
221
+ Sets the minimum eligible timestamp for a given prefix.
222
+
223
+ Updates an in-memory cache (`self._min_eligible_timestamps`) and then
224
+ persists this new minimum to the underlying RocksDB store using `self.set()`.
225
+ The value is stored in a designated column family.
226
+
227
+ :param prefix: The key prefix (bytes).
228
+ :param timestamp: The minimum eligible timestamp (int) to set.
229
+ """
194
230
cache = self ._min_eligible_timestamps
195
231
cache .timestamps [prefix ] = timestamp
196
232
self .set (key = cache .key , value = timestamp , prefix = prefix , cf_name = cache .cf_name )
0 commit comments