From 297932209dc740dfd871e1754b7cca36b7ab623b Mon Sep 17 00:00:00 2001 From: Harsh Sawarkar Date: Tue, 8 Apr 2025 19:55:01 +0530 Subject: [PATCH] Added error handling mech Signed-off-by: Harsh Sawarkar --- ...imulatorMappedConfigSourceInitializer.java | 1 + .../config/data/BlockStreamConfig.java | 13 ++- .../simulator/config/data/GrpcConfig.java | 3 +- .../simulator/config/types/RecoveryMode.java | 8 ++ .../BlockAsDirBlockStreamManager.java | 13 +++ .../BlockAsFileBlockStreamManager.java | 13 +++ .../generator/BlockAsFileLargeDataSets.java | 32 +++++++ .../generator/BlockStreamManager.java | 8 +- .../generator/CraftBlockStreamManager.java | 17 +++- .../grpc/PublishStreamGrpcClient.java | 2 + .../impl/PublishStreamGrpcClientImpl.java | 85 +++++++++++++++++-- .../grpc/impl/PublishStreamObserver.java | 6 +- .../config/data/BlockStreamConfigTest.java | 18 ++++ .../BlockAsDirBlockStreamManagerTest.java | 41 +++++++++ .../BlockAsFileBlockStreamManagerTest.java | 32 +++++++ .../BlockAsFileLargeDataSetsTest.java | 28 ++++++ .../CraftBlockStreamManagerTest.java | 19 +++++ .../impl/PublishStreamGrpcClientImplTest.java | 68 ++++++++++++++- .../grpc/impl/PublishStreamObserverTest.java | 29 +++++-- 19 files changed, 416 insertions(+), 20 deletions(-) create mode 100644 simulator/src/main/java/org/hiero/block/simulator/config/types/RecoveryMode.java diff --git a/simulator/src/main/java/org/hiero/block/simulator/config/SimulatorMappedConfigSourceInitializer.java b/simulator/src/main/java/org/hiero/block/simulator/config/SimulatorMappedConfigSourceInitializer.java index 4fe43ff72..91ebd27af 100644 --- a/simulator/src/main/java/org/hiero/block/simulator/config/SimulatorMappedConfigSourceInitializer.java +++ b/simulator/src/main/java/org/hiero/block/simulator/config/SimulatorMappedConfigSourceInitializer.java @@ -24,6 +24,7 @@ public final class SimulatorMappedConfigSourceInitializer { new ConfigMapping("blockStream.streamingMode", "BLOCK_STREAM_STREAMING_MODE"), new ConfigMapping("blockStream.millisecondsPerBlock", "BLOCK_STREAM_MILLISECONDS_PER_BLOCK"), new ConfigMapping("blockStream.blockItemsBatchSize", "BLOCK_STREAM_BLOCK_ITEMS_BATCH_SIZE"), + new ConfigMapping("blockStream.recoveryMode", "RECOVERY_MODE"), // Block consumer configuration new ConfigMapping("consumer.startBlockNumber", "CONSUMER_START_BLOCK_NUMBER"), diff --git a/simulator/src/main/java/org/hiero/block/simulator/config/data/BlockStreamConfig.java b/simulator/src/main/java/org/hiero/block/simulator/config/data/BlockStreamConfig.java index 5d6296054..10eb0cf96 100644 --- a/simulator/src/main/java/org/hiero/block/simulator/config/data/BlockStreamConfig.java +++ b/simulator/src/main/java/org/hiero/block/simulator/config/data/BlockStreamConfig.java @@ -4,6 +4,7 @@ import com.swirlds.config.api.ConfigData; import com.swirlds.config.api.ConfigProperty; import org.hiero.block.simulator.config.logging.Loggable; +import org.hiero.block.simulator.config.types.RecoveryMode; import org.hiero.block.simulator.config.types.SimulatorMode; import org.hiero.block.simulator.config.types.StreamingMode; @@ -26,7 +27,8 @@ public record BlockStreamConfig( @Loggable @ConfigProperty(defaultValue = "100_000") int maxBlockItemsToStream, @Loggable @ConfigProperty(defaultValue = "MILLIS_PER_BLOCK") StreamingMode streamingMode, @Loggable @ConfigProperty(defaultValue = "1000") int millisecondsPerBlock, - @Loggable @ConfigProperty(defaultValue = "1000") int blockItemsBatchSize) { + @Loggable @ConfigProperty(defaultValue = "1000") int blockItemsBatchSize, + @Loggable @ConfigProperty(defaultValue = "RESEND_LAST") RecoveryMode recoveryMode) { /** * Creates a new {@link Builder} instance for constructing a {@code BlockStreamConfig}. @@ -42,6 +44,7 @@ public static Builder builder() { */ public static class Builder { private SimulatorMode simulatorMode = SimulatorMode.PUBLISHER_CLIENT; + private RecoveryMode recoveryMode = RecoveryMode.RESEND_LAST; private int lastKnownStatusesCapacity = 10; private int delayBetweenBlockItems = 1_500_000; private int maxBlockItemsToStream = 10_000; @@ -67,6 +70,11 @@ public Builder simulatorMode(SimulatorMode simulatorMode) { return this; } + public Builder recoveryMode(RecoveryMode recoveryMode) { + this.recoveryMode = recoveryMode; + return this; + } + /** * Sets the capacity of the last known statuses. * @@ -146,7 +154,8 @@ public BlockStreamConfig build() { maxBlockItemsToStream, streamingMode, millisecondsPerBlock, - blockItemsBatchSize); + blockItemsBatchSize, + recoveryMode); } } } diff --git a/simulator/src/main/java/org/hiero/block/simulator/config/data/GrpcConfig.java b/simulator/src/main/java/org/hiero/block/simulator/config/data/GrpcConfig.java index 7b02297cd..1cb10ad9d 100644 --- a/simulator/src/main/java/org/hiero/block/simulator/config/data/GrpcConfig.java +++ b/simulator/src/main/java/org/hiero/block/simulator/config/data/GrpcConfig.java @@ -16,4 +16,5 @@ @ConfigData("grpc") public record GrpcConfig( @Loggable @ConfigProperty(defaultValue = "localhost") String serverAddress, - @Loggable @ConfigProperty(defaultValue = "8080") @Min(0) @Max(65535) int port) {} + @Loggable @ConfigProperty(defaultValue = "8080") @Min(0) @Max(65535) int port, + @Loggable @ConfigProperty(defaultValue = "0") @Min(0) long rollbackDistance) {} diff --git a/simulator/src/main/java/org/hiero/block/simulator/config/types/RecoveryMode.java b/simulator/src/main/java/org/hiero/block/simulator/config/types/RecoveryMode.java new file mode 100644 index 000000000..b3ffb5b19 --- /dev/null +++ b/simulator/src/main/java/org/hiero/block/simulator/config/types/RecoveryMode.java @@ -0,0 +1,8 @@ +// SPDX-License-Identifier: Apache-2.0 +package org.hiero.block.simulator.config.types; + +public enum RecoveryMode { + RESEND_LAST, + ROLLBACK, + SKIP_AHEAD +} diff --git a/simulator/src/main/java/org/hiero/block/simulator/generator/BlockAsDirBlockStreamManager.java b/simulator/src/main/java/org/hiero/block/simulator/generator/BlockAsDirBlockStreamManager.java index 6c7268b09..12eca0073 100644 --- a/simulator/src/main/java/org/hiero/block/simulator/generator/BlockAsDirBlockStreamManager.java +++ b/simulator/src/main/java/org/hiero/block/simulator/generator/BlockAsDirBlockStreamManager.java @@ -22,6 +22,7 @@ import org.hiero.block.common.utils.FileUtilities; import org.hiero.block.simulator.config.data.BlockGeneratorConfig; import org.hiero.block.simulator.config.types.GenerationMode; +import org.hiero.block.simulator.exception.BlockSimulatorParsingException; /** * The BlockAsDirBlockStreamManager class implements the BlockStreamManager interface to manage the @@ -99,6 +100,18 @@ public Block getNextBlock() { return nextBlock; } + @Override + public Block getLastBlock() throws IOException, BlockSimulatorParsingException { + int lastBlockNumberIndex = Math.max(0, currentBlockIndex - 1); + return blocks.get(lastBlockNumberIndex); + } + + @Override + public Block getBlockByNumber(long blockNumber) throws IOException, BlockSimulatorParsingException { + int index = (int) Math.max(0, Math.min(blockNumber, blocks.size() - 1)); + return blocks.get(index); + } + private void loadBlocks() throws IOException, ParseException { final Path rootPath = Path.of(rootFolder); diff --git a/simulator/src/main/java/org/hiero/block/simulator/generator/BlockAsFileBlockStreamManager.java b/simulator/src/main/java/org/hiero/block/simulator/generator/BlockAsFileBlockStreamManager.java index 4b3c84335..2960a8d21 100644 --- a/simulator/src/main/java/org/hiero/block/simulator/generator/BlockAsFileBlockStreamManager.java +++ b/simulator/src/main/java/org/hiero/block/simulator/generator/BlockAsFileBlockStreamManager.java @@ -22,6 +22,7 @@ import org.hiero.block.common.utils.FileUtilities; import org.hiero.block.simulator.config.data.BlockGeneratorConfig; import org.hiero.block.simulator.config.types.GenerationMode; +import org.hiero.block.simulator.exception.BlockSimulatorParsingException; /** The block as file block stream manager. */ public class BlockAsFileBlockStreamManager implements BlockStreamManager { @@ -92,6 +93,18 @@ public Block getNextBlock() { return nextBlock; } + @Override + public Block getLastBlock() throws IOException, BlockSimulatorParsingException { + int lastBlockNumberIndex = Math.max(0, currentBlockIndex - 1); + return blocks.get(lastBlockNumberIndex); + } + + @Override + public Block getBlockByNumber(long blockNumber) throws IOException, BlockSimulatorParsingException { + int index = (int) Math.max(0, Math.min(blockNumber, blocks.size() - 1)); + return blocks.get(index); + } + private void loadBlocks() throws IOException, ParseException { final Path rootPath = Path.of(rootFolder); diff --git a/simulator/src/main/java/org/hiero/block/simulator/generator/BlockAsFileLargeDataSets.java b/simulator/src/main/java/org/hiero/block/simulator/generator/BlockAsFileLargeDataSets.java index 5928c23ef..fb26162fe 100644 --- a/simulator/src/main/java/org/hiero/block/simulator/generator/BlockAsFileLargeDataSets.java +++ b/simulator/src/main/java/org/hiero/block/simulator/generator/BlockAsFileLargeDataSets.java @@ -102,4 +102,36 @@ public Block getNextBlock() throws IOException, BlockSimulatorParsingException { return block; } + + @Override + public Block getLastBlock() throws IOException, BlockSimulatorParsingException { + if (currentBlock == null) { + throw new IllegalStateException("No block has been loaded yet."); + } + return currentBlock; + } + + @Override + public Block getBlockByNumber(long blockNumber) throws IOException, BlockSimulatorParsingException { + if (blockNumber <= 0) { + throw new IllegalArgumentException("Block number must be greater than 0"); + } + + final String blockFileName = String.format(formatString, blockNumber); + final Path blockPath = Path.of(blockStreamPath).resolve(blockFileName); + + if (!Files.exists(blockPath)) { + throw new IOException("Block file does not exist: " + blockPath); + } + + final byte[] blockBytes = FileUtilities.readFileBytesUnsafe(blockPath, RECORD_EXTENSION, GZ_EXTENSION); + + if (blockBytes == null) { + throw new BlockSimulatorParsingException( + "Failed to read block file: " + blockPath + " with supported extensions"); + } + + LOGGER.log(INFO, "Loading block by number: " + blockNumber + " (" + blockPath.getFileName() + ")"); + return Block.parseFrom(blockBytes); + } } diff --git a/simulator/src/main/java/org/hiero/block/simulator/generator/BlockStreamManager.java b/simulator/src/main/java/org/hiero/block/simulator/generator/BlockStreamManager.java index 4cad671ea..32f25e875 100644 --- a/simulator/src/main/java/org/hiero/block/simulator/generator/BlockStreamManager.java +++ b/simulator/src/main/java/org/hiero/block/simulator/generator/BlockStreamManager.java @@ -26,7 +26,7 @@ default void init() {} * Get the next block item. * * @return the next block item - * @throws IOException if a I/O error occurs + * @throws IOException if a I/O error occurs * @throws BlockSimulatorParsingException if a parse error occurs */ BlockItem getNextBlockItem() throws IOException, BlockSimulatorParsingException; @@ -35,8 +35,12 @@ default void init() {} * Get the next block. * * @return the next block - * @throws IOException if a I/O error occurs + * @throws IOException if a I/O error occurs * @throws BlockSimulatorParsingException if a parse error occurs */ Block getNextBlock() throws IOException, BlockSimulatorParsingException; + + Block getLastBlock() throws IOException, BlockSimulatorParsingException; + + Block getBlockByNumber(long blockNumber) throws IOException, BlockSimulatorParsingException; } diff --git a/simulator/src/main/java/org/hiero/block/simulator/generator/CraftBlockStreamManager.java b/simulator/src/main/java/org/hiero/block/simulator/generator/CraftBlockStreamManager.java index 364fc1cd9..b9f442674 100644 --- a/simulator/src/main/java/org/hiero/block/simulator/generator/CraftBlockStreamManager.java +++ b/simulator/src/main/java/org/hiero/block/simulator/generator/CraftBlockStreamManager.java @@ -54,6 +54,7 @@ public class CraftBlockStreamManager implements BlockStreamManager { private long currentBlockNumber; private StreamingTreeHasher inputTreeHasher; private StreamingTreeHasher outputTreeHasher; + private Block block; /** * Constructs a new CraftBlockStreamManager with the specified configuration. @@ -148,9 +149,23 @@ public Block getNextBlock() throws IOException, BlockSimulatorParsingException { ItemHandler proofItemHandler = new BlockProofHandler(previousBlockHash, currentBlockHash, currentBlockNumber); items.add(proofItemHandler); resetState(); - return Block.newBuilder() + block = Block.newBuilder() .addAllItems(items.stream().map(ItemHandler::getItem).toList()) .build(); + return block; + } + + @Override + public Block getLastBlock() throws IOException, BlockSimulatorParsingException { + if (block == null) { + throw new IllegalStateException("No block has been generated yet."); + } + return block; + } + + @Override + public Block getBlockByNumber(long blockNumber) throws IOException, BlockSimulatorParsingException { + throw new UnsupportedOperationException("Craft mode does not support fetching specific blocks by number."); } private void updateCurrentBlockHash() { diff --git a/simulator/src/main/java/org/hiero/block/simulator/grpc/PublishStreamGrpcClient.java b/simulator/src/main/java/org/hiero/block/simulator/grpc/PublishStreamGrpcClient.java index 49db5d526..36aac5237 100644 --- a/simulator/src/main/java/org/hiero/block/simulator/grpc/PublishStreamGrpcClient.java +++ b/simulator/src/main/java/org/hiero/block/simulator/grpc/PublishStreamGrpcClient.java @@ -51,6 +51,8 @@ public interface PublishStreamGrpcClient { */ List getLastKnownStatuses(); + void recoverStream(); + /** * Shutdowns the channel. * diff --git a/simulator/src/main/java/org/hiero/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java b/simulator/src/main/java/org/hiero/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java index b6aed9ea2..f9450432b 100644 --- a/simulator/src/main/java/org/hiero/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java +++ b/simulator/src/main/java/org/hiero/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java @@ -17,6 +17,7 @@ import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; +import java.io.IOException; import java.util.ArrayDeque; import java.util.Deque; import java.util.List; @@ -25,6 +26,8 @@ import org.hiero.block.common.utils.ChunkUtils; import org.hiero.block.simulator.config.data.BlockStreamConfig; import org.hiero.block.simulator.config.data.GrpcConfig; +import org.hiero.block.simulator.exception.BlockSimulatorParsingException; +import org.hiero.block.simulator.generator.BlockStreamManager; import org.hiero.block.simulator.grpc.PublishStreamGrpcClient; import org.hiero.block.simulator.metrics.MetricsService; import org.hiero.block.simulator.startup.SimulatorStartupData; @@ -47,12 +50,14 @@ public class PublishStreamGrpcClientImpl implements PublishStreamGrpcClient { // gRPC components private ManagedChannel channel; private StreamObserver requestStreamObserver; + private BlockStreamServiceGrpc.BlockStreamServiceStub stub; // State private final AtomicBoolean streamEnabled; private final int lastKnownStatusesCapacity; private final Deque lastKnownStatuses; private final SimulatorStartupData startupData; + private final BlockStreamManager blockStreamManager; /** * Creates a new PublishStreamGrpcClientImpl with the specified dependencies. @@ -70,12 +75,14 @@ public PublishStreamGrpcClientImpl( @NonNull final BlockStreamConfig blockStreamConfig, @NonNull final MetricsService metricsService, @NonNull final AtomicBoolean streamEnabled, - @NonNull final SimulatorStartupData startupData) { + @NonNull final SimulatorStartupData startupData, + @NonNull final BlockStreamManager blockStreamManager) { this.grpcConfig = requireNonNull(grpcConfig); this.blockStreamConfig = requireNonNull(blockStreamConfig); this.metricsService = requireNonNull(metricsService); this.streamEnabled = requireNonNull(streamEnabled); this.lastKnownStatusesCapacity = blockStreamConfig.lastKnownStatusesCapacity(); + this.blockStreamManager = blockStreamManager; this.lastKnownStatuses = new ArrayDeque<>(this.lastKnownStatusesCapacity); this.startupData = requireNonNull(startupData); } @@ -88,13 +95,81 @@ public void init() { channel = ManagedChannelBuilder.forAddress(grpcConfig.serverAddress(), grpcConfig.port()) .usePlaintext() .build(); - final BlockStreamServiceGrpc.BlockStreamServiceStub stub = BlockStreamServiceGrpc.newStub(channel); - final PublishStreamObserver publishStreamObserver = - new PublishStreamObserver(startupData, streamEnabled, lastKnownStatuses, lastKnownStatusesCapacity); - requestStreamObserver = stub.publishBlockStream(publishStreamObserver); + stub = BlockStreamServiceGrpc.newStub(channel); + createNewStreamObserver(); lastKnownStatuses.clear(); } + private void createNewStreamObserver() { + final PublishStreamObserver publishStreamObserver = new PublishStreamObserver( + startupData, streamEnabled, lastKnownStatuses, lastKnownStatusesCapacity, this); + requestStreamObserver = stub.publishBlockStream(publishStreamObserver); + } + + @Override + public void recoverStream() { + int maxRetries = 5; + long baseBackoff = 2000; + + for (int attempt = 0; attempt < maxRetries; attempt++) { + try { + long waitMillis = Math.min(60000, (long) Math.pow(2, attempt) * baseBackoff); + LOGGER.log( + INFO, "Attempting to recover stream (attempt " + (attempt + 1) + ") in " + waitMillis + " ms"); + Thread.sleep(waitMillis); + + resetBlockPointer(); + createNewStreamObserver(); + streamEnabled.set(true); + LOGGER.log(INFO, "Stream recovered successfully on attempt " + (attempt + 1)); + return; + } catch (Exception e) { + LOGGER.log(ERROR, "Stream recovery failed on attempt " + (attempt + 1), e); + } + } + + LOGGER.log(ERROR, "Max retries reached. Stream could not be recovered."); + streamEnabled.set(false); + } + + private void resetBlockPointer() throws BlockSimulatorParsingException, IOException { + long lastAckedBlock = startupData.getLatestAckBlockNumber(); + LOGGER.log(INFO, "Resetting block pointer. Last acknowledged block: " + lastAckedBlock); + + switch (blockStreamConfig.recoveryMode()) { + case RESEND_LAST -> { + LOGGER.log(INFO, "Recovery strategy: RESEND_LAST"); + Block block = blockStreamManager.getLastBlock(); + if (block != null) { + streamBlock(block); + } else { + LOGGER.log(INFO, "Block #" + lastAckedBlock + " not found in stream. Skipping resend."); + } + } + + case ROLLBACK -> { + LOGGER.log(INFO, "Recovery strategy: ROLLBACK"); + long rollbackBlockNumber = Math.max(0, lastAckedBlock - grpcConfig.rollbackDistance()); + LOGGER.log(INFO, "Rolling back to block #" + rollbackBlockNumber); + + Block block = blockStreamManager.getBlockByNumber(rollbackBlockNumber); + while (block != null && rollbackBlockNumber <= lastAckedBlock) { + streamBlock(block); + rollbackBlockNumber++; + block = blockStreamManager.getBlockByNumber(rollbackBlockNumber); + } + } + + case SKIP_AHEAD -> { + LOGGER.log(INFO, "Recovery strategy: SKIP_AHEAD"); + Block block = blockStreamManager.getNextBlock(); + if (block != null) { + streamBlock(block); + } + } + } + } + /** * Streams a list of block items to the server. * diff --git a/simulator/src/main/java/org/hiero/block/simulator/grpc/impl/PublishStreamObserver.java b/simulator/src/main/java/org/hiero/block/simulator/grpc/impl/PublishStreamObserver.java index 3bb43f420..3e197dd58 100644 --- a/simulator/src/main/java/org/hiero/block/simulator/grpc/impl/PublishStreamObserver.java +++ b/simulator/src/main/java/org/hiero/block/simulator/grpc/impl/PublishStreamObserver.java @@ -29,6 +29,7 @@ public class PublishStreamObserver implements StreamObserver lastKnownStatuses; private final SimulatorStartupData startupData; + private final PublishStreamGrpcClientImpl grpcClient; /** * Creates a new PublishStreamObserver instance. @@ -43,11 +44,13 @@ public PublishStreamObserver( @NonNull final SimulatorStartupData startupData, @NonNull final AtomicBoolean streamEnabled, @NonNull final Deque lastKnownStatuses, - final int lastKnownStatusesCapacity) { + final int lastKnownStatusesCapacity, + @NonNull final PublishStreamGrpcClientImpl grpcClient) { this.streamEnabled = requireNonNull(streamEnabled); this.lastKnownStatuses = requireNonNull(lastKnownStatuses); this.lastKnownStatusesCapacity = lastKnownStatusesCapacity; this.startupData = requireNonNull(startupData); + this.grpcClient = requireNonNull(grpcClient); } /** @@ -90,6 +93,7 @@ public void onError(@NonNull final Throwable streamError) { Status status = Status.fromThrowable(streamError); lastKnownStatuses.add(status.toString()); LOGGER.log(ERROR, "Error %s with status %s.".formatted(streamError, status), streamError); + grpcClient.recoverStream(); } /** diff --git a/simulator/src/test/java/org/hiero/block/simulator/config/data/BlockStreamConfigTest.java b/simulator/src/test/java/org/hiero/block/simulator/config/data/BlockStreamConfigTest.java index ee9954c15..aaeed948e 100644 --- a/simulator/src/test/java/org/hiero/block/simulator/config/data/BlockStreamConfigTest.java +++ b/simulator/src/test/java/org/hiero/block/simulator/config/data/BlockStreamConfigTest.java @@ -103,6 +103,24 @@ void testValidAbsolutePath() { assertEquals(GenerationMode.DIR, config.generationMode()); } + @Test + void testRecoveryModeVariants() { + BlockStreamConfig rollbackConfig = getBlockStreamConfigBuilder() + .recoveryMode(org.hiero.block.simulator.config.types.RecoveryMode.ROLLBACK) + .build(); + assertEquals(org.hiero.block.simulator.config.types.RecoveryMode.ROLLBACK, rollbackConfig.recoveryMode()); + + BlockStreamConfig resendLastConfig = getBlockStreamConfigBuilder() + .recoveryMode(org.hiero.block.simulator.config.types.RecoveryMode.RESEND_LAST) + .build(); + assertEquals(org.hiero.block.simulator.config.types.RecoveryMode.RESEND_LAST, resendLastConfig.recoveryMode()); + + BlockStreamConfig skipAheadConfig = getBlockStreamConfigBuilder() + .recoveryMode(org.hiero.block.simulator.config.types.RecoveryMode.SKIP_AHEAD) + .build(); + assertEquals(org.hiero.block.simulator.config.types.RecoveryMode.SKIP_AHEAD, skipAheadConfig.recoveryMode()); + } + @Test void testEmptyFolderRootPath() { // Setup empty folder root path and generation mode diff --git a/simulator/src/test/java/org/hiero/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java b/simulator/src/test/java/org/hiero/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java index 65e2213e5..bbfb65690 100644 --- a/simulator/src/test/java/org/hiero/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java +++ b/simulator/src/test/java/org/hiero/block/simulator/generator/BlockAsDirBlockStreamManagerTest.java @@ -5,6 +5,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import com.hedera.hapi.block.stream.protoc.Block; import java.io.IOException; import java.nio.file.Paths; import org.hiero.block.simulator.config.data.BlockGeneratorConfig; @@ -48,6 +49,46 @@ void getNextBlock() throws IOException, BlockSimulatorParsingException { } } + @Test + void getLastBlock_ReturnsPreviousBlock() throws IOException, BlockSimulatorParsingException { + BlockAsDirBlockStreamManager blockStreamManager = + (BlockAsDirBlockStreamManager) getBlockAsDirBlockStreamManager(getAbsoluteFolder(rootFolder)); + blockStreamManager.init(); + + // Call getNextBlock twice + Block firstBlock = blockStreamManager.getNextBlock(); // index 0 + Block secondBlock = blockStreamManager.getNextBlock(); // index 1 + + Block lastBlock = blockStreamManager.getLastBlock(); + + // Should return same as secondBlock + assertEquals(secondBlock, lastBlock, "Expected getLastBlock to return the most recently given block"); + } + + @Test + void getBlockByNumber_ReturnsCorrectBlockAndHandlesBounds() throws IOException, BlockSimulatorParsingException { + BlockAsDirBlockStreamManager blockStreamManager = + (BlockAsDirBlockStreamManager) getBlockAsDirBlockStreamManager(getAbsoluteFolder(rootFolder)); + blockStreamManager.init(); + + // Should return first block + Block block0 = blockStreamManager.getBlockByNumber(0); + assertNotNull(block0); + + // Should return second block (if it exists) + Block block1 = blockStreamManager.getBlockByNumber(1); + assertNotNull(block1); + + // Should return last block if index is out of bounds + Block blockOutOfBounds = blockStreamManager.getBlockByNumber(9999); + Block lastBlockInList = blockStreamManager.getBlockByNumber(blockStreamManager.blocks.size() - 1); + assertEquals(lastBlockInList, blockOutOfBounds, "Expected out-of-bounds access to return last block"); + + // Negative number should return first block + Block negativeIndexBlock = blockStreamManager.getBlockByNumber(-5); + assertEquals(block0, negativeIndexBlock, "Expected negative index to return first block"); + } + @Test void BlockAsFileBlockStreamManagerInvalidRootPath() { assertThrows( diff --git a/simulator/src/test/java/org/hiero/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java b/simulator/src/test/java/org/hiero/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java index 33731e56c..0f6445452 100644 --- a/simulator/src/test/java/org/hiero/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java +++ b/simulator/src/test/java/org/hiero/block/simulator/generator/BlockAsFileBlockStreamManagerTest.java @@ -3,6 +3,7 @@ import static org.junit.jupiter.api.Assertions.*; +import com.hedera.hapi.block.stream.protoc.Block; import java.io.IOException; import java.nio.file.Paths; import org.hiero.block.simulator.config.data.BlockGeneratorConfig; @@ -45,6 +46,37 @@ void getNextBlockItem() throws IOException, BlockSimulatorParsingException { } } + @Test + void getLastBlock_ReturnsMostRecentBlock() throws IOException, BlockSimulatorParsingException { + BlockAsFileBlockStreamManager fileManager = (BlockAsFileBlockStreamManager) blockStreamManager; + + // Move through a few blocks + fileManager.getNextBlock(); // block 0 + Block block1 = fileManager.getNextBlock(); // block 1 + + Block lastBlock = fileManager.getLastBlock(); + + assertEquals(block1, lastBlock, "Expected getLastBlock to return the most recently given block"); + } + + @Test + void getBlockByNumber_ReturnsCorrectBlock() throws IOException, BlockSimulatorParsingException { + BlockAsFileBlockStreamManager fileManager = (BlockAsFileBlockStreamManager) blockStreamManager; + + Block block0 = fileManager.getBlockByNumber(0); + assertNotNull(block0, "Expected block at index 0"); + + Block block1 = fileManager.getBlockByNumber(1); + assertNotNull(block1, "Expected block at index 1"); + + Block last = fileManager.getBlockByNumber(9999); // Should clamp to last + Block expectedLast = fileManager.getBlockByNumber(fileManager.blocks.size() - 1); + assertEquals(expectedLast, last, "Expected out-of-bounds access to return last block"); + + Block fromNegative = fileManager.getBlockByNumber(-5); // Should clamp to first + assertEquals(block0, fromNegative, "Expected negative index to return first block"); + } + @Test void loadBlockBlk() throws IOException, BlockSimulatorParsingException { String blkRootFolder = "build/resources/test//block-0.0.3-blk/"; diff --git a/simulator/src/test/java/org/hiero/block/simulator/generator/BlockAsFileLargeDataSetsTest.java b/simulator/src/test/java/org/hiero/block/simulator/generator/BlockAsFileLargeDataSetsTest.java index 377c26735..a5abd407f 100644 --- a/simulator/src/test/java/org/hiero/block/simulator/generator/BlockAsFileLargeDataSetsTest.java +++ b/simulator/src/test/java/org/hiero/block/simulator/generator/BlockAsFileLargeDataSetsTest.java @@ -6,6 +6,7 @@ import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import com.hedera.hapi.block.stream.protoc.Block; import com.hedera.hapi.block.stream.protoc.BlockItem; import java.io.File; import java.io.IOException; @@ -93,6 +94,33 @@ void getNextBlockItem() throws IOException, BlockSimulatorParsingException { } } + @Test + void getBlockByNumberReturnsBlockIfExists() throws IOException, BlockSimulatorParsingException { + BlockAsFileLargeDataSets blockStreamManager = + getBlockAsFileLargeDatasetsBlockStreamManager(getAbsoluteFolder(rootFolder)); + + Block block = blockStreamManager.getBlockByNumber(1); + assertNotNull(block); + assertNotNull(block.getItemsList()); // optional: sanity check on block content + } + + @Test + void getBlockByNumberThrowsIfBlockDoesNotExist() throws IOException { + BlockAsFileLargeDataSets blockStreamManager = + getBlockAsFileLargeDatasetsBlockStreamManager(getAbsoluteFolder(rootFolder)); + + long nonExistentBlock = 999_999L; + assertThrows(IOException.class, () -> blockStreamManager.getBlockByNumber(nonExistentBlock)); + } + + @Test + void getBlockByNumberThrowsForInvalidBlockNumber() throws IOException { + BlockAsFileLargeDataSets blockStreamManager = + getBlockAsFileLargeDatasetsBlockStreamManager(getAbsoluteFolder(rootFolder)); + + assertThrows(IllegalArgumentException.class, () -> blockStreamManager.getBlockByNumber(0)); + } + @Test void gettingNextBlockItemThrowsParsingException(@TempDir Path tempDir) throws IOException { String blockFolderName = "block-0.0.3-blk"; diff --git a/simulator/src/test/java/org/hiero/block/simulator/generator/CraftBlockStreamManagerTest.java b/simulator/src/test/java/org/hiero/block/simulator/generator/CraftBlockStreamManagerTest.java index b26d18e4b..99e616c39 100644 --- a/simulator/src/test/java/org/hiero/block/simulator/generator/CraftBlockStreamManagerTest.java +++ b/simulator/src/test/java/org/hiero/block/simulator/generator/CraftBlockStreamManagerTest.java @@ -70,6 +70,25 @@ void testMultipleBlockGeneration() throws IOException, BlockSimulatorParsingExce assertNotEquals(0, block2.getItemsCount()); } + @Test + void testGetLastBlockReturnsLastGeneratedBlock() throws IOException, BlockSimulatorParsingException { + final Block generatedBlock = manager.getNextBlock(); + final Block lastBlock = manager.getLastBlock(); + + assertNotNull(lastBlock); + assertEquals(generatedBlock, lastBlock); + } + + @Test + void testGetLastBlockThrowsIfNoBlockGenerated() { + assertThrows(IllegalStateException.class, () -> manager.getLastBlock()); + } + + @Test + void testGetBlockByNumberThrowsUnsupportedOperation() { + assertThrows(UnsupportedOperationException.class, () -> manager.getBlockByNumber(1)); + } + @Test void testBlockGenerationWithCustomValues() throws IOException, BlockSimulatorParsingException { Mockito.when(generatorConfigMock.minEventsPerBlock()).thenReturn(3); diff --git a/simulator/src/test/java/org/hiero/block/simulator/grpc/impl/PublishStreamGrpcClientImplTest.java b/simulator/src/test/java/org/hiero/block/simulator/grpc/impl/PublishStreamGrpcClientImplTest.java index 7093b0ba0..87de186e3 100644 --- a/simulator/src/test/java/org/hiero/block/simulator/grpc/impl/PublishStreamGrpcClientImplTest.java +++ b/simulator/src/test/java/org/hiero/block/simulator/grpc/impl/PublishStreamGrpcClientImplTest.java @@ -27,6 +27,8 @@ import org.hiero.block.simulator.TestUtils; import org.hiero.block.simulator.config.data.BlockStreamConfig; import org.hiero.block.simulator.config.data.GrpcConfig; +import org.hiero.block.simulator.config.types.RecoveryMode; +import org.hiero.block.simulator.generator.BlockStreamManager; import org.hiero.block.simulator.grpc.PublishStreamGrpcClient; import org.hiero.block.simulator.metrics.MetricsService; import org.hiero.block.simulator.metrics.MetricsServiceImpl; @@ -48,6 +50,9 @@ class PublishStreamGrpcClientImplTest { @Mock private SimulatorStartupData startupDataMock; + @Mock + private BlockStreamManager blockStreamManagerMock; + private BlockStreamConfig blockStreamConfig; private AtomicBoolean streamEnabled; private Server server; @@ -116,7 +121,10 @@ public void onCompleted() { }) .build() .start(); - blockStreamConfig = BlockStreamConfig.builder().blockItemsBatchSize(2).build(); + blockStreamConfig = BlockStreamConfig.builder() + .recoveryMode(RecoveryMode.RESEND_LAST) + .blockItemsBatchSize(2) + .build(); Configuration config = TestUtils.getTestConfiguration(); metricsService = new MetricsServiceImpl(getTestMetrics(config)); @@ -126,7 +134,7 @@ public void onCompleted() { when(grpcConfig.port()).thenReturn(serverPort); publishStreamGrpcClient = new PublishStreamGrpcClientImpl( - grpcConfig, blockStreamConfig, metricsService, streamEnabled, startupDataMock); + grpcConfig, blockStreamConfig, metricsService, streamEnabled, startupDataMock, blockStreamManagerMock); } @AfterEach @@ -199,6 +207,62 @@ void testStreamBlock_Success() throws InterruptedException { streamedBlocks, publishStreamGrpcClient.getLastKnownStatuses().size()); } + @Test + void testRecoverStream_ResendLast() throws Exception { + when(startupDataMock.getLatestAckBlockNumber()).thenReturn(5L); + when(grpcConfig.rollbackDistance()).thenReturn(1L); + Block dummyBlock = constructBlock(5); + + when(blockStreamManagerMock.getLastBlock()).thenReturn(dummyBlock); + + publishStreamGrpcClient.init(); + publishStreamGrpcClient.recoverStream(); + + assertTrue(streamEnabled.get(), "Stream should be re-enabled after recovery with RESEND_LAST"); + assertEquals(1, publishStreamGrpcClient.getPublishedBlocks()); + } + + @Test + void testRecoverStream_Rollback() throws Exception { + when(startupDataMock.getLatestAckBlockNumber()).thenReturn(3L); + when(grpcConfig.rollbackDistance()).thenReturn(2L); + for (long i = 1; i <= 3; i++) { + when(blockStreamManagerMock.getBlockByNumber(i)).thenReturn(constructBlock(i)); + } + + publishStreamGrpcClient.init(); + publishStreamGrpcClient.recoverStream(); + + assertTrue(streamEnabled.get(), "Stream should be re-enabled after recovery with ROLLBACK"); + assertEquals(3, publishStreamGrpcClient.getPublishedBlocks()); + } + + @Test + void testRecoverStream_SkipAhead() throws Exception { + when(startupDataMock.getLatestAckBlockNumber()).thenReturn(10L); + Block nextBlock = constructBlock(11); + when(blockStreamManagerMock.getNextBlock()).thenReturn(nextBlock); + + publishStreamGrpcClient.init(); + publishStreamGrpcClient.recoverStream(); + + assertTrue(streamEnabled.get(), "Stream should be re-enabled after SKIP_AHEAD"); + assertEquals(1, publishStreamGrpcClient.getPublishedBlocks()); + } + + @Test + void testRecoverStreamFailsAfterMaxRetries() throws Exception { + when(startupDataMock.getLatestAckBlockNumber()).thenReturn(0L); + when(grpcConfig.rollbackDistance()).thenReturn(1L); + // force an exception during block access + when(blockStreamManagerMock.getLastBlock()).thenThrow(new IOException("Simulated failure")); + + publishStreamGrpcClient.init(); + publishStreamGrpcClient.recoverStream(); + + assertTrue(!streamEnabled.get(), "Stream should be disabled after max retries"); + } + @Test void testStreamBlock_RejectsAfterShutdown() throws InterruptedException { publishStreamGrpcClient.init(); diff --git a/simulator/src/test/java/org/hiero/block/simulator/grpc/impl/PublishStreamObserverTest.java b/simulator/src/test/java/org/hiero/block/simulator/grpc/impl/PublishStreamObserverTest.java index 37519db9b..fe4d741ba 100644 --- a/simulator/src/test/java/org/hiero/block/simulator/grpc/impl/PublishStreamObserverTest.java +++ b/simulator/src/test/java/org/hiero/block/simulator/grpc/impl/PublishStreamObserverTest.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.verify; import com.hedera.hapi.block.protoc.PublishStreamResponse; import java.util.ArrayDeque; @@ -19,14 +20,21 @@ class PublishStreamObserverTest { @Mock private SimulatorStartupData startupDataMock; + @Mock + private PublishStreamGrpcClientImpl publishStreamGrpcClientImpl; + @Test void onNext() { PublishStreamResponse response = PublishStreamResponse.newBuilder().build(); AtomicBoolean streamEnabled = new AtomicBoolean(true); ArrayDeque lastKnownStatuses = new ArrayDeque<>(); final int lastKnownStatusesCapacity = 10; - PublishStreamObserver publishStreamObserver = - new PublishStreamObserver(startupDataMock, streamEnabled, lastKnownStatuses, lastKnownStatusesCapacity); + PublishStreamObserver publishStreamObserver = new PublishStreamObserver( + startupDataMock, + streamEnabled, + lastKnownStatuses, + lastKnownStatusesCapacity, + publishStreamGrpcClientImpl); publishStreamObserver.onNext(response); assertTrue(streamEnabled.get(), "streamEnabled should remain true after onCompleted"); @@ -38,12 +46,17 @@ void onError() { AtomicBoolean streamEnabled = new AtomicBoolean(true); ArrayDeque lastKnownStatuses = new ArrayDeque<>(); final int lastKnownStatusesCapacity = 10; - PublishStreamObserver publishStreamObserver = - new PublishStreamObserver(startupDataMock, streamEnabled, lastKnownStatuses, lastKnownStatusesCapacity); + PublishStreamObserver publishStreamObserver = new PublishStreamObserver( + startupDataMock, + streamEnabled, + lastKnownStatuses, + lastKnownStatusesCapacity, + publishStreamGrpcClientImpl); publishStreamObserver.onError(new Throwable()); assertFalse(streamEnabled.get(), "streamEnabled should be set to false after onError"); assertEquals(1, lastKnownStatuses.size(), "lastKnownStatuses should have one element after onError"); + verify(publishStreamGrpcClientImpl).recoverStream(); } @Test @@ -51,8 +64,12 @@ void onCompleted() { AtomicBoolean streamEnabled = new AtomicBoolean(true); ArrayDeque lastKnownStatuses = new ArrayDeque<>(); final int lastKnownStatusesCapacity = 10; - PublishStreamObserver publishStreamObserver = - new PublishStreamObserver(startupDataMock, streamEnabled, lastKnownStatuses, lastKnownStatusesCapacity); + PublishStreamObserver publishStreamObserver = new PublishStreamObserver( + startupDataMock, + streamEnabled, + lastKnownStatuses, + lastKnownStatusesCapacity, + publishStreamGrpcClientImpl); publishStreamObserver.onCompleted(); assertTrue(streamEnabled.get(), "streamEnabled should remain true after onCompleted");