Skip to content

chore: added bn error handling in simulator for publish mode #986

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public final class SimulatorMappedConfigSourceInitializer {
new ConfigMapping("blockStream.streamingMode", "BLOCK_STREAM_STREAMING_MODE"),
new ConfigMapping("blockStream.millisecondsPerBlock", "BLOCK_STREAM_MILLISECONDS_PER_BLOCK"),
new ConfigMapping("blockStream.blockItemsBatchSize", "BLOCK_STREAM_BLOCK_ITEMS_BATCH_SIZE"),
new ConfigMapping("blockStream.recoveryMode", "RECOVERY_MODE"),

// Block consumer configuration
new ConfigMapping("consumer.startBlockNumber", "CONSUMER_START_BLOCK_NUMBER"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.swirlds.config.api.ConfigData;
import com.swirlds.config.api.ConfigProperty;
import org.hiero.block.simulator.config.logging.Loggable;
import org.hiero.block.simulator.config.types.RecoveryMode;
import org.hiero.block.simulator.config.types.SimulatorMode;
import org.hiero.block.simulator.config.types.StreamingMode;

Expand All @@ -26,7 +27,8 @@ public record BlockStreamConfig(
@Loggable @ConfigProperty(defaultValue = "100_000") int maxBlockItemsToStream,
@Loggable @ConfigProperty(defaultValue = "MILLIS_PER_BLOCK") StreamingMode streamingMode,
@Loggable @ConfigProperty(defaultValue = "1000") int millisecondsPerBlock,
@Loggable @ConfigProperty(defaultValue = "1000") int blockItemsBatchSize) {
@Loggable @ConfigProperty(defaultValue = "1000") int blockItemsBatchSize,
@Loggable @ConfigProperty(defaultValue = "RESEND_LAST") RecoveryMode recoveryMode) {

/**
* Creates a new {@link Builder} instance for constructing a {@code BlockStreamConfig}.
Expand All @@ -42,6 +44,7 @@ public static Builder builder() {
*/
public static class Builder {
private SimulatorMode simulatorMode = SimulatorMode.PUBLISHER_CLIENT;
private RecoveryMode recoveryMode = RecoveryMode.RESEND_LAST;
private int lastKnownStatusesCapacity = 10;
private int delayBetweenBlockItems = 1_500_000;
private int maxBlockItemsToStream = 10_000;
Expand All @@ -67,6 +70,11 @@ public Builder simulatorMode(SimulatorMode simulatorMode) {
return this;
}

public Builder recoveryMode(RecoveryMode recoveryMode) {
this.recoveryMode = recoveryMode;
return this;
}

/**
* Sets the capacity of the last known statuses.
*
Expand Down Expand Up @@ -146,7 +154,8 @@ public BlockStreamConfig build() {
maxBlockItemsToStream,
streamingMode,
millisecondsPerBlock,
blockItemsBatchSize);
blockItemsBatchSize,
recoveryMode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@
@ConfigData("grpc")
public record GrpcConfig(
@Loggable @ConfigProperty(defaultValue = "localhost") String serverAddress,
@Loggable @ConfigProperty(defaultValue = "8080") @Min(0) @Max(65535) int port) {}
@Loggable @ConfigProperty(defaultValue = "8080") @Min(0) @Max(65535) int port,
@Loggable @ConfigProperty(defaultValue = "0") @Min(0) long rollbackDistance) {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// SPDX-License-Identifier: Apache-2.0
package org.hiero.block.simulator.config.types;

public enum RecoveryMode {
RESEND_LAST,
ROLLBACK,
SKIP_AHEAD
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.hiero.block.common.utils.FileUtilities;
import org.hiero.block.simulator.config.data.BlockGeneratorConfig;
import org.hiero.block.simulator.config.types.GenerationMode;
import org.hiero.block.simulator.exception.BlockSimulatorParsingException;

/**
* The BlockAsDirBlockStreamManager class implements the BlockStreamManager interface to manage the
Expand Down Expand Up @@ -99,6 +100,18 @@ public Block getNextBlock() {
return nextBlock;
}

@Override
public Block getLastBlock() throws IOException, BlockSimulatorParsingException {
int lastBlockNumberIndex = Math.max(0, currentBlockIndex - 1);
return blocks.get(lastBlockNumberIndex);
}

@Override
public Block getBlockByNumber(long blockNumber) throws IOException, BlockSimulatorParsingException {
int index = (int) Math.max(0, Math.min(blockNumber, blocks.size() - 1));
return blocks.get(index);
}

private void loadBlocks() throws IOException, ParseException {
final Path rootPath = Path.of(rootFolder);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.hiero.block.common.utils.FileUtilities;
import org.hiero.block.simulator.config.data.BlockGeneratorConfig;
import org.hiero.block.simulator.config.types.GenerationMode;
import org.hiero.block.simulator.exception.BlockSimulatorParsingException;

/** The block as file block stream manager. */
public class BlockAsFileBlockStreamManager implements BlockStreamManager {
Expand Down Expand Up @@ -92,6 +93,18 @@ public Block getNextBlock() {
return nextBlock;
}

@Override
public Block getLastBlock() throws IOException, BlockSimulatorParsingException {
int lastBlockNumberIndex = Math.max(0, currentBlockIndex - 1);
return blocks.get(lastBlockNumberIndex);
}

@Override
public Block getBlockByNumber(long blockNumber) throws IOException, BlockSimulatorParsingException {
int index = (int) Math.max(0, Math.min(blockNumber, blocks.size() - 1));
return blocks.get(index);
}

private void loadBlocks() throws IOException, ParseException {

final Path rootPath = Path.of(rootFolder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,36 @@ public Block getNextBlock() throws IOException, BlockSimulatorParsingException {

return block;
}

@Override
public Block getLastBlock() throws IOException, BlockSimulatorParsingException {
if (currentBlock == null) {
throw new IllegalStateException("No block has been loaded yet.");
}
return currentBlock;
}

@Override
public Block getBlockByNumber(long blockNumber) throws IOException, BlockSimulatorParsingException {
if (blockNumber <= 0) {
throw new IllegalArgumentException("Block number must be greater than 0");
}

final String blockFileName = String.format(formatString, blockNumber);
final Path blockPath = Path.of(blockStreamPath).resolve(blockFileName);

if (!Files.exists(blockPath)) {
throw new IOException("Block file does not exist: " + blockPath);
}

final byte[] blockBytes = FileUtilities.readFileBytesUnsafe(blockPath, RECORD_EXTENSION, GZ_EXTENSION);

if (blockBytes == null) {
throw new BlockSimulatorParsingException(
"Failed to read block file: " + blockPath + " with supported extensions");
}

LOGGER.log(INFO, "Loading block by number: " + blockNumber + " (" + blockPath.getFileName() + ")");
return Block.parseFrom(blockBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ default void init() {}
* Get the next block item.
*
* @return the next block item
* @throws IOException if a I/O error occurs
* @throws IOException if a I/O error occurs
* @throws BlockSimulatorParsingException if a parse error occurs
*/
BlockItem getNextBlockItem() throws IOException, BlockSimulatorParsingException;
Expand All @@ -35,8 +35,12 @@ default void init() {}
* Get the next block.
*
* @return the next block
* @throws IOException if a I/O error occurs
* @throws IOException if a I/O error occurs
* @throws BlockSimulatorParsingException if a parse error occurs
*/
Block getNextBlock() throws IOException, BlockSimulatorParsingException;

Block getLastBlock() throws IOException, BlockSimulatorParsingException;

Block getBlockByNumber(long blockNumber) throws IOException, BlockSimulatorParsingException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class CraftBlockStreamManager implements BlockStreamManager {
private long currentBlockNumber;
private StreamingTreeHasher inputTreeHasher;
private StreamingTreeHasher outputTreeHasher;
private Block block;

/**
* Constructs a new CraftBlockStreamManager with the specified configuration.
Expand Down Expand Up @@ -148,9 +149,23 @@ public Block getNextBlock() throws IOException, BlockSimulatorParsingException {
ItemHandler proofItemHandler = new BlockProofHandler(previousBlockHash, currentBlockHash, currentBlockNumber);
items.add(proofItemHandler);
resetState();
return Block.newBuilder()
block = Block.newBuilder()
.addAllItems(items.stream().map(ItemHandler::getItem).toList())
.build();
return block;
}

@Override
public Block getLastBlock() throws IOException, BlockSimulatorParsingException {
if (block == null) {
throw new IllegalStateException("No block has been generated yet.");
}
return block;
}

@Override
public Block getBlockByNumber(long blockNumber) throws IOException, BlockSimulatorParsingException {
throw new UnsupportedOperationException("Craft mode does not support fetching specific blocks by number.");
}

private void updateCurrentBlockHash() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public interface PublishStreamGrpcClient {
*/
List<String> getLastKnownStatuses();

void recoverStream();

/**
* Shutdowns the channel.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
Expand All @@ -25,6 +26,8 @@
import org.hiero.block.common.utils.ChunkUtils;
import org.hiero.block.simulator.config.data.BlockStreamConfig;
import org.hiero.block.simulator.config.data.GrpcConfig;
import org.hiero.block.simulator.exception.BlockSimulatorParsingException;
import org.hiero.block.simulator.generator.BlockStreamManager;
import org.hiero.block.simulator.grpc.PublishStreamGrpcClient;
import org.hiero.block.simulator.metrics.MetricsService;
import org.hiero.block.simulator.startup.SimulatorStartupData;
Expand All @@ -47,12 +50,14 @@
// gRPC components
private ManagedChannel channel;
private StreamObserver<PublishStreamRequest> requestStreamObserver;
private BlockStreamServiceGrpc.BlockStreamServiceStub stub;

// State
private final AtomicBoolean streamEnabled;
private final int lastKnownStatusesCapacity;
private final Deque<String> lastKnownStatuses;
private final SimulatorStartupData startupData;
private final BlockStreamManager blockStreamManager;

/**
* Creates a new PublishStreamGrpcClientImpl with the specified dependencies.
Expand All @@ -70,12 +75,14 @@
@NonNull final BlockStreamConfig blockStreamConfig,
@NonNull final MetricsService metricsService,
@NonNull final AtomicBoolean streamEnabled,
@NonNull final SimulatorStartupData startupData) {
@NonNull final SimulatorStartupData startupData,
@NonNull final BlockStreamManager blockStreamManager) {
this.grpcConfig = requireNonNull(grpcConfig);
this.blockStreamConfig = requireNonNull(blockStreamConfig);
this.metricsService = requireNonNull(metricsService);
this.streamEnabled = requireNonNull(streamEnabled);
this.lastKnownStatusesCapacity = blockStreamConfig.lastKnownStatusesCapacity();
this.blockStreamManager = blockStreamManager;
this.lastKnownStatuses = new ArrayDeque<>(this.lastKnownStatusesCapacity);
this.startupData = requireNonNull(startupData);
}
Expand All @@ -88,13 +95,81 @@
channel = ManagedChannelBuilder.forAddress(grpcConfig.serverAddress(), grpcConfig.port())
.usePlaintext()
.build();
final BlockStreamServiceGrpc.BlockStreamServiceStub stub = BlockStreamServiceGrpc.newStub(channel);
final PublishStreamObserver publishStreamObserver =
new PublishStreamObserver(startupData, streamEnabled, lastKnownStatuses, lastKnownStatusesCapacity);
requestStreamObserver = stub.publishBlockStream(publishStreamObserver);
stub = BlockStreamServiceGrpc.newStub(channel);
createNewStreamObserver();
lastKnownStatuses.clear();
}

private void createNewStreamObserver() {
final PublishStreamObserver publishStreamObserver = new PublishStreamObserver(
startupData, streamEnabled, lastKnownStatuses, lastKnownStatusesCapacity, this);
requestStreamObserver = stub.publishBlockStream(publishStreamObserver);
}

@Override
public void recoverStream() {
int maxRetries = 5;
long baseBackoff = 2000;

for (int attempt = 0; attempt < maxRetries; attempt++) {
try {
long waitMillis = Math.min(60000, (long) Math.pow(2, attempt) * baseBackoff);
LOGGER.log(
INFO, "Attempting to recover stream (attempt " + (attempt + 1) + ") in " + waitMillis + " ms");
Thread.sleep(waitMillis);

resetBlockPointer();
createNewStreamObserver();
streamEnabled.set(true);
LOGGER.log(INFO, "Stream recovered successfully on attempt " + (attempt + 1));
return;
} catch (Exception e) {
LOGGER.log(ERROR, "Stream recovery failed on attempt " + (attempt + 1), e);
}
}

LOGGER.log(ERROR, "Max retries reached. Stream could not be recovered.");
streamEnabled.set(false);
}

private void resetBlockPointer() throws BlockSimulatorParsingException, IOException {
long lastAckedBlock = startupData.getLatestAckBlockNumber();
LOGGER.log(INFO, "Resetting block pointer. Last acknowledged block: " + lastAckedBlock);

switch (blockStreamConfig.recoveryMode()) {

Check warning on line 139 in simulator/src/main/java/org/hiero/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

simulator/src/main/java/org/hiero/block/simulator/grpc/impl/PublishStreamGrpcClientImpl.java#L139

Switch statements should be exhaustive, add a default case (or missing enum branches)
case RESEND_LAST -> {
LOGGER.log(INFO, "Recovery strategy: RESEND_LAST");
Block block = blockStreamManager.getLastBlock();
if (block != null) {
streamBlock(block);
} else {
LOGGER.log(INFO, "Block #" + lastAckedBlock + " not found in stream. Skipping resend.");
}
}

case ROLLBACK -> {
LOGGER.log(INFO, "Recovery strategy: ROLLBACK");
long rollbackBlockNumber = Math.max(0, lastAckedBlock - grpcConfig.rollbackDistance());
LOGGER.log(INFO, "Rolling back to block #" + rollbackBlockNumber);

Block block = blockStreamManager.getBlockByNumber(rollbackBlockNumber);
while (block != null && rollbackBlockNumber <= lastAckedBlock) {
streamBlock(block);
rollbackBlockNumber++;
block = blockStreamManager.getBlockByNumber(rollbackBlockNumber);
}
}

case SKIP_AHEAD -> {
LOGGER.log(INFO, "Recovery strategy: SKIP_AHEAD");
Block block = blockStreamManager.getNextBlock();
if (block != null) {
streamBlock(block);
}
}
}
}

/**
* Streams a list of block items to the server.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class PublishStreamObserver implements StreamObserver<PublishStreamRespon
private final int lastKnownStatusesCapacity;
private final Deque<String> lastKnownStatuses;
private final SimulatorStartupData startupData;
private final PublishStreamGrpcClientImpl grpcClient;

/**
* Creates a new PublishStreamObserver instance.
Expand All @@ -43,11 +44,13 @@ public PublishStreamObserver(
@NonNull final SimulatorStartupData startupData,
@NonNull final AtomicBoolean streamEnabled,
@NonNull final Deque<String> lastKnownStatuses,
final int lastKnownStatusesCapacity) {
final int lastKnownStatusesCapacity,
@NonNull final PublishStreamGrpcClientImpl grpcClient) {
this.streamEnabled = requireNonNull(streamEnabled);
this.lastKnownStatuses = requireNonNull(lastKnownStatuses);
this.lastKnownStatusesCapacity = lastKnownStatusesCapacity;
this.startupData = requireNonNull(startupData);
this.grpcClient = requireNonNull(grpcClient);
}

/**
Expand Down Expand Up @@ -90,6 +93,7 @@ public void onError(@NonNull final Throwable streamError) {
Status status = Status.fromThrowable(streamError);
lastKnownStatuses.add(status.toString());
LOGGER.log(ERROR, "Error %s with status %s.".formatted(streamError, status), streamError);
grpcClient.recoverStream();
}

/**
Expand Down
Loading
Loading