diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index d68ed06d3070d..9bcaa48587f78 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -22,6 +22,10 @@ import kafka.server.ReplicaManager; import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.NotLeaderOrFollowerException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.message.ShareFetchResponseData; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.utils.Time; @@ -34,8 +38,12 @@ import org.apache.kafka.server.share.fetch.ShareFetchPartitionData; import org.apache.kafka.server.share.metrics.ShareGroupMetrics; import org.apache.kafka.server.storage.log.FetchIsolation; +import org.apache.kafka.server.storage.log.FetchPartitionData; +import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogOffsetSnapshot; +import org.apache.kafka.storage.internals.log.RemoteLogReadResult; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; import com.yammer.metrics.core.Meter; @@ -44,10 +52,16 @@ import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.function.BiConsumer; @@ -83,7 +97,9 @@ public class DelayedShareFetch extends DelayedOperation { // Tracks the start time to acquire any share partition for a fetch request. private long acquireStartTimeMs; private LinkedHashMap<TopicIdPartition, Long> partitionsAcquired; - private LinkedHashMap<TopicIdPartition, LogReadResult> partitionsAlreadyFetched; + private LinkedHashMap<TopicIdPartition, LogReadResult> localPartitionsAlreadyFetched; + private Optional<RemoteFetch> remoteFetchOpt; + private Optional<Exception> remoteStorageFetchException; /** * This function constructs an instance of delayed share fetch operation for completing share fetch @@ -110,10 +126,24 @@ public DelayedShareFetch( sharePartitions, PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM), shareGroupMetrics, - time + time, + Optional.empty() ); } + /** + * This function constructs an instance of delayed share fetch operation for completing share fetch + * requests instantaneously or with delay. The direct usage of this constructor is only from tests. + * + * @param shareFetch The share fetch parameters of the share fetch request. + * @param replicaManager The replica manager instance used to read from log/complete the request. + * @param exceptionHandler The handler to complete share fetch requests with exception. + * @param sharePartitions The share partitions referenced in the share fetch request. + * @param partitionMaxBytesStrategy The strategy to identify the max bytes for topic partitions in the share fetch request. 
+ * @param shareGroupMetrics The share group metrics to record the metrics. + * @param time The system time. + * @param remoteFetchOpt Optional containing an in-flight remote fetch object or an empty optional. + */ DelayedShareFetch( ShareFetch shareFetch, ReplicaManager replicaManager, @@ -121,19 +151,22 @@ public DelayedShareFetch( LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions, PartitionMaxBytesStrategy partitionMaxBytesStrategy, ShareGroupMetrics shareGroupMetrics, - Time time + Time time, + Optional<RemoteFetch> remoteFetchOpt ) { super(shareFetch.fetchParams().maxWaitMs, Optional.empty()); this.shareFetch = shareFetch; this.replicaManager = replicaManager; this.partitionsAcquired = new LinkedHashMap<>(); - this.partitionsAlreadyFetched = new LinkedHashMap<>(); + this.localPartitionsAlreadyFetched = new LinkedHashMap<>(); this.exceptionHandler = exceptionHandler; this.sharePartitions = sharePartitions; this.partitionMaxBytesStrategy = partitionMaxBytesStrategy; this.shareGroupMetrics = shareGroupMetrics; this.time = time; this.acquireStartTimeMs = time.hiResClockMs(); + this.remoteFetchOpt = remoteFetchOpt; + this.remoteStorageFetchException = Optional.empty(); // Register metrics for DelayedShareFetch. KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.server", "DelayedShareFetchMetrics"); this.expiredRequestMeter = metricsGroup.newMeter(EXPIRES_PER_SEC, "requests", TimeUnit.SECONDS); @@ -152,58 +185,68 @@ public void onExpiration() { @Override public void onComplete() { // We are utilizing lock so that onComplete doesn't do a dirty read for instance variables - - // partitionsAcquired and partitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread. + // partitionsAcquired and localPartitionsAlreadyFetched, since these variables can get updated in a different tryComplete thread. lock.lock(); log.trace("Completing the delayed share fetch request for group {}, member {}, " + "topic partitions {}", shareFetch.groupId(), shareFetch.memberId(), partitionsAcquired.keySet()); try { - LinkedHashMap<TopicIdPartition, Long> topicPartitionData; - // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch. - if (partitionsAcquired.isEmpty()) { - topicPartitionData = acquirablePartitions(); - // The TopicPartitionsAcquireTimeMs metric signifies the tension when acquiring the locks - // for the share partition, hence if no partitions are yet acquired by tryComplete, - // we record the metric here. Do not check if the request has successfully acquired any - // partitions now or not, as then the upper bound of request timeout shall be recorded - // for the metric. - updateAcquireElapsedTimeMetric(); - } else { - // tryComplete invoked forceComplete, so we can use the data from tryComplete. - topicPartitionData = partitionsAcquired; - } - - if (topicPartitionData.isEmpty()) { - // No locks for share partitions could be acquired, so we complete the request with an empty response. - shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0); - shareFetch.maybeComplete(Map.of()); - return; + if (remoteStorageFetchException.isPresent()) { + completeErroneousRemoteShareFetchRequest(); + } else if (remoteFetchOpt.isPresent()) { + completeRemoteStorageShareFetchRequest(); } else { - // Update metric to record acquired to requested partitions. 
- double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.topicIdPartitions().size(); - shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100)); + completeLocalLogShareFetchRequest(); } - log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", - topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams()); - - completeShareFetchRequest(topicPartitionData); } finally { lock.unlock(); } } - private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) { + private void completeLocalLogShareFetchRequest() { + LinkedHashMap<TopicIdPartition, Long> topicPartitionData; + // tryComplete did not invoke forceComplete, so we need to check if we have any partitions to fetch. + if (partitionsAcquired.isEmpty()) { + topicPartitionData = acquirablePartitions(sharePartitions); + // The TopicPartitionsAcquireTimeMs metric signifies the tension when acquiring the locks + // for the share partition, hence if no partitions are yet acquired by tryComplete, + // we record the metric here. Do not check if the request has successfully acquired any + // partitions now or not, as then the upper bound of request timeout shall be recorded + // for the metric. + updateAcquireElapsedTimeMetric(); + } else { + // tryComplete invoked forceComplete, so we can use the data from tryComplete. + topicPartitionData = partitionsAcquired; + } + + if (topicPartitionData.isEmpty()) { + // No locks for share partitions could be acquired, so we complete the request with an empty response. + shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), 0); + shareFetch.maybeComplete(Map.of()); + return; + } else { + // Update metric to record acquired to requested partitions. + double requestTopicToAcquired = (double) topicPartitionData.size() / shareFetch.topicIdPartitions().size(); + shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (requestTopicToAcquired * 100)); + } + log.trace("Fetchable share partitions data: {} with groupId: {} fetch params: {}", + topicPartitionData, shareFetch.groupId(), shareFetch.fetchParams()); + + processAcquiredTopicPartitionsForLocalLogFetch(topicPartitionData); + } + + private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) { try { LinkedHashMap<TopicIdPartition, LogReadResult> responseData; - if (partitionsAlreadyFetched.isEmpty()) + if (localPartitionsAlreadyFetched.isEmpty()) responseData = readFromLog( topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size())); else // There shouldn't be a case when we have a partitionsAlreadyFetched value here and this variable is getting // updated in a different tryComplete thread. - responseData = combineLogReadResponse(topicPartitionData, partitionsAlreadyFetched); + responseData = combineLogReadResponse(topicPartitionData, localPartitionsAlreadyFetched); List<ShareFetchPartitionData> shareFetchPartitionDataList = new ArrayList<>(); responseData.forEach((topicIdPartition, logReadResult) -> @@ -225,15 +268,7 @@ private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, Long> top log.error("Error processing delayed share fetch request", e); handleFetchException(shareFetch, topicPartitionData.keySet(), e); } finally { - // Releasing the lock to move ahead with the next request in queue. 
- releasePartitionLocks(topicPartitionData.keySet()); - // If we have a fetch request completed for a topic-partition, we release the locks for that partition, - // then we should check if there is a pending share fetch request for the topic-partition and complete it. - // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if - // we directly call delayedShareFetchPurgatory.checkAndComplete - replicaManager.addToActionQueue(() -> topicPartitionData.keySet().forEach(topicIdPartition -> - replicaManager.completeDelayedShareFetchRequest( - new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())))); + releasePartitionLocksAndAddToActionQueue(topicPartitionData.keySet()); } } @@ -242,8 +277,12 @@ private void completeShareFetchRequest(LinkedHashMap<TopicIdPartition, Long> top */ @Override public boolean tryComplete() { - LinkedHashMap<TopicIdPartition, Long> topicPartitionData = acquirablePartitions(); + // Check to see if the remote fetch is in flight. If there is an in flight remote fetch we want to resolve it first. + if (remoteFetchOpt.isPresent()) { + return maybeCompletePendingRemoteFetch(); + } + LinkedHashMap<TopicIdPartition, Long> topicPartitionData = acquirablePartitions(sharePartitions); try { if (!topicPartitionData.isEmpty()) { // Update the metric to record the time taken to acquire the locks for the share partitions. @@ -252,13 +291,19 @@ public boolean tryComplete() { // replicaManager.readFromLog to populate the offset metadata and update the fetch offset metadata for // those topic partitions. LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse = maybeReadFromLog(topicPartitionData); + // Store the remote fetch info and the topic partition for which we need to perform remote fetch. + Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = maybePrepareRemoteStorageFetchInfo(topicPartitionData, replicaManagerReadResponse); + + if (topicPartitionRemoteFetchInfoOpt.isPresent()) { + return maybeProcessRemoteFetch(topicPartitionData, topicPartitionRemoteFetchInfoOpt.get()); + } maybeUpdateFetchOffsetMetadata(topicPartitionData, replicaManagerReadResponse); if (anyPartitionHasLogReadError(replicaManagerReadResponse) || isMinBytesSatisfied(topicPartitionData, partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes, topicPartitionData.keySet(), topicPartitionData.size()))) { partitionsAcquired = topicPartitionData; - partitionsAlreadyFetched = replicaManagerReadResponse; + localPartitionsAlreadyFetched = replicaManagerReadResponse; boolean completedByMe = forceComplete(); // If invocation of forceComplete is not successful, then that means the request is already completed - // hence release the acquired locks. + // hence the acquired locks are already released. if (!completedByMe) { releasePartitionLocks(partitionsAcquired.keySet()); } @@ -277,10 +322,25 @@ public boolean tryComplete() { return false; } catch (Exception e) { log.error("Error processing delayed share fetch request", e); - releasePartitionLocks(topicPartitionData.keySet()); - partitionsAcquired.clear(); - partitionsAlreadyFetched.clear(); - return forceComplete(); + // In case we have a remote fetch exception, we have already released locks for partitions which have potential + // local log read. 
We do not release locks for partitions which have a remote storage read because we need to + // complete the share fetch request in onComplete and if we release the locks early here, some other DelayedShareFetch + // request might get the locks for those partitions without this one getting complete. + if (remoteStorageFetchException.isEmpty()) { + releasePartitionLocks(topicPartitionData.keySet()); + partitionsAcquired.clear(); + localPartitionsAlreadyFetched.clear(); + return forceComplete(); + } else { + boolean completedByMe = forceComplete(); + // If invocation of forceComplete is not successful, then that means the request is already completed + // hence the acquired locks are already released. This can occur in case of remote storage fetch if there is a thread that + // completes the operation (due to expiration) right before a different thread is about to enter tryComplete. + if (!completedByMe) { + releasePartitionLocks(partitionsAcquired.keySet()); + } + return completedByMe; + } } } @@ -288,11 +348,13 @@ public boolean tryComplete() { * Prepare fetch request structure for partitions in the share fetch request for which we can acquire records. */ // Visible for testing - LinkedHashMap<TopicIdPartition, Long> acquirablePartitions() { + LinkedHashMap<TopicIdPartition, Long> acquirablePartitions( + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitionsForAcquire + ) { // Initialize the topic partitions for which the fetch should be attempted. LinkedHashMap<TopicIdPartition, Long> topicPartitionData = new LinkedHashMap<>(); - sharePartitions.forEach((topicIdPartition, sharePartition) -> { + sharePartitionsForAcquire.forEach((topicIdPartition, sharePartition) -> { // Add the share partition to the list of partitions to be fetched only if we can // acquire the fetch lock on it. if (sharePartition.maybeAcquireFetchLock()) { @@ -529,8 +591,300 @@ Lock lock() { return lock; } + // Visible for testing. + RemoteFetch remoteFetch() { + return remoteFetchOpt.orElse(null); + } + // Visible for testing. Meter expiredRequestMeter() { return expiredRequestMeter; } + + private Optional<TopicPartitionRemoteFetchInfo> maybePrepareRemoteStorageFetchInfo( + LinkedHashMap<TopicIdPartition, Long> topicPartitionData, + LinkedHashMap<TopicIdPartition, LogReadResult> replicaManagerReadResponse + ) { + Optional<TopicPartitionRemoteFetchInfo> topicPartitionRemoteFetchInfoOpt = Optional.empty(); + for (Map.Entry<TopicIdPartition, LogReadResult> entry : replicaManagerReadResponse.entrySet()) { + TopicIdPartition topicIdPartition = entry.getKey(); + LogReadResult logReadResult = entry.getValue(); + if (logReadResult.info().delayedRemoteStorageFetch.isPresent()) { + // TODO: There is a limitation in remote storage fetch for consumer groups that we can only perform remote fetch for + // a single topic partition in a fetch request. Since, the logic of fetch is largely based on how consumer groups work, + // we are following the same logic. 
However, this problem should be addressed as part of KAFKA-19133 which should help us perform + // fetch for multiple remote fetch topic partition in a single share fetch request + topicPartitionRemoteFetchInfoOpt = Optional.of(new TopicPartitionRemoteFetchInfo(topicIdPartition, logReadResult)); + partitionsAcquired.put(topicIdPartition, topicPartitionData.get(topicIdPartition)); + break; + } + } + return topicPartitionRemoteFetchInfoOpt; + } + + private boolean maybeProcessRemoteFetch( + LinkedHashMap<TopicIdPartition, Long> topicPartitionData, + TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo + ) { + Set<TopicIdPartition> nonRemoteFetchTopicPartitions = new LinkedHashSet<>(); + topicPartitionData.keySet().forEach(topicIdPartition -> { + // topic partitions for which fetch would not be happening in this share fetch request. + if (!topicPartitionRemoteFetchInfo.topicIdPartition().equals(topicIdPartition)) { + nonRemoteFetchTopicPartitions.add(topicIdPartition); + } + }); + // Release fetch lock for the topic partitions that were acquired but were not a part of remote fetch and add + // them to the delayed actions queue. + releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions); + processRemoteFetchOrException(topicPartitionRemoteFetchInfo); + // Check if remote fetch can be completed. + return maybeCompletePendingRemoteFetch(); + } + + /** + * Throws an exception if a task for remote storage fetch could not be scheduled successfully else updates remoteFetchOpt. + * @param topicPartitionRemoteFetchInfo - The remote storage fetch information. + */ + private void processRemoteFetchOrException( + TopicPartitionRemoteFetchInfo topicPartitionRemoteFetchInfo + ) { + TopicIdPartition remoteFetchTopicIdPartition = topicPartitionRemoteFetchInfo.topicIdPartition(); + RemoteStorageFetchInfo remoteStorageFetchInfo = topicPartitionRemoteFetchInfo.logReadResult().info().delayedRemoteStorageFetch.get(); + + Future<Void> remoteFetchTask; + CompletableFuture<RemoteLogReadResult> remoteFetchResult = new CompletableFuture<>(); + try { + remoteFetchTask = replicaManager.remoteLogManager().get().asyncRead( + remoteStorageFetchInfo, + result -> { + remoteFetchResult.complete(result); + replicaManager.completeDelayedShareFetchRequest(new DelayedShareFetchGroupKey(shareFetch.groupId(), remoteFetchTopicIdPartition.topicId(), remoteFetchTopicIdPartition.partition())); + } + ); + } catch (Exception e) { + // Throw the error if any in scheduling the remote fetch task. + remoteStorageFetchException = Optional.of(e); + throw e; + } + remoteFetchOpt = Optional.of(new RemoteFetch(remoteFetchTopicIdPartition, topicPartitionRemoteFetchInfo.logReadResult(), remoteFetchTask, remoteFetchResult, remoteStorageFetchInfo)); + } + + /** + * This function checks if the remote fetch can be completed or not. It should always be called once you confirm remoteFetchOpt.isPresent(). + * The operation can be completed if: + * Case a: The partition is in an offline log directory on this broker + * Case b: This broker does not know the partition it tries to fetch + * Case c: This broker is no longer the leader of the partition it tries to fetch + * Case d: The remote storage read request completed (succeeded or failed) + * @return boolean representing whether the remote fetch is completed or not. 
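Review note on maybeCompletePendingRemoteFetch(), declared just below: cases a to d all funnel into forceComplete(), and the branch handling relies on the purgatory guarantee that forceComplete() returns true for exactly one caller and that onComplete() runs once, on behalf of that caller. The snippet below is a deliberately simplified stand-in for that contract (it is not Kafka's actual DelayedOperation class), included only to make the "if (!completedByMe) releasePartitionLocks(...)" branches easier to follow:

import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model of the DelayedOperation completion handshake: only the caller
// that flips the flag runs onComplete(); every other caller gets false back.
abstract class SimplifiedDelayedOperation {
    private final AtomicBoolean completed = new AtomicBoolean(false);

    final boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }

    abstract void onComplete();
}

Under this model, a tryComplete() thread that gets false back knows another thread (for example the expiration reaper) has already completed the request and run onComplete(), so it only needs to release the partition locks it acquired itself.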
+ */ + private boolean maybeCompletePendingRemoteFetch() { + boolean canComplete = false; + + TopicIdPartition topicIdPartition = remoteFetchOpt.get().topicIdPartition(); + try { + replicaManager.getPartitionOrException(topicIdPartition.topicPartition()); + } catch (KafkaStorageException e) { // Case a + log.debug("TopicPartition {} is in an offline log directory, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); + canComplete = true; + } catch (UnknownTopicOrPartitionException e) { // Case b + log.debug("Broker no longer knows of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); + canComplete = true; + } catch (NotLeaderOrFollowerException e) { // Case c + log.debug("Broker is no longer the leader or follower of topicPartition {}, satisfy {} immediately", topicIdPartition, shareFetch.fetchParams()); + canComplete = true; + } + + if (canComplete || remoteFetchOpt.get().remoteFetchResult().isDone()) { // Case d + boolean completedByMe = forceComplete(); + // If invocation of forceComplete is not successful, then that means the request is already completed + // hence release the acquired locks. + if (!completedByMe) { + releasePartitionLocks(partitionsAcquired.keySet()); + } + return completedByMe; + } else + return false; + } + + /** + * This function completes a share fetch request for which we have identified erroneous remote storage fetch in tryComplete() + * It should only be called when we know that there is remote fetch in-flight/completed. + */ + private void completeErroneousRemoteShareFetchRequest() { + try { + handleFetchException(shareFetch, partitionsAcquired.keySet(), remoteStorageFetchException.get()); + } finally { + releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet()); + } + + } + + private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topicIdPartitions) { + // Releasing the lock to move ahead with the next request in queue. + releasePartitionLocks(topicIdPartitions); + // If we have a fetch request completed for a topic-partition, we release the locks for that partition, + // then we should check if there is a pending share fetch request for the topic-partition and complete it. + // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if + // we directly call delayedShareFetchPurgatory.checkAndComplete + replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> + replicaManager.completeDelayedShareFetchRequest( + new DelayedShareFetchGroupKey(shareFetch.groupId(), topicIdPartition.topicId(), topicIdPartition.partition())))); + } + + /** + * This function completes a share fetch request for which we have identified remoteFetch during tryComplete() + * Note - This function should only be called when we know that there is remote fetch. + */ + private void completeRemoteStorageShareFetchRequest() { + LinkedHashMap<TopicIdPartition, Long> acquiredNonRemoteFetchTopicPartitionData = new LinkedHashMap<>(); + try { + List<ShareFetchPartitionData> shareFetchPartitionData = new ArrayList<>(); + int readableBytes = 0; + if (remoteFetchOpt.get().remoteFetchResult().isDone()) { + RemoteFetch remoteFetch = remoteFetchOpt.get(); + RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get(); + if (remoteLogReadResult.error.isPresent()) { + Throwable error = remoteLogReadResult.error.get(); + // If there is any error for the remote fetch topic partition, we populate the error accordingly. 
+ shareFetchPartitionData.add( + new ShareFetchPartitionData( + remoteFetch.topicIdPartition(), + partitionsAcquired.get(remoteFetch.topicIdPartition()), + ReplicaManager.createLogReadResult(error).toFetchPartitionData(false) + ) + ); + } else { + FetchDataInfo info = remoteLogReadResult.fetchDataInfo.get(); + TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition(); + LogReadResult logReadResult = remoteFetch.logReadResult(); + shareFetchPartitionData.add( + new ShareFetchPartitionData( + topicIdPartition, + partitionsAcquired.get(remoteFetch.topicIdPartition()), + new FetchPartitionData( + logReadResult.error(), + logReadResult.highWatermark(), + logReadResult.leaderLogStartOffset(), + info.records, + Optional.empty(), + logReadResult.lastStableOffset().isDefined() ? OptionalLong.of((Long) logReadResult.lastStableOffset().get()) : OptionalLong.empty(), + info.abortedTransactions, + logReadResult.preferredReadReplica().isDefined() ? OptionalInt.of((Integer) logReadResult.preferredReadReplica().get()) : OptionalInt.empty(), + false + ) + ) + ); + readableBytes += info.records.sizeInBytes(); + } + } else { + cancelRemoteFetchTask(); + } + + // If remote fetch bytes < shareFetch.fetchParams().maxBytes, then we will try for a local read. + if (readableBytes < shareFetch.fetchParams().maxBytes) { + // Get the local log read based topic partitions. + LinkedHashMap<TopicIdPartition, SharePartition> nonRemoteFetchSharePartitions = new LinkedHashMap<>(); + sharePartitions.forEach((topicIdPartition, sharePartition) -> { + if (!partitionsAcquired.containsKey(topicIdPartition)) { + nonRemoteFetchSharePartitions.put(topicIdPartition, sharePartition); + } + }); + acquiredNonRemoteFetchTopicPartitionData = acquirablePartitions(nonRemoteFetchSharePartitions); + if (!acquiredNonRemoteFetchTopicPartitionData.isEmpty()) { + log.trace("Fetchable local share partitions for a remote share fetch request data: {} with groupId: {} fetch params: {}", + acquiredNonRemoteFetchTopicPartitionData, shareFetch.groupId(), shareFetch.fetchParams()); + + LinkedHashMap<TopicIdPartition, LogReadResult> responseData = readFromLog( + acquiredNonRemoteFetchTopicPartitionData, + partitionMaxBytesStrategy.maxBytes(shareFetch.fetchParams().maxBytes - readableBytes, acquiredNonRemoteFetchTopicPartitionData.keySet(), acquiredNonRemoteFetchTopicPartitionData.size())); + for (Map.Entry<TopicIdPartition, LogReadResult> entry : responseData.entrySet()) { + if (entry.getValue().info().delayedRemoteStorageFetch.isEmpty()) { + shareFetchPartitionData.add( + new ShareFetchPartitionData( + entry.getKey(), + acquiredNonRemoteFetchTopicPartitionData.get(entry.getKey()), + entry.getValue().toFetchPartitionData(false) + ) + ); + } + } + } + } + + // Update metric to record acquired to requested partitions. 
+ double acquiredRatio = (double) (partitionsAcquired.size() + acquiredNonRemoteFetchTopicPartitionData.size()) / shareFetch.topicIdPartitions().size(); + if (acquiredRatio > 0) + shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (acquiredRatio * 100)); + + Map<TopicIdPartition, ShareFetchResponseData.PartitionData> remoteFetchResponse = ShareFetchUtils.processFetchResponse( + shareFetch, shareFetchPartitionData, sharePartitions, replicaManager, exceptionHandler); + shareFetch.maybeComplete(remoteFetchResponse); + log.trace("Remote share fetch request completed successfully, response: {}", remoteFetchResponse); + } catch (InterruptedException | ExecutionException e) { + log.error("Exception occurred in completing remote fetch {} for delayed share fetch request {}", remoteFetchOpt.get(), e); + handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e); + } catch (Exception e) { + log.error("Unexpected error in processing delayed share fetch request", e); + handleExceptionInCompletingRemoteStorageShareFetchRequest(acquiredNonRemoteFetchTopicPartitionData.keySet(), e); + } finally { + Set<TopicIdPartition> topicIdPartitions = new LinkedHashSet<>(partitionsAcquired.keySet()); + topicIdPartitions.addAll(acquiredNonRemoteFetchTopicPartitionData.keySet()); + releasePartitionLocksAndAddToActionQueue(topicIdPartitions); + } + } + + private void handleExceptionInCompletingRemoteStorageShareFetchRequest( + Set<TopicIdPartition> acquiredNonRemoteFetchTopicPartitions, + Exception e + ) { + Set<TopicIdPartition> topicIdPartitions = new LinkedHashSet<>(partitionsAcquired.keySet()); + topicIdPartitions.addAll(acquiredNonRemoteFetchTopicPartitions); + handleFetchException(shareFetch, topicIdPartitions, e); + } + + /** + * Cancel the remote storage read task, if it has not been executed yet and avoid interrupting the task if it is + * already running as it may force closing opened/cached resources as transaction index. + * Note - This function should only be called when we know that there is remote fetch. 
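Review note on cancelRemoteFetchTask(), declared just below: Future#cancel(false) never interrupts a task that is already executing; it prevents a still-queued task from running, marks an in-progress task cancelled without touching its thread, and returns false only once the task has already finished. That is why the helper passes false and merely logs at debug level when cancellation fails. A small self-contained illustration using plain java.util.concurrent (class and variable names here are illustrative, not from the patch):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelWithoutInterruptExample {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicBoolean ranToCompletion = new AtomicBoolean(false);
        CountDownLatch started = new CountDownLatch(1);

        Future<?> running = pool.submit(() -> {
            started.countDown();
            sleepQuietly(300);             // stands in for an in-progress remote read
            ranToCompletion.set(true);     // still reached: cancel(false) does not interrupt
        });
        started.await();
        System.out.println("cancel on running task: " + running.cancel(false)); // true, but no interrupt happens

        pool.shutdown();
        pool.awaitTermination(2, TimeUnit.SECONDS);
        System.out.println("task ran to completion: " + ranToCompletion.get()); // true

        ExecutorService pool2 = Executors.newSingleThreadExecutor();
        Future<?> finished = pool2.submit(() -> { });
        finished.get();
        System.out.println("cancel on finished task: " + finished.cancel(false)); // false
        pool2.shutdown();
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}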
+ */ + private void cancelRemoteFetchTask() { + boolean cancelled = remoteFetchOpt.get().remoteFetchTask().cancel(false); + if (!cancelled) { + log.debug("Remote fetch task for RemoteStorageFetchInfo: {} could not be cancelled and its isDone value is {}", + remoteFetchOpt.get().remoteFetchInfo(), remoteFetchOpt.get().remoteFetchTask().isDone()); + } + } + + public record RemoteFetch( + TopicIdPartition topicIdPartition, + LogReadResult logReadResult, + Future<Void> remoteFetchTask, + CompletableFuture<RemoteLogReadResult> remoteFetchResult, + RemoteStorageFetchInfo remoteFetchInfo + ) { + @Override + public String toString() { + return "RemoteFetch(" + + "topicIdPartition=" + topicIdPartition + + ", logReadResult=" + logReadResult + + ", remoteFetchTask=" + remoteFetchTask + + ", remoteFetchResult=" + remoteFetchResult + + ", remoteFetchInfo=" + remoteFetchInfo + + ")"; + } + } + + public record TopicPartitionRemoteFetchInfo( + TopicIdPartition topicIdPartition, + LogReadResult logReadResult + ) { + @Override + public String toString() { + return "TopicPartitionRemoteFetchInfo(" + + "topicIdPartition=" + topicIdPartition + + ", logReadResult=" + logReadResult + + ")"; + } + } } diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index bb8b51b40e297..43ece70ca0ee6 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -25,10 +25,15 @@ import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.KafkaStorageException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.message.ShareFetchResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Records; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.log.remote.storage.RemoteLogManager; import org.apache.kafka.server.purgatory.DelayedOperationKey; import org.apache.kafka.server.purgatory.DelayedOperationPurgatory; import org.apache.kafka.server.share.SharePartitionKey; @@ -46,6 +51,8 @@ import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogOffsetSnapshot; +import org.apache.kafka.storage.internals.log.RemoteLogReadResult; +import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; import org.junit.jupiter.api.AfterEach; @@ -61,10 +68,15 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.stream.Collectors; +import scala.Option; import scala.Tuple2; +import scala.collection.Seq; import scala.jdk.javaapi.CollectionConverters; import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL; @@ -73,6 +85,7 @@ import static org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords; import static org.junit.jupiter.api.Assertions.assertEquals; import static 
org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -95,6 +108,8 @@ public class DelayedShareFetchTest { private static final FetchParams FETCH_PARAMS = new FetchParams( FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS, 1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty(), true); + private static final FetchDataInfo REMOTE_FETCH_INFO = new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), + MemoryRecords.EMPTY, false, Optional.empty(), Optional.of(mock(RemoteStorageFetchInfo.class))); private static final BrokerTopicStats BROKER_TOPIC_STATS = new BrokerTopicStats(); private Timer mockTimer; @@ -487,7 +502,7 @@ public void testToCompleteAnAlreadyCompletedFuture() { delayedShareFetch.forceComplete(); assertTrue(delayedShareFetch.isCompleted()); // Verifying that the first forceComplete calls acquirablePartitions method in DelayedShareFetch. - Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(); + Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(sharePartitions); assertEquals(0, future.join().size()); assertTrue(delayedShareFetch.lock().tryLock()); delayedShareFetch.lock().unlock(); @@ -497,7 +512,7 @@ public void testToCompleteAnAlreadyCompletedFuture() { delayedShareFetch.forceComplete(); assertTrue(delayedShareFetch.isCompleted()); // Verifying that the second forceComplete does not call acquirablePartitions method in DelayedShareFetch. - Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(); + Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions(sharePartitions); Mockito.verify(delayedShareFetch, times(0)).releasePartitionLocks(any()); assertTrue(delayedShareFetch.lock().tryLock()); // Assert both metrics shall be recorded only once. @@ -1155,6 +1170,525 @@ public void testOnCompleteExecutionOnTimeout() { assertEquals(1, delayedShareFetch.expiredRequestMeter().count()); } + @Test + public void testRemoteStorageFetchTryCompleteReturnsFalse() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + + // All the topic partitions are acquirable. 
+ when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + new CompletableFuture<>(), List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + when(sp2.nextFetchOffset()).thenReturn(30L); + + // Fetch offset matches with the cached entry for sp0 but not for sp1 and sp2. Hence, a replica manager fetch will happen for sp1 and sp2. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(10, 1, 0))); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking local log read result for tp1 and remote storage read result for tp2. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock. + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(remoteLogManager.asyncRead(any(), any())).thenReturn(mock(Future.class)); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + assertFalse(delayedShareFetch.tryComplete()); + assertFalse(delayedShareFetch.isCompleted()); + // Remote fetch object gets created for delayed share fetch object. + assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released for local log read topic partitions tp0 and tp1. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1)); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchTryCompleteThrowsException() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + // All the topic partitions are acquirable. 
+ when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + + // Fetch offset does not match with the cached entry for sp0 and sp1. Hence, a replica manager fetch will happen for sp0 and sp1. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking local log read result for tp0 and remote storage read result for tp1. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0), Set.of(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Exception will be thrown during the creation of remoteFetch object. + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(remoteLogManager.asyncRead(any(), any())).thenThrow(new RejectedExecutionException("Exception thrown")); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + BiConsumer<SharePartitionKey, Throwable> exceptionHandler = mockExceptionHandler(); + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withExceptionHandler(exceptionHandler) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + // tryComplete returns true and goes to forceComplete once the exception occurs. + assertTrue(delayedShareFetch.tryComplete()); + assertTrue(delayedShareFetch.isCompleted()); + // The future of shareFetch completes. + assertTrue(shareFetch.isCompleted()); + assertFalse(future.isCompletedExceptionally()); + assertEquals(Set.of(tp1), future.join().keySet()); + // Exception occurred and was handled. + Mockito.verify(exceptionHandler, times(1)).accept(any(), any()); + // Verify the locks are released for both local and remote read topic partitions tp0 and tp1 because of exception occurrence. 
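Review note on the two separate releasePartitionLocks verifications that follow: in this scenario tp0 is the local-read partition, so its lock is released from tryComplete itself when maybeProcessRemoteFetch hands the non-remote partitions to releasePartitionLocksAndAddToActionQueue, whereas tp1 is the partition whose asyncRead call threw, so it stays in partitionsAcquired and is only released from onComplete via completeErroneousRemoteShareFetchRequest. That is why the test expects one call with Set.of(tp0) and a separate call with Set.of(tp1) rather than a single call covering both.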
+ Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1)); + Mockito.verify(delayedShareFetch, times(1)).onComplete(); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchTryCompletionDueToBrokerBecomingOffline() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + + // All the topic partitions are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + when(sp2.nextFetchOffset()).thenReturn(30L); + + // Fetch offset matches with the cached entry for sp0 but not for sp1 and sp2. Hence, a replica manager fetch will happen for sp1 and sp2 during tryComplete. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.of(new LogOffsetMetadata(10, 1, 0))); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + // Mocking local log read result for tp1 and remote storage read result for tp2 on first replicaManager readFromLog call(from tryComplete). + // Mocking local log read result for tp0 and tp1 on second replicaManager readFromLog call(from onComplete). + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp1), Set.of(tp2)) + ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of()) + ).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object does not complete within tryComplete in this mock but the broker becomes unavailable. 
+ Future<Void> remoteFetchTask = mock(Future.class); + doAnswer(invocation -> { + when(remoteFetchTask.isCancelled()).thenReturn(true); + return false; + }).when(remoteFetchTask).cancel(false); + + when(remoteFetchTask.cancel(false)).thenReturn(true); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + when(remoteLogManager.asyncRead(any(), any())).thenReturn(remoteFetchTask); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + when(replicaManager.getPartitionOrException(tp2.topicPartition())).thenThrow(mock(KafkaStorageException.class)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + assertTrue(delayedShareFetch.remoteFetch().remoteFetchTask().isCancelled()); + // Partition locks should be released for all 3 topic partitions + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2)); + assertTrue(shareFetch.isCompleted()); + // Share fetch response contained tp0 and tp1 (local fetch) but not tp2, since it errored out. + assertEquals(Set.of(tp0, tp1), future.join().keySet()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchRequestCompletionOnFutureCompletionFailure() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + // sp0 is acquirable, sp1 is not acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(false); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(false); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + // Fetch offset does not match with the cached entry for sp0. Hence, a replica manager fetch will happen for sp0. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking remote storage read result for tp0. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete. 
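Review note on the asyncRead stubbing a few lines below (and repeated in the later remote-fetch tests): the doAnswer block pulls out the callback argument and invokes it immediately, so the simulated remote read completes synchronously inside tryComplete. The same Mockito pattern in isolation, using a made-up AsyncReader interface as a stand-in for RemoteLogManager (nothing in this snippet is from the patch):

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class CallbackStubExample {
    // Hypothetical async API: starts a read and reports the result through a callback.
    interface AsyncReader {
        CompletableFuture<String> asyncRead(String key, Consumer<String> onResult);
    }

    public static void main(String[] args) {
        AsyncReader reader = mock(AsyncReader.class);
        doAnswer(invocation -> {
            // Grab the callback argument and invoke it right away so the caller
            // observes a completed "read" without any background thread.
            Consumer<String> callback = invocation.getArgument(1);
            callback.accept("stubbed-result");
            return CompletableFuture.completedFuture("stubbed-result");
        }).when(reader).asyncRead(any(), any());

        CompletableFuture<String> result = new CompletableFuture<>();
        reader.asyncRead("key", result::complete);
        System.out.println(result.join()); // prints stubbed-result
    }
}

Driving the callback from inside the stub keeps these tests single-threaded and deterministic, which is why no real executor or RemoteLogManager instance is needed.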
+ RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult( + Optional.empty(), + Optional.of(new TimeoutException("Error occurred while creating remote fetch result")) // Remote fetch result is returned with an error. + ); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + doAnswer(invocationOnMock -> { + // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour. + Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1); + callback.accept(remoteFetchResult); + return CompletableFuture.completedFuture(remoteFetchResult); + }).when(remoteLogManager).asyncRead(any(), any()); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1))) + .build()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released for tp0. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + assertTrue(shareFetch.isCompleted()); + assertEquals(Set.of(tp0), future.join().keySet()); + assertEquals(Errors.REQUEST_TIMED_OUT.code(), future.join().get(tp0).errorCode()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchRequestCompletionOnFutureCompletionSuccessfully() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + + SharePartition sp0 = mock(SharePartition.class); + + // sp0 is acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + // Fetch offset does not match with the cached entry for sp0. Hence, a replica manager fetch will happen for sp0. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + // Mocking remote storage read result for tp0. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), Set.of(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete. + RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult( + Optional.of(REMOTE_FETCH_INFO), + Optional.empty() // Remote fetch result is returned successfully without error. 
+ ); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + doAnswer(invocationOnMock -> { + // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour. + Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1); + callback.accept(remoteFetchResult); + return CompletableFuture.completedFuture(remoteFetchResult); + }).when(remoteLogManager).asyncRead(any(), any()); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0))) + .build()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released for tp0. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + assertTrue(shareFetch.isCompleted()); + assertEquals(Set.of(tp0), future.join().keySet()); + assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchRequestCompletionAlongWithLocalLogRead() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 2)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + + // All the topic partitions are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp2.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + when(sp2.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + sharePartitions.put(tp2, sp2); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1, tp2), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(20L); + when(sp2.nextFetchOffset()).thenReturn(30L); + + // Fetch offset does not match with the cached entry for sp0, sp1 and sp2. Hence, a replica manager fetch will happen for all of them in tryComplete. 
+ when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp2.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp1.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + when(sp2.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + // Mocking local log read result for tp0, tp1 and remote storage read result for tp2 on first replicaManager readFromLog call(from tryComplete). + // Mocking local log read result for tp0 and tp1 on second replicaManager readFromLog call(from onComplete). + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of(tp2)) + ).doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(tp0, tp1), Set.of()) + ).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete. + RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult( + Optional.of(REMOTE_FETCH_INFO), + Optional.empty() // Remote fetch result is returned successfully without error. + ); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + doAnswer(invocationOnMock -> { + // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour. + Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1); + callback.accept(remoteFetchResult); + return CompletableFuture.completedFuture(remoteFetchResult); + }).when(remoteLogManager).asyncRead(any(), any()); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withReplicaManager(replicaManager) + .withSharePartitions(sharePartitions) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1, tp2))) + .build()); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + // the future of shareFetch completes. + assertTrue(shareFetch.isCompleted()); + assertEquals(Set.of(tp0, tp1, tp2), future.join().keySet()); + // Verify the locks are released for both local log and remote storage read topic partitions tp0, tp1 and tp2. 
+ Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1, tp2)); + assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode()); + assertEquals(Errors.NONE.code(), future.join().get(tp1).errorCode()); + assertEquals(Errors.NONE.code(), future.join().get(tp2).errorCode()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + + @Test + public void testRemoteStorageFetchOnlyHappensForFirstTopicPartition() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 1)); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + + // sp0 and sp1 are acquirable. + when(sp0.maybeAcquireFetchLock()).thenReturn(true); + when(sp1.maybeAcquireFetchLock()).thenReturn(true); + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + + LinkedHashMap<TopicIdPartition, SharePartition> sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + sharePartitions.put(tp1, sp1); + + CompletableFuture<Map<TopicIdPartition, ShareFetchResponseData.PartitionData>> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0, tp1), BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + when(sp0.nextFetchOffset()).thenReturn(10L); + when(sp1.nextFetchOffset()).thenReturn(10L); + // Fetch offset does not match with the cached entry for sp0 and sp1. Hence, a replica manager fetch will happen for both. + when(sp0.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + when(sp1.fetchOffsetMetadata(anyLong())).thenReturn(Optional.empty()); + + LinkedHashSet<TopicIdPartition> remoteStorageFetchPartitions = new LinkedHashSet<>(); + remoteStorageFetchPartitions.add(tp0); + remoteStorageFetchPartitions.add(tp1); + + // Mocking remote storage read result for tp0 and tp1. + doAnswer(invocation -> buildLocalAndRemoteFetchResult(Set.of(), remoteStorageFetchPartitions)).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean()); + + // Remote fetch related mocks. Remote fetch object completes within tryComplete in this mock, hence request will move on to forceComplete. + RemoteLogReadResult remoteFetchResult = new RemoteLogReadResult( + Optional.of(REMOTE_FETCH_INFO), + Optional.empty() // Remote fetch result is returned successfully without error. + ); + RemoteLogManager remoteLogManager = mock(RemoteLogManager.class); + doAnswer(invocationOnMock -> { + // Make sure that the callback is called to populate remoteFetchResult for the mock behaviour. 
+ Consumer<RemoteLogReadResult> callback = invocationOnMock.getArgument(1); + callback.accept(remoteFetchResult); + return CompletableFuture.completedFuture(remoteFetchResult); + }).when(remoteLogManager).asyncRead(any(), any()); + when(replicaManager.remoteLogManager()).thenReturn(Option.apply(remoteLogManager)); + + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withSharePartitions(sharePartitions) + .withReplicaManager(replicaManager) + .withPartitionMaxBytesStrategy(mockPartitionMaxBytes(Set.of(tp0, tp1))) + .build()); + + when(sp0.acquire(any(), anyInt(), anyInt(), anyLong(), any(), any())).thenReturn( + createShareAcquiredRecords(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))); + + assertFalse(delayedShareFetch.isCompleted()); + assertTrue(delayedShareFetch.tryComplete()); + + assertTrue(delayedShareFetch.isCompleted()); + // Pending remote fetch object gets created for delayed share fetch. + assertNotNull(delayedShareFetch.remoteFetch()); + // Verify the locks are released separately for tp1 (from tryComplete). + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp1)); + // From onComplete, the locks will be released for both tp0 and tp1. tp0 because it was acquired from + // tryComplete and has remote fetch processed. tp1 will be reacquired in onComplete when we check for local log read. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0, tp1)); + assertTrue(shareFetch.isCompleted()); + // Share fetch response only contains the first remote storage fetch topic partition - tp0. + assertEquals(Set.of(tp0), future.join().keySet()); + assertEquals(Errors.NONE.code(), future.join().get(tp0).errorCode()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) { LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, minBytes); LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class), @@ -1182,6 +1716,37 @@ private PartitionMaxBytesStrategy mockPartitionMaxBytes(Set<TopicIdPartition> pa return partitionMaxBytesStrategy; } + private Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLocalAndRemoteFetchResult( + Set<TopicIdPartition> localLogReadTopicIdPartitions, + Set<TopicIdPartition> remoteReadTopicIdPartitions) { + List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new ArrayList<>(); + localLogReadTopicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult( + new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), MemoryRecords.EMPTY), + Option.empty(), + -1L, + -1L, + -1L, + -1L, + -1L, + Option.empty(), + Option.empty(), + Option.empty() + )))); + remoteReadTopicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult( + REMOTE_FETCH_INFO, + Option.empty(), + -1L, + -1L, + -1L, + -1L, + -1L, + Option.empty(), + Option.empty(), + Option.empty() + )))); + return CollectionConverters.asScala(logReadResults).toSeq(); + } + @SuppressWarnings("unchecked") private static BiConsumer<SharePartitionKey, Throwable> mockExceptionHandler() { return mock(BiConsumer.class); @@ -1194,6 +1759,7 @@ static class DelayedShareFetchBuilder { private LinkedHashMap<TopicIdPartition, SharePartition> 
sharePartitions = mock(LinkedHashMap.class); private PartitionMaxBytesStrategy partitionMaxBytesStrategy = mock(PartitionMaxBytesStrategy.class); private Time time = new MockTime(); + private final Optional<DelayedShareFetch.RemoteFetch> remoteFetch = Optional.empty(); private ShareGroupMetrics shareGroupMetrics = mock(ShareGroupMetrics.class); DelayedShareFetchBuilder withShareFetchData(ShareFetch shareFetch) { @@ -1243,7 +1809,8 @@ public DelayedShareFetch build() { sharePartitions, partitionMaxBytesStrategy, shareGroupMetrics, - time); + time, + remoteFetch); } } }