diff --git a/core/src/main/scala/kafka/log/stream/s3/wal/BootstrapWalV1.java b/core/src/main/scala/kafka/log/stream/s3/wal/BootstrapWalV1.java
index 646ac0ca64..e749caefc6 100644
--- a/core/src/main/scala/kafka/log/stream/s3/wal/BootstrapWalV1.java
+++ b/core/src/main/scala/kafka/log/stream/s3/wal/BootstrapWalV1.java
@@ -180,6 +180,11 @@ public CompletableFuture trim(RecordOffset offset) {
         return wal.trim(offset);
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(RecordOffset offset) {
+        return wal.truncateTail(offset);
+    }
+
     private CompletableFuture buildRecoverWal(String kraftWalConfigs, long oldNodeEpoch) {
         IdURI uri = IdURI.parse(kraftWalConfigs);
         CompletableFuture cf = walHandle
diff --git a/core/src/main/scala/kafka/log/streamaspect/ClientWrapper.java b/core/src/main/scala/kafka/log/streamaspect/ClientWrapper.java
index dc63e654c3..ace5f29168 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ClientWrapper.java
+++ b/core/src/main/scala/kafka/log/streamaspect/ClientWrapper.java
@@ -292,6 +292,11 @@ private void trim0(long newStartOffset, CompletableFuture cf) {
         }, generalCallbackExecutors);
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(long newNextOffset) {
+        return failureHandle(stream.truncateTail(newNextOffset).thenApplyAsync(nil -> nil, streamManagerCallbackExecutors));
+    }
+
     @Override
     public CompletableFuture close() {
         return failureHandle(stream.close().thenApplyAsync(nil -> nil, streamManagerCallbackExecutors));
diff --git a/core/src/main/scala/kafka/log/streamaspect/LazyStream.java b/core/src/main/scala/kafka/log/streamaspect/LazyStream.java
index e8ddb49885..65e4ab04eb 100644
--- a/core/src/main/scala/kafka/log/streamaspect/LazyStream.java
+++ b/core/src/main/scala/kafka/log/streamaspect/LazyStream.java
@@ -144,6 +144,11 @@ public CompletableFuture trim(long newStartOffset) {
         return inner.trim(newStartOffset);
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(long newNextOffset) {
+        return inner.truncateTail(newNextOffset);
+    }
+
     @Override
     public CompletableFuture fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint) {
         return inner.fetch(context, startOffset, endOffset, maxBytesHint);
@@ -228,6 +233,11 @@ public CompletableFuture trim(long newStartOffset) {
         return CompletableFuture.completedFuture(null);
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(long newNextOffset) {
+        return CompletableFuture.completedFuture(null);
+    }
+
     @Override
     public CompletableFuture fetch(FetchContext context, long startOffset, long endOffset, int maxBytesHint) {
         return CompletableFuture.completedFuture(Collections::emptyList);
diff --git a/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java b/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java
index cae0cc6249..ddc15b7362 100644
--- a/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java
+++ b/core/src/main/scala/kafka/log/streamaspect/MemoryClient.java
@@ -165,6 +165,13 @@ public CompletableFuture trim(long newStartOffset) {
         return CompletableFuture.completedFuture(null);
     }
 
+    @Override
+    public synchronized CompletableFuture<Void> truncateTail(long newNextOffset) {
+        recordMap = new ConcurrentSkipListMap<>(recordMap.headMap(newNextOffset, false));
+        nextOffsetAlloc.updateAndGet(current -> Math.min(current, newNextOffset));
+        return CompletableFuture.completedFuture(null);
+    }
+
     @Override
     public CompletableFuture close() {
         return CompletableFuture.completedFuture(null);
diff --git a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java
index cf73e2c525..3d55a54690 100644
--- a/core/src/main/scala/kafka/log/streamaspect/MetaStream.java
+++ b/core/src/main/scala/kafka/log/streamaspect/MetaStream.java
@@ -174,6 +174,12 @@ public CompletableFuture trim(long newStartOffset) {
         return innerStream.trim(newStartOffset);
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(long newNextOffset) {
+        metaCache.entrySet().removeIf(entry -> entry.getValue().offset >= newNextOffset);
+        return innerStream.truncateTail(newNextOffset);
+    }
+
     @Override
     public CompletableFuture close() {
         if (compactionFuture != null) {
diff --git a/s3stream/src/main/java/com/automq/stream/api/Stream.java b/s3stream/src/main/java/com/automq/stream/api/Stream.java
index 5e25faeb46..3550748fb7 100644
--- a/s3stream/src/main/java/com/automq/stream/api/Stream.java
+++ b/s3stream/src/main/java/com/automq/stream/api/Stream.java
@@ -99,6 +99,14 @@ default CompletableFuture fetch(long startOffset, long endOffset, i
      */
     CompletableFuture trim(long newStartOffset);
 
+    /**
+     * Truncate the tail of the stream so that subsequent appends start from {@code newNextOffset}.
+     *
+     * @param newNextOffset new next offset after truncation
+     * @return future completing when truncation finishes
+     */
+    CompletableFuture<Void> truncateTail(long newNextOffset);
+
     /**
      * Close the stream.
      */
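The new `Stream#truncateTail` contract is easiest to read from a caller's perspective. A minimal usage sketch, not part of the patch: it assumes only the `Stream` API shown above, and `recoveryPoint` is an illustrative name.

```java
import com.automq.stream.api.Stream;

import java.util.concurrent.CompletableFuture;

// Illustrative helper, not part of this patch.
final class TruncateTailExample {
    private TruncateTailExample() {
    }

    // Roll a stream back so that the next append lands at `recoveryPoint`.
    // Valid targets lie in [startOffset, nextOffset]; anything already trimmed away or
    // beyond the current next offset is rejected by the implementations in this patch.
    static CompletableFuture<Void> rollBackTo(Stream stream, long recoveryPoint) {
        if (recoveryPoint >= stream.nextOffset()) {
            // nothing at or after recoveryPoint to discard
            return CompletableFuture.completedFuture(null);
        }
        return stream.truncateTail(recoveryPoint);
    }
}
```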
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
index 6e004ed6ff..9d06b9ae5e 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java
@@ -236,7 +236,7 @@ static RecoveryBlockResult recoverContinuousRecords(
     ) {
         RecordOffset logEndOffset = null;
         Map<Long, Long> streamNextOffsets = new HashMap<>();
-        Map<Long, Queue<StreamRecordBatch>> streamDiscontinuousRecords = new HashMap<>();
+        Map<Long, Queue<RecoverRecord>> streamDiscontinuousRecords = new HashMap<>();
         LogCache.LogCacheBlock cacheBlock = new LogCache.LogCacheBlock(maxCacheSize);
         boolean first = true;
 
@@ -249,11 +249,11 @@ static RecoveryBlockResult recoverContinuousRecords(
                 first = false;
             }
             StreamRecordBatch streamRecordBatch = recoverResult.record();
-            processRecoveredRecord(streamRecordBatch, openingStreamEndOffsets, streamDiscontinuousRecords, cacheBlock, streamNextOffsets, logger);
+            processRecoveredRecord(streamRecordBatch, recoverResult.recordOffset(), openingStreamEndOffsets, streamDiscontinuousRecords, cacheBlock, streamNextOffsets, logger);
         }
     } catch (Throwable e) {
         // {@link RuntimeIOException} may be thrown by {@code it.next()}
-        releaseAllRecords(streamDiscontinuousRecords.values());
+        releaseRecoverRecords(streamDiscontinuousRecords.values());
         releaseAllRecords(cacheBlock.records().values());
         throw e;
     }
@@ -278,8 +278,9 @@ static RecoveryBlockResult recoverContinuousRecords(
      */
     private static void processRecoveredRecord(
         StreamRecordBatch streamRecordBatch,
+        RecordOffset recordOffset,
         Map<Long, Long> openingStreamEndOffsets,
-        Map<Long, Queue<StreamRecordBatch>> streamDiscontinuousRecords,
+        Map<Long, Queue<RecoverRecord>> streamDiscontinuousRecords,
         LogCache.LogCacheBlock cacheBlock,
         Map<Long, Long> streamNextOffsets,
         Logger logger
@@ -294,19 +295,19 @@ private static void processRecoveredRecord(
         }
 
         Long expectedNextOffset = streamNextOffsets.get(streamId);
-        Queue<StreamRecordBatch> discontinuousRecords = streamDiscontinuousRecords.get(streamId);
+        Queue<RecoverRecord> discontinuousRecords = streamDiscontinuousRecords.get(streamId);
         boolean isContinuous = expectedNextOffset == null || expectedNextOffset == streamRecordBatch.getBaseOffset();
         if (!isContinuous) {
             // unexpected record, put it into discontinuous records queue.
             if (discontinuousRecords == null) {
-                discontinuousRecords = new PriorityQueue<>(Comparator.comparingLong(StreamRecordBatch::getBaseOffset));
+                discontinuousRecords = new PriorityQueue<>(Comparator.comparingLong(r -> r.record.getBaseOffset()));
                 streamDiscontinuousRecords.put(streamId, discontinuousRecords);
             }
-            discontinuousRecords.add(streamRecordBatch);
+            discontinuousRecords.add(new RecoverRecord(streamRecordBatch, recordOffset));
             return;
         }
         // continuous record, put it into cache, and check if there is any historical discontinuous records can be polled.
-        cacheBlock.put(streamRecordBatch);
+        cacheBlock.put(streamRecordBatch, recordOffset);
         expectedNextOffset = maybePollDiscontinuousRecords(streamRecordBatch, cacheBlock, discontinuousRecords, logger);
         streamNextOffsets.put(streamId, expectedNextOffset);
     }
@@ -314,7 +315,7 @@ private static void processRecoveredRecord(
     private static long maybePollDiscontinuousRecords(
         StreamRecordBatch streamRecordBatch,
         LogCache.LogCacheBlock cacheBlock,
-        Queue<StreamRecordBatch> discontinuousRecords,
+        Queue<RecoverRecord> discontinuousRecords,
         Logger logger
     ) {
         long expectedNextOffset = streamRecordBatch.getLastOffset();
@@ -323,25 +324,28 @@ private static long maybePollDiscontinuousRecords(
         }
         // check and poll historical discontinuous records.
         while (!discontinuousRecords.isEmpty()) {
-            StreamRecordBatch peek = discontinuousRecords.peek();
-            if (peek.getBaseOffset() != expectedNextOffset) {
+            RecoverRecord peek = discontinuousRecords.peek();
+            if (peek.record.getBaseOffset() != expectedNextOffset) {
                 break;
             }
             // should never happen, log it.
-            logger.error("[BUG] recover an out of order record, streamId={}, expectedNextOffset={}, record={}", streamRecordBatch.getStreamId(), expectedNextOffset, peek);
+            logger.error("[BUG] recover an out of order record, streamId={}, expectedNextOffset={}, record={}", streamRecordBatch.getStreamId(), expectedNextOffset, peek.record);
             discontinuousRecords.poll();
-            cacheBlock.put(peek);
-            expectedNextOffset = peek.getLastOffset();
+            cacheBlock.put(peek.record, peek.walOffset);
+            expectedNextOffset = peek.record.getLastOffset();
         }
         return expectedNextOffset;
     }
 
-    private static void releaseDiscontinuousRecords(Map<Long, Queue<StreamRecordBatch>> streamDiscontinuousRecords,
+    private static void releaseDiscontinuousRecords(Map<Long, Queue<RecoverRecord>> streamDiscontinuousRecords,
                                                     Logger logger) {
         streamDiscontinuousRecords.values().stream()
             .filter(q -> !q.isEmpty())
             .peek(q -> logger.info("drop discontinuous records, records={}", q))
-            .forEach(S3Storage::releaseRecords);
+            .forEach(queue -> {
+                queue.forEach(record -> record.record.release());
+                queue.clear();
+            });
     }
 
     /**
@@ -375,7 +379,7 @@ private static RecoveryBlockResult filterOutInvalidStreams(LogCache.LogCacheBloc
         LogCache.LogCacheBlock newCacheBlock = new LogCache.LogCacheBlock(1024L * 1024 * 1024);
         cacheBlock.records().forEach((streamId, records) -> {
             if (!invalidStreams.contains(streamId)) {
-                records.forEach(newCacheBlock::put);
+                records.forEach(record -> newCacheBlock.put(record, null));
             } else {
                 // release invalid records.
                 releaseRecords(records);
@@ -388,6 +392,13 @@ private static void releaseAllRecords(Collection<Queue<StreamRecordBatch>> allRecords) {
+    private static void releaseRecoverRecords(Collection<Queue<RecoverRecord>> allRecords) {
+        allRecords.forEach(queue -> {
+            queue.forEach(record -> record.record.release());
+            queue.clear();
+        });
+    }
+
     private static void releaseRecords(Collection<StreamRecordBatch> records) {
         records.forEach(StreamRecordBatch::release);
     }
@@ -648,6 +659,78 @@ public LogCache snapshotReadCache() {
         return snapshotReadCache;
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(long streamId, long newNextOffset) {
+        if (streamId < 0) {
+            return CompletableFuture.failedFuture(new IllegalArgumentException("streamId must be non-negative"));
+        }
+        if (newNextOffset < 0) {
+            return CompletableFuture.failedFuture(new IllegalArgumentException("newNextOffset must be non-negative"));
+        }
+        try {
+            RecordOffset walOffset = prepareTruncateTail(streamId, newNextOffset);
+            if (walOffset == null) {
+                return CompletableFuture.completedFuture(null);
+            }
+            return deltaWAL.truncateTail(walOffset);
+        } catch (Exception e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    private RecordOffset prepareTruncateTail(long streamId, long newNextOffset) {
+        synchronized (this) {
+            validateTruncateTailState(streamId);
+            cancelBackoffRecords(streamId, newNextOffset);
+            RecordOffset walOffset = selectFirstRemovedWalOffset(streamId, newNextOffset);
+            if (walOffset != null) {
+                deltaWALCache.setLastRecordOffset(walOffset);
+            }
+            return walOffset;
+        }
+    }
+
+    private void validateTruncateTailState(long streamId) {
+        if (!walPrepareQueue.isEmpty() || !walCommitQueue.isEmpty()) {
+            throw new IllegalStateException("Cannot truncate tail when WAL upload tasks are pending");
+        }
+        boolean hasInflight = inflightWALUploadTasks.stream().anyMatch(ctx -> ctx.cache.containsStream(streamId));
+        if (hasInflight) {
+            throw new IllegalStateException("Cannot truncate tail while stream has inflight WAL uploads");
+        }
+    }
+
+    private void cancelBackoffRecords(long streamId, long newNextOffset) {
+        Iterator<WalWriteRequest> iterator = backoffRecords.iterator();
+        while (iterator.hasNext()) {
+            WalWriteRequest request = iterator.next();
+            StreamRecordBatch record = request.record;
+            if (record.getStreamId() >= 0 && record.getStreamId() == streamId && record.getBaseOffset() >= newNextOffset) {
+                iterator.remove();
+                request.cf.completeExceptionally(new IllegalStateException("Append cancelled due to truncate"));
+                record.release();
+            }
+        }
+    }
+
+    private RecordOffset selectFirstRemovedWalOffset(long streamId, long newNextOffset) {
+        Optional<LogCache.TruncateResult> deltaResult = deltaWALCache.truncateStreamRecords(streamId, newNextOffset);
+        Optional<LogCache.TruncateResult> snapshotResult = snapshotReadCache == null
+            ? Optional.empty()
+            : snapshotReadCache.truncateStreamRecords(streamId, newNextOffset);
+        RecordOffset walOffset = deltaResult
+            .map(r -> r.firstRemovedWalOffset)
+            .filter(Objects::nonNull)
+            .orElse(null);
+        if (walOffset == null) {
+            walOffset = snapshotResult
+                .map(r -> r.firstRemovedWalOffset)
+                .filter(Objects::nonNull)
+                .orElse(null);
+        }
+        return walOffset;
+    }
+
     @SuppressWarnings({"checkstyle:npathcomplexity"})
     @WithSpan
     private CompletableFuture read0(FetchContext context,
@@ -806,7 +889,7 @@ private void handleAppendCallback(WalWriteRequest request) {
     private void handleAppendCallback0(WalWriteRequest request) {
         final long startTime = System.nanoTime();
         request.record.retain();
-        boolean full = deltaWALCache.put(request.record);
+        boolean full = deltaWALCache.put(request.record, request.offset);
         deltaWALCache.setLastRecordOffset(request.offset);
         if (full) {
             // cache block is full, trigger WAL upload.
@@ -1062,6 +1145,16 @@ public RecoveryBlockResult(LogCache.LogCacheBlock cacheBlock, RuntimeException e
         }
     }
 
+    static class RecoverRecord {
+        final StreamRecordBatch record;
+        final RecordOffset walOffset;
+
+        RecoverRecord(StreamRecordBatch record, RecordOffset walOffset) {
+            this.record = record;
+            this.walOffset = walOffset;
+        }
+    }
+
     public static class LazyCommit {
         final CompletableFuture cf = new CompletableFuture<>();
         final long lazyLingerMs;
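Condensed, the storage-level flow added above is: refuse truncation while WAL upload tasks are pending, drop matching records from the delta/snapshot caches, then truncate the delta WAL from the earliest removed record's WAL offset. The sketch below only restates that ordering; `hasPendingWalUploads` and `dropCachedRecordsFrom` are hypothetical stand-ins for the queue/in-flight checks and the cache truncation in the real code.

```java
import com.automq.stream.s3.wal.RecordOffset;
import com.automq.stream.s3.wal.WriteAheadLog;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Condensed sketch of the ordering in S3Storage#truncateTail above; not the actual code.
final class TruncateTailFlowSketch {
    private final WriteAheadLog deltaWAL;

    TruncateTailFlowSketch(WriteAheadLog deltaWAL) {
        this.deltaWAL = deltaWAL;
    }

    CompletableFuture<Void> truncateTail(long streamId, long newNextOffset) {
        // 1) Refuse while WAL upload tasks are pending or in flight for this stream.
        if (hasPendingWalUploads(streamId)) {
            return CompletableFuture.failedFuture(new IllegalStateException("pending WAL uploads"));
        }
        // 2) Drop cached records >= newNextOffset and learn where they started in the WAL.
        Optional<RecordOffset> firstRemoved = dropCachedRecordsFrom(streamId, newNextOffset);
        // 3) Only touch the WAL if a cached record actually pointed at it.
        return firstRemoved.map(deltaWAL::truncateTail)
            .orElseGet(() -> CompletableFuture.completedFuture(null));
    }

    // Hypothetical stand-in for the walPrepareQueue/walCommitQueue/inflight checks above.
    private boolean hasPendingWalUploads(long streamId) {
        return false;
    }

    // Hypothetical stand-in for the LogCache/SnapshotReadCache truncation above.
    private Optional<RecordOffset> dropCachedRecordsFrom(long streamId, long newNextOffset) {
        return Optional.empty();
    }
}
```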
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
index 35e199c54e..e437bf7a9f 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3Stream.java
@@ -55,6 +55,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.TimeUnit;
@@ -365,6 +366,78 @@ private CompletableFuture trim0(long newStartOffset) {
         return trimCf;
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(long newNextOffset) {
+        if (snapshotRead()) {
+            return FutureUtil.failedFuture(new IllegalStateException("truncateTail is not supported for readonly stream"));
+        }
+        try {
+            truncateTailSync(newNextOffset);
+            return CompletableFuture.completedFuture(null);
+        } catch (RuntimeException e) {
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
+    private void truncateTailSync(long newNextOffset) {
+        writeLock.lock();
+        try {
+            ensureTruncateTailAllowed(newNextOffset);
+            waitForInFlightOperations();
+            invokeStorageTruncate(newNextOffset);
+            nextOffset.updateAndGet(old -> Math.min(old, newNextOffset));
+            confirmOffset.updateAndGet(old -> Math.min(old, newNextOffset));
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    private void ensureTruncateTailAllowed(long newNextOffset) {
+        if (!status.isWritable()) {
+            throw new StreamClientException(ErrorCode.STREAM_ALREADY_CLOSED, logIdent + "stream is not writable");
+        }
+        if (newNextOffset < startOffset()) {
+            throw new IllegalArgumentException("newNextOffset " + newNextOffset + " is less than start offset " + startOffset());
+        }
+        long currentNext = nextOffset.get();
+        if (newNextOffset > currentNext) {
+            throw new IllegalArgumentException("newNextOffset " + newNextOffset + " is greater than current next offset " + currentNext);
+        }
+    }
+
+    private void waitForInFlightOperations() {
+        List<CompletableFuture<?>> waits = new ArrayList<>(pendingAppends);
+        waits.addAll(pendingFetches);
+        if (lastAppendFuture != null) {
+            waits.add(lastAppendFuture);
+        }
+        if (waits.isEmpty()) {
+            return;
+        }
+        CompletableFuture[] array = waits.toArray(new CompletableFuture[0]);
+        try {
+            CompletableFuture.allOf(array).join();
+        } catch (CompletionException e) {
+            throw wrapCompletionException(e);
+        }
+    }
+
+    private void invokeStorageTruncate(long newNextOffset) {
+        try {
+            storage.truncateTail(streamId, newNextOffset).join();
+        } catch (CompletionException e) {
+            throw wrapCompletionException(e);
+        }
+    }
+
+    private RuntimeException wrapCompletionException(CompletionException exception) {
+        Throwable cause = FutureUtil.cause(exception);
+        if (cause instanceof RuntimeException) {
+            return (RuntimeException) cause;
+        }
+        return new RuntimeException(cause);
+    }
+
     @Override
     public CompletableFuture close() {
         return close(false);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
index a0d7bc1a72..c019d87e67 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/S3StreamClient.java
@@ -351,6 +351,11 @@ public CompletableFuture trim(long newStartOffset) {
         return stream.trim(newStartOffset);
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(long newNextOffset) {
+        return stream.truncateTail(newNextOffset);
+    }
+
     @Override
     public CompletableFuture close() {
         return close(false);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/Storage.java b/s3stream/src/main/java/com/automq/stream/s3/Storage.java
index 6a6c8376c0..7f7a82734a 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/Storage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/Storage.java
@@ -57,4 +57,13 @@ default CompletableFuture read(long streamId, long startOffset, l
      * Force stream record in WAL upload to s3
      */
     CompletableFuture forceUpload(long streamId);
+
+    /**
+     * Roll back the tail of a stream so that subsequent appends start from {@code newNextOffset}.
+     *
+     * @param streamId      stream identifier
+     * @param newNextOffset new next offset after rollback
+     * @return future that completes when the rollback finishes
+     */
+    CompletableFuture<Void> truncateTail(long streamId, long newNextOffset);
 }
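From a caller's point of view, `S3Stream#truncateTail` as implemented above fails fast on bad input rather than blocking. A sketch of the failure modes (illustrative names, not part of the patch):

```java
import com.automq.stream.api.Stream;

import java.util.concurrent.CompletionException;

// Sketch of caller-side handling for the failure modes implemented above.
final class TruncateTailErrorHandling {
    private TruncateTailErrorHandling() {
    }

    static void truncateOrExplain(Stream stream, long target) {
        try {
            stream.truncateTail(target).join();
        } catch (CompletionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof IllegalArgumentException) {
                // target was below startOffset or above the current nextOffset
                System.err.println("offset out of range: " + cause.getMessage());
            } else if (cause instanceof IllegalStateException) {
                // read-only (snapshot-read) stream, or storage had pending WAL uploads
                System.err.println("stream not truncatable right now: " + cause.getMessage());
            } else {
                // e.g. StreamClientException when the stream is no longer writable
                throw e;
            }
        }
    }
}
```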
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
index 6a7954a253..b446a030da 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java
@@ -24,6 +24,7 @@ import com.automq.stream.s3.model.StreamRecordBatch;
 import com.automq.stream.s3.trace.context.TraceContext;
 import com.automq.stream.s3.wal.RecordOffset;
+import com.automq.stream.s3.wal.impl.DefaultRecordOffset;
 import com.automq.stream.utils.biniarysearch.StreamRecordBatchList;
 
 import org.slf4j.Logger;
@@ -36,6 +37,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
@@ -92,14 +94,14 @@ public LogCache(long capacity, long cacheBlockMaxSize, int maxCacheBlockStreamCo
      * Put a record batch into the cache.
      * record batched in the same stream should be put in order.
      */
-    public boolean put(StreamRecordBatch recordBatch) {
+    public boolean put(StreamRecordBatch recordBatch, RecordOffset walOffset) {
         long startTime = System.nanoTime();
         tryRealFree();
         size.addAndGet(recordBatch.occupiedSize());
         readLock.lock();
         boolean full;
         try {
-            full = activeBlock.put(recordBatch);
+            full = activeBlock.put(recordBatch, walOffset);
         } finally {
             readLock.unlock();
         }
@@ -356,6 +358,37 @@ public void clearStreamRecords(long streamId) {
         }
     }
 
+    public Optional<TruncateResult> truncateStreamRecords(long streamId, long newNextOffset) {
+        if (streamId == MATCH_ALL_STREAMS) {
+            throw new IllegalArgumentException("streamId must not be MATCH_ALL_STREAMS");
+        }
+        List<TruncateResult> results = new ArrayList<>();
+        readLock.lock();
+        try {
+            for (LogCacheBlock block : blocks) {
+                TruncateResult result = block.truncateStream(streamId, newNextOffset);
+                if (result != null) {
+                    results.add(result);
+                }
+            }
+        } finally {
+            readLock.unlock();
+        }
+        if (results.isEmpty()) {
+            return Optional.empty();
+        }
+        long freedBytes = results.stream().mapToLong(r -> r.freedBytes).sum();
+        if (freedBytes > 0) {
+            size.addAndGet(-freedBytes);
+        }
+        RecordOffset earliest = results.stream()
+            .map(r -> r.firstRemovedWalOffset)
+            .filter(Objects::nonNull)
+            .min((a, b) -> Long.compare(DefaultRecordOffset.of(a).offset(), DefaultRecordOffset.of(b).offset()))
+            .orElse(null);
+        return Optional.of(new TruncateResult(freedBytes, earliest));
+    }
+
     static boolean isDiscontinuous(LogCacheBlock left, LogCacheBlock right) {
         for (Map.Entry entry : left.map.entrySet()) {
             Long streamId = entry.getKey();
@@ -382,6 +415,7 @@ static void mergeBlock(LogCacheBlock left, LogCacheBlock right) {
             StreamCache rightStreamCache = right.map.get(streamId);
             if (rightStreamCache != null) {
                 leftStreamCache.records.addAll(rightStreamCache.records);
+                leftStreamCache.walOffsets.addAll(rightStreamCache.walOffsets);
                 leftStreamCache.endOffset(rightStreamCache.endOffset());
             }
         });
@@ -393,6 +427,16 @@ static void mergeBlock(LogCacheBlock left, LogCacheBlock right) {
         }
     }
 
+    public static class TruncateResult {
+        public final long freedBytes;
+        public final RecordOffset firstRemovedWalOffset;
+
+        public TruncateResult(long freedBytes, RecordOffset firstRemovedWalOffset) {
+            this.freedBytes = freedBytes;
+            this.firstRemovedWalOffset = firstRemovedWalOffset;
+        }
+    }
+
     public static class LogCacheBlock {
         private static final AtomicLong BLOCK_ID_ALLOC = new AtomicLong();
         final Map map = new ConcurrentHashMap<>();
@@ -423,12 +467,12 @@ public boolean isFull() {
         return size.get() >= maxSize || map.size() >= maxStreamCount;
     }
 
-    public boolean put(StreamRecordBatch recordBatch) {
+    public boolean put(StreamRecordBatch recordBatch, RecordOffset walOffset) {
         map.compute(recordBatch.getStreamId(), (id, cache) -> {
             if (cache == null) {
                 cache = new StreamCache();
             }
-            cache.add(recordBatch);
+            cache.add(recordBatch, walOffset);
             return cache;
         });
         size.addAndGet(recordBatch.occupiedSize());
@@ -452,6 +496,22 @@ StreamRange getStreamRange(Long streamId) {
         }
     }
 
+    TruncateResult truncateStream(long streamId, long newNextOffset) {
+        StreamCache cache = map.get(streamId);
+        if (cache == null) {
+            return null;
+        }
+        TruncateResult result = cache.truncate(newNextOffset);
+        if (result == null) {
+            return null;
+        }
+        size.addAndGet(-result.freedBytes);
+        if (cache.isEmpty()) {
+            map.remove(streamId);
+        }
+        return result;
+    }
+
     public Map<Long, List<StreamRecordBatch>> records() {
         return map.entrySet().stream()
             .map(e -> Map.entry(e.getKey(), e.getValue().records))
@@ -549,17 +609,19 @@ public StreamRange(long startOffset, long endOffset) {
 
     static class StreamCache {
         List records = new ArrayList<>();
+        List<RecordOffset> walOffsets = new ArrayList<>();
         long startOffset = NOOP_OFFSET;
         long endOffset = NOOP_OFFSET;
         Map offsetIndexMap = new HashMap<>();
 
-        synchronized void add(StreamRecordBatch recordBatch) {
+        synchronized void add(StreamRecordBatch recordBatch, RecordOffset walOffset) {
             if (recordBatch.getBaseOffset() != endOffset && endOffset != NOOP_OFFSET) {
                 RuntimeException ex = new IllegalArgumentException(String.format("streamId=%s record batch base offset mismatch, expect %s, actual %s", recordBatch.getStreamId(), endOffset, recordBatch.getBaseOffset()));
                 LOGGER.error("[FATAL]", ex);
             }
             records.add(recordBatch);
+            walOffsets.add(walOffset);
             if (startOffset == NOOP_OFFSET) {
                 startOffset = recordBatch.getBaseOffset();
             }
@@ -593,6 +655,44 @@ synchronized List get(long startOffset, long endOffset, int m
             return new ArrayList<>(records.subList(startIndex, endIndex));
         }
 
+        synchronized TruncateResult truncate(long newNextOffset) {
+            if (records.isEmpty()) {
+                return null;
+            }
+            int removeFrom = -1;
+            for (int i = 0; i < records.size(); i++) {
+                StreamRecordBatch record = records.get(i);
+                if (record.getLastOffset() >= newNextOffset) {
+                    removeFrom = i;
+                    break;
+                }
+            }
+            if (removeFrom == -1) {
+                return null;
+            }
+            RecordOffset firstRemoved = walOffsets.get(removeFrom);
+            long freed = 0L;
+            for (int i = records.size() - 1; i >= removeFrom; i--) {
+                StreamRecordBatch record = records.remove(i);
+                freed += record.occupiedSize();
+                record.release();
+                walOffsets.remove(i);
+            }
+            Iterator<Map.Entry<Long, IndexAndCount>> iterator = offsetIndexMap.entrySet().iterator();
+            while (iterator.hasNext()) {
+                if (iterator.next().getKey() >= newNextOffset) {
+                    iterator.remove();
+                }
+            }
+            if (records.isEmpty()) {
+                startOffset = endOffset = NOOP_OFFSET;
+            } else {
+                startOffset = records.get(0).getBaseOffset();
+                endOffset = records.get(records.size() - 1).getLastOffset();
+            }
+            return new TruncateResult(freed, firstRemoved);
+        }
+
         int searchStartIndex(long startOffset) {
             IndexAndCount indexAndCount = offsetIndexMap.get(startOffset);
             if (indexAndCount != null) {
@@ -629,6 +729,11 @@ synchronized StreamRange range() {
         synchronized void free() {
             records.forEach(StreamRecordBatch::release);
             records.clear();
+            walOffsets.clear();
+        }
+
+        synchronized boolean isEmpty() {
+            return records.isEmpty();
         }
 
         synchronized long startOffset() {
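Call-site note for the widened `put(StreamRecordBatch, RecordOffset)` signature: WAL-backed writers pass the record's WAL `RecordOffset` so a later tail truncation can be mapped back to a WAL position, while paths that have no WAL position (for example the snapshot-read path in the next file) pass `null` and simply get no `firstRemovedWalOffset`. A minimal sketch, not part of the patch:

```java
import com.automq.stream.s3.cache.LogCache;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.wal.RecordOffset;

// Illustrative helper: the two put() patterns after this change.
final class LogCachePutPatterns {
    private LogCachePutPatterns() {
    }

    static void put(LogCache cache, StreamRecordBatch walBacked, RecordOffset walOffset, StreamRecordBatch replicated) {
        // WAL-backed record: a later truncateStreamRecords can report firstRemovedWalOffset.
        cache.put(walBacked, walOffset);
        // No WAL position available for this record (e.g. snapshot-read replay): pass null.
        cache.put(replicated, null);
    }
}
```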
diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/SnapshotReadCache.java b/s3stream/src/main/java/com/automq/stream/s3/cache/SnapshotReadCache.java
index 00da4be47b..14bc34fddd 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/SnapshotReadCache.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/SnapshotReadCache.java
@@ -33,6 +33,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -112,7 +113,7 @@ void put(CloseableIterator it) {
                 // The LogCacheBlock doesn't accept discontinuous record batches.
                 cache.clearStreamRecords(streamId);
             }
-            if (cache.put(batch)) {
+            if (cache.put(batch, null)) {
                 // the block is full
                 LogCache.LogCacheBlock cacheBlock = cache.archiveCurrentBlock();
                 cacheBlock.addFreeListener(cacheFreeListener);
@@ -138,6 +139,13 @@ public void addEventListener(EventListener eventListener) {
         cacheFreeListener.addListener(eventListener);
     }
 
+    public Optional<LogCache.TruncateResult> truncateStreamRecords(long streamId, long newNextOffset) {
+        if (cache == null) {
+            return Optional.empty();
+        }
+        return cache.truncateStreamRecords(streamId, newNextOffset);
+    }
+
     @EventLoopSafe
     private void clearStream(Long streamId) {
         cache.clearStreamRecords(streamId);
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/WriteAheadLog.java b/s3stream/src/main/java/com/automq/stream/s3/wal/WriteAheadLog.java
index ae39cfc8cc..33db30f0c5 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/WriteAheadLog.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/WriteAheadLog.java
@@ -84,4 +84,14 @@ public interface WriteAheadLog {
      * @return future complete when trim done.
      */
     CompletableFuture trim(RecordOffset offset);
+
+    /**
+     * Truncate the WAL tail so that all records with offset greater than or equal to {@code offset}
+     * are discarded. After completion, {@link #confirmOffset()} and {@code offset} align with
+     * the new logical end of the WAL.
+     *
+     * @param offset new next offset after truncation.
+     * @return future that completes when the truncation is done.
+     */
+    CompletableFuture<Void> truncateTail(RecordOffset offset);
 }
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/MemoryWriteAheadLog.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/MemoryWriteAheadLog.java
index dde9ca3e49..db4d4e7512 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/MemoryWriteAheadLog.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/MemoryWriteAheadLog.java
@@ -135,4 +135,16 @@ public CompletableFuture trim(RecordOffset offset) {
             });
         return CompletableFuture.completedFuture(null);
     }
-}
\ No newline at end of file
+
+    @Override
+    public CompletableFuture<Void> truncateTail(RecordOffset offset) {
+        long targetOffset = DefaultRecordOffset.of(offset).offset();
+        dataMap.tailMap(targetOffset, true)
+            .forEach((key, value) -> {
+                dataMap.remove(key);
+                value.release();
+            });
+        offsetAlloc.updateAndGet(current -> Math.min(current, targetOffset));
+        return CompletableFuture.completedFuture(null);
+    }
+}
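`trim` and the new `truncateTail` are complementary: trim reclaims the head of the WAL (records before the offset), truncateTail discards the tail (records at and after the offset). A minimal sketch, assuming `wal` is an open `WriteAheadLog` and the two `RecordOffset` values were previously returned by appends; the ordering shown is only illustrative.

```java
import com.automq.stream.s3.wal.RecordOffset;
import com.automq.stream.s3.wal.WriteAheadLog;

import java.util.concurrent.CompletableFuture;

// Sketch only: shrink a WAL from both ends using the two complementary operations.
final class WalWindowSketch {
    private WalWindowSketch() {
    }

    static CompletableFuture<Void> shrink(WriteAheadLog wal, RecordOffset headCheckpoint, RecordOffset tailRollbackPoint) {
        // Records at and after tailRollbackPoint are discarded; records before headCheckpoint
        // become eligible for cleanup.
        return wal.truncateTail(tailRollbackPoint)
            .thenCompose(nil -> wal.trim(headCheckpoint));
    }
}
```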
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/DefaultWriter.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/DefaultWriter.java
index 614dc44438..f8f8befb44 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/DefaultWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/DefaultWriter.java
@@ -47,8 +47,10 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
@@ -443,6 +445,112 @@ public CompletableFuture trim(RecordOffset recordOffset) throws WALFencedE
         return trim0(newStartOffset);
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(RecordOffset recordOffset) throws WALFencedException {
+        try {
+            TailTruncatePlan plan = prepareTruncatePlan(DefaultRecordOffset.of(recordOffset).offset());
+            if (plan.noop || plan.deleteObjects.isEmpty()) {
+                return CompletableFuture.completedFuture(null);
+            }
+            return objectStorage.delete(plan.deleteObjects).whenComplete((nil, throwable) -> {
+                if (throwable != null) {
+                    LOGGER.error("Failed to delete WAL objects when truncating tail: {}", plan.deleteObjects, throwable);
+                }
+            });
+        } catch (Throwable t) {
+            return CompletableFuture.failedFuture(t);
+        }
+    }
+
+    private TailTruncatePlan prepareTruncatePlan(long targetOffset) throws WALFencedException {
+        checkStatus();
+        if (targetOffset < 0) {
+            throw new IllegalArgumentException("targetOffset must be non-negative");
+        }
+        lock.writeLock().lock();
+        try {
+            if (targetOffset > nextOffset.get()) {
+                throw new IllegalArgumentException("targetOffset " + targetOffset + " is greater than current nextOffset " + nextOffset.get());
+            }
+            if (targetOffset == nextOffset.get()) {
+                return TailTruncatePlan.noop();
+            }
+            if (targetOffset < trimOffset.get()) {
+                throw new IllegalArgumentException("targetOffset " + targetOffset + " is less than trimmed offset " + trimOffset.get());
+            }
+            if (activeBulk != null || !waitingUploadBulks.isEmpty() || !uploadingBulks.isEmpty()) {
+                throw new IllegalStateException("Cannot truncate tail while there are pending bulks");
+            }
+
+            List<ObjectStorage.ObjectPath> deleteObjects = new ArrayList<>();
+            long deletedBytes = collectTailObjects(targetOffset, deleteObjects);
+            deletedBytes += collectHistoricalObjects(targetOffset, deleteObjects);
+
+            objectDataBytes.addAndGet(-deletedBytes);
+            long alignedTarget = ObjectUtils.ceilAlignOffset(targetOffset);
+            nextOffset.set(alignedTarget);
+            flushedOffset.updateAndGet(current -> Math.min(current, alignedTarget));
+            return new TailTruncatePlan(deleteObjects.isEmpty(), deleteObjects);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private long collectTailObjects(long targetOffset, List<ObjectStorage.ObjectPath> deleteObjects) {
+        long deletedSize = 0L;
+        List<Long> keysToRemove = new ArrayList<>();
+        for (Map.Entry<Long, WALObject> entry : lastRecordOffset2object.tailMap(targetOffset, true).entrySet()) {
+            WALObject object = entry.getValue();
+            ensureNotWithinObject(targetOffset, object);
+            if (object.startOffset() >= targetOffset) {
+                deleteObjects.add(new ObjectStorage.ObjectPath(object.bucketId(), object.path()));
+                deletedSize += object.length();
+                keysToRemove.add(entry.getKey());
+            }
+        }
+        keysToRemove.forEach(lastRecordOffset2object::remove);
+        return deletedSize;
+    }
+
+    private long collectHistoricalObjects(long targetOffset, List<ObjectStorage.ObjectPath> deleteObjects) {
+        if (previousObjects.isEmpty()) {
+            return 0L;
+        }
+        long deletedSize = 0L;
+        List<WALObject> retained = new ArrayList<>(previousObjects.size());
+        for (WALObject object : previousObjects) {
+            ensureNotWithinObject(targetOffset, object);
+            if (object.startOffset() >= targetOffset) {
+                deleteObjects.add(new ObjectStorage.ObjectPath(object.bucketId(), object.path()));
+                deletedSize += object.length();
+            } else {
+                retained.add(object);
+            }
+        }
+        previousObjects = retained;
+        return deletedSize;
+    }
+
+    private static class TailTruncatePlan {
+        final boolean noop;
+        final List<ObjectStorage.ObjectPath> deleteObjects;
+
+        TailTruncatePlan(boolean noop, List<ObjectStorage.ObjectPath> deleteObjects) {
+            this.noop = noop;
+            this.deleteObjects = deleteObjects;
+        }
+
+        static TailTruncatePlan noop() {
+            return new TailTruncatePlan(true, Collections.emptyList());
+        }
+    }
+
+    private void ensureNotWithinObject(long targetOffset, WALObject object) {
+        if (targetOffset > object.startOffset() && targetOffset < object.endOffset()) {
+            throw new IllegalArgumentException("targetOffset " + targetOffset + " falls inside WAL object " + object);
+        }
+    }
+
     @Override
     public Iterator recover() {
         try {
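On the object-backed WAL the operation above is planned as metadata changes plus whole-object deletions: WAL objects whose start offset is at or beyond the target are deleted, and a target that falls strictly inside an object is rejected by `ensureNotWithinObject`. A hedged caller sketch (variable names are illustrative, not from this patch):

```java
import com.automq.stream.s3.wal.RecordOffset;
import com.automq.stream.s3.wal.WriteAheadLog;

// Hedged caller sketch for the object WAL: truncation must land on a WAL object boundary.
final class ObjectWalTruncateSketch {
    private ObjectWalTruncateSketch() {
    }

    static void truncateTail(WriteAheadLog objectWal, RecordOffset boundaryOffset) {
        objectWal.truncateTail(boundaryOffset).whenComplete((nil, t) -> {
            if (t != null) {
                // IllegalArgumentException: offset falls strictly inside a WAL object,
                // or is outside [trimOffset, nextOffset].
                // IllegalStateException: bulks are still pending upload.
                System.err.println("tail truncation rejected: " + t.getMessage());
            }
        });
    }
}
```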
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/NoopWriter.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/NoopWriter.java
index 5c82720674..5a4a5b5dd5 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/NoopWriter.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/NoopWriter.java
@@ -62,6 +62,11 @@ public CompletableFuture trim(RecordOffset recordOffset) throws WALFencedE
         return CompletableFuture.failedFuture(new UnsupportedOperationException("trim is not supported"));
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(RecordOffset recordOffset) throws WALFencedException {
+        return CompletableFuture.failedFuture(new UnsupportedOperationException("truncateTail is not supported"));
+    }
+
     @Override
     public Iterator recover() {
         throw new UnsupportedOperationException("recover is not supported");
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/ObjectWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/ObjectWALService.java
index 80c3e52faf..79eadb9768 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/ObjectWALService.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/ObjectWALService.java
@@ -125,6 +125,17 @@ public CompletableFuture trim(RecordOffset offset) {
         }
     }
 
+    @Override
+    public CompletableFuture<Void> truncateTail(RecordOffset offset) {
+        log.info("Truncate tail of S3 WAL to offset: {}", offset);
+        try {
+            return writer.truncateTail(offset);
+        } catch (Throwable e) {
+            log.error("Truncate tail of S3 WAL failed, due to unrecoverable exception.", e);
+            return CompletableFuture.failedFuture(e);
+        }
+    }
+
     @Override
     public String toString() {
         return String.format("ObjectWALService{%s@%s-%s-%s}", config.bucketId(), config.nodeId(), config.epoch(), config.type());
diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/Writer.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/Writer.java
index abc9279594..932162b552 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/Writer.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/object/Writer.java
@@ -42,5 +42,7 @@ public interface Writer {
 
     CompletableFuture trim(RecordOffset recordOffset) throws WALFencedException;
 
+    CompletableFuture<Void> truncateTail(RecordOffset recordOffset) throws WALFencedException;
+
     Iterator recover();
 }
diff --git a/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java b/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java
index e5a1dbb786..ef9105c182 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java
@@ -58,6 +58,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static com.automq.stream.s3.TestUtils.random;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -153,14 +154,15 @@ public void testUploadWALObject_sequence() throws ExecutionException, Interrupte
         Mockito.doAnswer(invocation -> commitCfList.get(commitCfIndex.getAndIncrement())).when(objectManager).commitStreamSetObject(any());
 
         LogCache.LogCacheBlock logCacheBlock1 = new LogCache.LogCacheBlock(1024);
-        logCacheBlock1.put(newRecord(233L, 10L));
-        logCacheBlock1.put(newRecord(234L, 10L));
+        AtomicLong walOffsetGen = new AtomicLong();
+        logCacheBlock1.put(newRecord(233L, 10L), DefaultRecordOffset.of(0, walOffsetGen.getAndIncrement(), 0));
+        logCacheBlock1.put(newRecord(234L, 10L), DefaultRecordOffset.of(0, walOffsetGen.getAndIncrement(), 0));
         logCacheBlock1.lastRecordOffset(DefaultRecordOffset.of(0, 10L, 0));
         CompletableFuture cf1 = storage.uploadDeltaWAL(logCacheBlock1);
 
         LogCache.LogCacheBlock logCacheBlock2 = new LogCache.LogCacheBlock(1024);
-        logCacheBlock2.put(newRecord(233L, 20L));
-        logCacheBlock2.put(newRecord(234L, 20L));
+        logCacheBlock2.put(newRecord(233L, 20L), DefaultRecordOffset.of(0, walOffsetGen.getAndIncrement(), 0));
+        logCacheBlock2.put(newRecord(234L, 20L), DefaultRecordOffset.of(0, walOffsetGen.getAndIncrement(), 0));
         logCacheBlock2.lastRecordOffset(DefaultRecordOffset.of(0, 20L, 0));
         CompletableFuture cf2 = storage.uploadDeltaWAL(logCacheBlock2);
 
@@ -331,7 +333,7 @@ public StreamRecordBatch record() {
 
         @Override
         public RecordOffset recordOffset() {
-            return DefaultRecordOffset.of(0, 0, 0);
+            return DefaultRecordOffset.of(0, record.getBaseOffset(), 0);
         }
     }
 }
\ No newline at end of file
diff --git a/s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java b/s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java
index 557c012736..0952aad296 100644
--- a/s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java
+++ b/s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java
@@ -22,15 +22,18 @@ import com.automq.stream.s3.TestUtils;
 import com.automq.stream.s3.cache.LogCache.LogCacheBlock;
 import com.automq.stream.s3.model.StreamRecordBatch;
+import com.automq.stream.s3.wal.impl.DefaultRecordOffset;
 
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Tag("S3Unit")
@@ -39,16 +42,17 @@ public class LogCacheTest {
     @Test
     public void testPutGet() {
         LogCache logCache = new LogCache(1024 * 1024, 1024 * 1024);
+        long walOffset = 0L;
 
-        logCache.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)));
-        logCache.put(new StreamRecordBatch(233L, 0L, 11L, 2, TestUtils.random(20)));
+        logCache.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
+        logCache.put(new StreamRecordBatch(233L, 0L, 11L, 2, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         logCache.archiveCurrentBlock();
-        logCache.put(new StreamRecordBatch(233L, 0L, 13L, 2, TestUtils.random(20)));
+        logCache.put(new StreamRecordBatch(233L, 0L, 13L, 2, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         logCache.archiveCurrentBlock();
-        logCache.put(new StreamRecordBatch(233L, 0L, 20L, 1, TestUtils.random(20)));
-        logCache.put(new StreamRecordBatch(233L, 0L, 21L, 1, TestUtils.random(20)));
+        logCache.put(new StreamRecordBatch(233L, 0L, 20L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
+        logCache.put(new StreamRecordBatch(233L, 0L, 21L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         List records = logCache.get(233L, 10L, 21L, 1000);
         assertEquals(1, records.size());
@@ -71,9 +75,9 @@ public void testPutGet() {
     @Test
     public void testOffsetIndex() {
         LogCache cache = new LogCache(Integer.MAX_VALUE, Integer.MAX_VALUE);
-
+        long walOffset = 0L;
         for (int i = 0; i < 100000; i++) {
-            cache.put(new StreamRecordBatch(233L, 0L, i, 1, TestUtils.random(1)));
+            cache.put(new StreamRecordBatch(233L, 0L, i, 1, TestUtils.random(1)), DefaultRecordOffset.of(0, walOffset++, 0));
         }
 
         long start = System.nanoTime();
@@ -89,14 +93,15 @@ public void testOffsetIndex() {
     @Test
     public void testClearStreamRecords() {
         LogCache logCache = new LogCache(1024 * 1024, 1024 * 1024);
+        long walOffset = 0L;
 
-        logCache.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)));
-        logCache.put(new StreamRecordBatch(233L, 0L, 11L, 2, TestUtils.random(20)));
+        logCache.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
+        logCache.put(new StreamRecordBatch(233L, 0L, 11L, 2, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         logCache.archiveCurrentBlock();
-        logCache.put(new StreamRecordBatch(233L, 0L, 13L, 2, TestUtils.random(20)));
+        logCache.put(new StreamRecordBatch(233L, 0L, 13L, 2, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
-        logCache.put(new StreamRecordBatch(234L, 0L, 13L, 2, TestUtils.random(20)));
+        logCache.put(new StreamRecordBatch(234L, 0L, 13L, 2, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         assertTrue(logCache.blocks.get(0).containsStream(233L));
         assertTrue(logCache.blocks.get(1).containsStream(234L));
@@ -112,19 +117,21 @@ public void testClearStreamRecords() {
     @Test
     public void testIsDiscontinuous() {
         LogCacheBlock left = new LogCacheBlock(1024L * 1024);
-        left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)));
+        long walOffset = 0L;
+        left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         LogCacheBlock right = new LogCacheBlock(1024L * 1024);
-        right.put(new StreamRecordBatch(233L, 0L, 13L, 1, TestUtils.random(20)));
+        right.put(new StreamRecordBatch(233L, 0L, 13L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         assertTrue(LogCache.isDiscontinuous(left, right));
 
         left = new LogCacheBlock(1024L * 1024);
-        left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)));
-        left.put(new StreamRecordBatch(234L, 0L, 10L, 1, TestUtils.random(20)));
+        walOffset = 0L;
+        left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
+        left.put(new StreamRecordBatch(234L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         right = new LogCacheBlock(1024L * 1024);
-        right.put(new StreamRecordBatch(233L, 0L, 11L, 1, TestUtils.random(20)));
+        right.put(new StreamRecordBatch(233L, 0L, 11L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
 
         assertFalse(LogCache.isDiscontinuous(left, right));
     }
@@ -132,13 +139,14 @@ public void testIsDiscontinuous() {
     public void testMergeBlock() {
         long size = 0;
         LogCacheBlock left = new LogCacheBlock(1024L * 1024);
-        left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)));
-        left.put(new StreamRecordBatch(234L, 0L, 100L, 1, TestUtils.random(20)));
+        long walOffset = 0L;
+        left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
+        left.put(new StreamRecordBatch(234L, 0L, 100L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
         size += left.size();
 
         LogCacheBlock right = new LogCacheBlock(1024L * 1024);
-        right.put(new StreamRecordBatch(233L, 0L, 11L, 1, TestUtils.random(20)));
-        right.put(new StreamRecordBatch(235L, 0L, 200L, 1, TestUtils.random(20)));
+        right.put(new StreamRecordBatch(233L, 0L, 11L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
+        right.put(new StreamRecordBatch(235L, 0L, 200L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
         size += right.size();
 
         LogCache.mergeBlock(left, right);
@@ -165,4 +173,25 @@ public void testMergeBlock() {
     }
 
+    @Test
+    public void testTruncateStreamRecords() {
+        LogCache cache = new LogCache(1024 * 1024, 1024 * 1024);
+        long walOffset = 0L;
+
+        cache.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
+        cache.put(new StreamRecordBatch(233L, 0L, 11L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, walOffset++, 0));
+
+        Optional<LogCache.TruncateResult> result = cache.truncateStreamRecords(233L, 12L);
+        assertTrue(result.isPresent());
+        LogCache.TruncateResult truncateResult = result.get();
+        assertTrue(truncateResult.freedBytes > 0);
+        assertNotNull(truncateResult.firstRemovedWalOffset);
+        assertEquals(1L, DefaultRecordOffset.of(truncateResult.firstRemovedWalOffset).offset());
+
+        List<StreamRecordBatch> remaining = cache.get(233L, 12L, 14L, 1000);
+        assertEquals(0, remaining.size());
+
+        assertTrue(cache.truncateStreamRecords(233L, 12L).isEmpty());
+    }
+
 }
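A possible follow-up unit test, sketched here rather than included in the patch: after `LogCache.mergeBlock` the `walOffsets` list must stay aligned with `records`, so truncating the merged block should still report the WAL offset of the first removed record. The method is meant to sit inside `LogCacheTest`, so it reuses only imports and helpers already visible above.

```java
    @Test
    public void testTruncateAfterMerge() {
        LogCacheBlock left = new LogCacheBlock(1024L * 1024);
        left.put(new StreamRecordBatch(233L, 0L, 10L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, 0L, 0));
        LogCacheBlock right = new LogCacheBlock(1024L * 1024);
        right.put(new StreamRecordBatch(233L, 0L, 11L, 1, TestUtils.random(20)), DefaultRecordOffset.of(0, 1L, 0));

        LogCache.mergeBlock(left, right);

        // The record at offset 11 came from the merged-in block; its WAL offset (1) must survive the merge.
        LogCache.TruncateResult result = left.truncateStream(233L, 12L);
        assertNotNull(result);
        assertEquals(1L, DefaultRecordOffset.of(result.firstRemovedWalOffset).offset());
    }
```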