diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml index 2a0f74126859a..e4194b20e2c99 100644 --- a/checkstyle/import-control-storage.xml +++ b/checkstyle/import-control-storage.xml @@ -84,6 +84,10 @@ + + + + @@ -164,7 +168,6 @@ - diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index 969029a6ea582..7b061a28bd50e 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -30,7 +30,6 @@ import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.utils.Time; import org.apache.kafka.raft.errors.NotLeaderException; -import org.apache.kafka.server.LogReadResult; import org.apache.kafka.server.metrics.KafkaMetricsGroup; import org.apache.kafka.server.purgatory.DelayedOperation; import org.apache.kafka.server.share.SharePartitionKey; @@ -46,6 +45,7 @@ import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogOffsetSnapshot; +import org.apache.kafka.storage.internals.log.LogReadResult; import org.apache.kafka.storage.internals.log.RemoteLogReadResult; import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; @@ -852,13 +852,12 @@ private void completeRemoteStorageShareFetchRequest() { if (remoteFetch.remoteFetchResult().isDone()) { RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get(); if (remoteLogReadResult.error().isPresent()) { - Throwable error = remoteLogReadResult.error().get(); // If there is any error for the remote fetch topic partition, we populate the error accordingly. 
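A note on the pattern introduced here: together with the replacement line just below, this hunk retires the deleted `ReplicaManager.createLogReadResult(Throwable)` helper in favour of a single-argument, error-only `LogReadResult` constructor. A minimal sketch of the conversion, assuming (as the body of the removed Scala helper suggests) that the error-only constructor fills in unknown offsets and empty records so that only the error code survives:

```java
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.storage.internals.log.LogReadResult;

public class ErrorOnlyReadResultSketch {
    public static void main(String[] args) {
        // Errors.forException maps an exception to its wire-protocol error code;
        // the error-only constructor (assumed to mirror the removed
        // createLogReadResult helper) attaches empty records and unknown offsets.
        LogReadResult errorResult =
            new LogReadResult(Errors.forException(new NotLeaderOrFollowerException("demo")));
        // toFetchPartitionData(false) yields the client-facing view, in which only
        // the NOT_LEADER_OR_FOLLOWER error code is meaningful.
        FetchPartitionData data = errorResult.toFetchPartitionData(false);
        System.out.println(data);
    }
}
```

Storing an `Errors` code instead of an `Optional<Throwable>` in `LogReadResult` is also what drives the mechanical `Optional.empty()` to `Errors.NONE` substitutions in the tests further down.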
shareFetchPartitionData.add( new ShareFetchPartitionData( remoteFetch.topicIdPartition(), partitionsAcquired.get(remoteFetch.topicIdPartition()), - ReplicaManager.createLogReadResult(error).toFetchPartitionData(false) + new LogReadResult(Errors.forException(remoteLogReadResult.error().get())).toFetchPartitionData(false) ) ); } else { diff --git a/core/src/main/java/kafka/server/share/PendingRemoteFetches.java b/core/src/main/java/kafka/server/share/PendingRemoteFetches.java index 575a32ef4662e..c25cc70677519 100644 --- a/core/src/main/java/kafka/server/share/PendingRemoteFetches.java +++ b/core/src/main/java/kafka/server/share/PendingRemoteFetches.java @@ -17,8 +17,8 @@ package kafka.server.share; import org.apache.kafka.common.TopicIdPartition; -import org.apache.kafka.server.LogReadResult; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; +import org.apache.kafka.storage.internals.log.LogReadResult; import org.apache.kafka.storage.internals.log.RemoteLogReadResult; import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala index 91480bb420edd..ebdc000044052 100644 --- a/core/src/main/scala/kafka/server/DelayedFetch.scala +++ b/core/src/main/scala/kafka/server/DelayedFetch.scala @@ -24,25 +24,15 @@ import java.util.concurrent.TimeUnit import org.apache.kafka.common.TopicIdPartition import org.apache.kafka.common.errors._ import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.FetchRequest.PartitionData import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.purgatory.DelayedOperation import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData} -import org.apache.kafka.storage.internals.log.LogOffsetMetadata +import org.apache.kafka.storage.internals.log.{FetchPartitionStatus, LogOffsetMetadata} import scala.collection._ import scala.jdk.CollectionConverters._ -case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionData) { - - override def toString: String = { - "[startOffsetMetadata: " + startOffsetMetadata + - ", fetchInfo: " + fetchInfo + - "]" - } -} - /** * A delayed fetch operation that can be created by the replica manager and watched * in the fetch operation purgatory diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala deleted file mode 100644 index fc2926988c031..0000000000000 --- a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.server - -import com.yammer.metrics.core.Meter -import kafka.utils.Logging -import org.apache.kafka.common.TopicIdPartition -import org.apache.kafka.common.errors._ -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.server.LogReadResult -import org.apache.kafka.server.metrics.KafkaMetricsGroup -import org.apache.kafka.server.purgatory.DelayedOperation -import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData} -import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, RemoteLogReadResult, RemoteStorageFetchInfo} - -import java.util -import java.util.concurrent.{CompletableFuture, Future, TimeUnit} -import java.util.{Optional, OptionalInt, OptionalLong} -import scala.collection._ - -/** - * A remote fetch operation that can be created by the replica manager and watched - * in the remote fetch operation purgatory - */ -class DelayedRemoteFetch(remoteFetchTasks: util.Map[TopicIdPartition, Future[Void]], - remoteFetchResults: util.Map[TopicIdPartition, CompletableFuture[RemoteLogReadResult]], - remoteFetchInfos: util.Map[TopicIdPartition, RemoteStorageFetchInfo], - remoteFetchMaxWaitMs: Long, - fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)], - fetchParams: FetchParams, - localReadResults: Seq[(TopicIdPartition, LogReadResult)], - replicaManager: ReplicaManager, - responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit) - extends DelayedOperation(remoteFetchMaxWaitMs) with Logging { - - if (fetchParams.isFromFollower) { - throw new IllegalStateException(s"The follower should not invoke remote fetch. Fetch params are: $fetchParams") - } - - /** - * The operation can be completed if: - * - * Case a: This broker is no longer the leader of the partition it tries to fetch - * Case b: This broker does not know the partition it tries to fetch - * Case c: All the remote storage read request completed (succeeded or failed) - * Case d: The partition is in an offline log directory on this broker - * - * Upon completion, should return whatever data is available for each valid partition - */ - override def tryComplete(): Boolean = { - fetchPartitionStatus.foreach { - case (topicPartition, fetchStatus) => - val fetchOffset = fetchStatus.startOffsetMetadata - try { - if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) { - replicaManager.getPartitionOrException(topicPartition.topicPartition()) - } - } catch { - case _: KafkaStorageException => // Case d - debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchParams immediately") - return forceComplete() - case _: UnknownTopicOrPartitionException => // Case b - debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchParams immediately") - return forceComplete() - case _: NotLeaderOrFollowerException => // Case a - debug("Broker is no longer the leader or follower of %s, satisfy %s immediately".format(topicPartition, fetchParams)) - return forceComplete() - } - } - // Case c - if (remoteFetchResults.values().stream().allMatch(taskResult => taskResult.isDone)) - forceComplete() - else - false - } - - override def onExpiration(): Unit = { - // cancel the remote storage read task, if it has not been executed yet and - // avoid interrupting the task if it is already running as it may force closing opened/cached resources as transaction index. 
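A note on the cancellation policy spelled out in the comment above (and retained in the Java port later in this patch): expired remote fetches use `Future.cancel(false)`, which prevents a queued read from starting but never interrupts one already in flight, so resources the reader holds open, such as the transaction index, are not force-closed. A self-contained, JDK-only sketch of that distinction; the class name and timings are illustrative only:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelWithoutInterruptSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<?> task = pool.submit(() -> {
            try {
                Thread.sleep(500); // stand-in for a slow remote storage read
            } catch (InterruptedException e) {
                // Never reached with cancel(false): the worker is not interrupted.
                System.out.println("interrupted");
            }
        });
        Thread.sleep(100); // let the task start running
        // cancel(false) marks the future cancelled (returns true here) but lets
        // the in-flight read run to completion instead of interrupting it.
        System.out.println("cancelled? " + task.cancel(false));
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```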
- remoteFetchTasks.forEach { (topicIdPartition, task) => - if (task != null && !task.isDone) { - if (!task.cancel(false)) { - debug(s"Remote fetch task for remoteFetchInfo: ${remoteFetchInfos.get(topicIdPartition)} could not be cancelled.") - } - } - } - - DelayedRemoteFetchMetrics.expiredRequestMeter.mark() - } - - /** - * Upon completion, read whatever data is available and pass to the complete callback - */ - override def onComplete(): Unit = { - val fetchPartitionData = localReadResults.map { case (tp, result) => - val remoteFetchResult = remoteFetchResults.get(tp) - if (remoteFetchResults.containsKey(tp) - && remoteFetchResult.isDone - && result.error == Errors.NONE - && result.info.delayedRemoteStorageFetch.isPresent) { - if (remoteFetchResult.get.error.isPresent) { - tp -> ReplicaManager.createLogReadResult(remoteFetchResult.get.error.get).toFetchPartitionData(false) - } else { - val info = remoteFetchResult.get.fetchDataInfo.get - tp -> new FetchPartitionData( - result.error, - result.highWatermark, - result.leaderLogStartOffset, - info.records, - Optional.empty(), - if (result.lastStableOffset.isPresent) OptionalLong.of(result.lastStableOffset.getAsLong) else OptionalLong.empty(), - info.abortedTransactions, - if (result.preferredReadReplica.isPresent) OptionalInt.of(result.preferredReadReplica.getAsInt) else OptionalInt.empty(), - false) - } - } else { - tp -> result.toFetchPartitionData(false) - } - } - - responseCallback(fetchPartitionData) - } -} - -object DelayedRemoteFetchMetrics { - // Changing the package or class name may cause incompatibility with existing code and metrics configuration - private val metricsPackage = "kafka.server" - private val metricsClassName = "DelayedRemoteFetchMetrics" - private val metricsGroup = new KafkaMetricsGroup(metricsPackage, metricsClassName) - val expiredRequestMeter: Meter = metricsGroup.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS) -} diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index a4c5416cb3406..1d9d076a317df 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -56,16 +56,16 @@ import org.apache.kafka.server.config.ReplicationConfigs import org.apache.kafka.server.log.remote.storage.RemoteLogManager import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.network.BrokerEndPoint -import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey} +import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey} import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey} import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData} import org.apache.kafka.server.transaction.AddPartitionsToTxnManager import org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask} import org.apache.kafka.server.util.{Scheduler, ShutdownableThread} -import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, LogReadResult, common} +import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common} import 
org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints} -import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import java.io.File @@ -188,19 +188,7 @@ object ReplicaManager { -1L, -1L, OptionalLong.empty(), - Optional.of(e)) - } - - def createLogReadResult(e: Throwable): LogReadResult = { - new LogReadResult(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY), - Optional.empty(), - UnifiedLog.UNKNOWN_OFFSET, - UnifiedLog.UNKNOWN_OFFSET, - UnifiedLog.UNKNOWN_OFFSET, - UnifiedLog.UNKNOWN_OFFSET, - -1L, - OptionalLong.empty(), - Optional.of(e)) + Errors.forException(e)); } private[server] def isListOffsetsTimestampUnsupported(timestamp: JLong, version: Short): Boolean = { @@ -1639,7 +1627,7 @@ class ReplicaManager(val config: KafkaConfig, private def processRemoteFetches(remoteFetchInfos: util.LinkedHashMap[TopicIdPartition, RemoteStorageFetchInfo], params: FetchParams, responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit, - logReadResults: Seq[(TopicIdPartition, LogReadResult)], + logReadResults: util.LinkedHashMap[TopicIdPartition, LogReadResult], fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = { val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]] val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]] @@ -1651,8 +1639,15 @@ class ReplicaManager(val config: KafkaConfig, } val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong - val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs, - fetchPartitionStatus, params, logReadResults, this, responseCallback) + val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, + remoteFetchResults, + remoteFetchInfos, + remoteFetchMaxWaitMs, + fetchPartitionStatus.toMap.asJava, + params, + logReadResults, + tp => getPartitionOrException(tp), + response => responseCallback(response.asScala.toSeq)) // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation val delayedFetchKeys = remoteFetchTasks.asScala.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList @@ -1681,7 +1676,7 @@ class ReplicaManager(val config: KafkaConfig, var hasDivergingEpoch = false var hasPreferredReadReplica = false - val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult] + val logReadResultMap = new util.LinkedHashMap[TopicIdPartition, LogReadResult] logReadResults.foreach { case (topicIdPartition, logReadResult) => brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark() @@ -1717,14 +1712,15 @@ class ReplicaManager(val config: KafkaConfig, // construct the fetch results from the read results val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition, 
FetchPartitionStatus)] fetchInfos.foreach { case (topicIdPartition, partitionData) => - logReadResultMap.get(topicIdPartition).foreach(logReadResult => { + val logReadResult = logReadResultMap.get(topicIdPartition) + if (logReadResult != null) { val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata - fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) - }) + fetchPartitionStatus += (topicIdPartition -> new FetchPartitionStatus(logOffsetMetadata, partitionData)) + } } if (!remoteFetchInfos.isEmpty) { - processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResults, fetchPartitionStatus.toSeq) + processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResultMap, fetchPartitionStatus.toSeq) } else { // If there is not enough data to respond and there is no remote data, we will let the fetch request // wait for new data. @@ -1812,7 +1808,7 @@ class ReplicaManager(val config: KafkaConfig, -1L, OptionalLong.of(offsetSnapshot.lastStableOffset.messageOffset), if (preferredReadReplica.isDefined) OptionalInt.of(preferredReadReplica.get) else OptionalInt.empty(), - Optional.empty()) + Errors.NONE) } else { log = partition.localLogWithEpochOrThrow(fetchInfo.currentLeaderEpoch, params.fetchOnlyLeader()) @@ -1836,7 +1832,7 @@ class ReplicaManager(val config: KafkaConfig, fetchTimeMs, OptionalLong.of(readInfo.lastStableOffset), if (preferredReadReplica.isDefined) OptionalInt.of(preferredReadReplica.get) else OptionalInt.empty(), - Optional.empty() + Errors.NONE ) } } catch { @@ -1849,7 +1845,7 @@ class ReplicaManager(val config: KafkaConfig, _: ReplicaNotAvailableException | _: KafkaStorageException | _: InconsistentTopicIdException) => - createLogReadResult(e) + new LogReadResult(Errors.forException(e)) case e: OffsetOutOfRangeException => handleOffsetOutOfRangeError(tp, params, fetchInfo, adjustedMaxBytes, minOneMessage, log, fetchTimeMs, e) case e: Throwable => @@ -1868,7 +1864,7 @@ class ReplicaManager(val config: KafkaConfig, UnifiedLog.UNKNOWN_OFFSET, -1L, OptionalLong.empty(), - Optional.of(e) + Errors.forException(e) ) } } @@ -1949,10 +1945,10 @@ class ReplicaManager(val config: KafkaConfig, fetchInfo.logStartOffset, fetchTimeMs, OptionalLong.of(log.lastStableOffset), - Optional.empty[Throwable]()) + Errors.NONE) } } else { - createLogReadResult(exception) + new LogReadResult(Errors.forException(exception)) } } diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index ffa9f8b11456a..8aab8eb5495c7 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -32,7 +32,6 @@ import org.apache.kafka.common.record.Records; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.server.LogReadResult; import org.apache.kafka.server.log.remote.storage.RemoteLogManager; import org.apache.kafka.server.purgatory.DelayedOperationKey; import org.apache.kafka.server.purgatory.DelayedOperationPurgatory; @@ -52,6 +51,7 @@ import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; import org.apache.kafka.storage.internals.log.LogOffsetSnapshot; +import org.apache.kafka.storage.internals.log.LogReadResult; import org.apache.kafka.storage.internals.log.RemoteLogReadResult; import 
org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; @@ -2023,7 +2023,7 @@ public void testRemoteStorageFetchCompletionPostRegisteringCallbackByPendingFetc -1L, OptionalLong.empty(), OptionalInt.empty(), - Optional.empty() + Errors.NONE )); when(pendingRemoteFetches.remoteFetches()).thenReturn(List.of(remoteFetch)); when(pendingRemoteFetches.isDone()).thenReturn(false); @@ -2104,7 +2104,7 @@ public void testRemoteStorageFetchCompletionPostRegisteringCallbackByTimerTaskCo -1L, OptionalLong.empty(), OptionalInt.empty(), - Optional.empty() + Errors.NONE )); when(pendingRemoteFetches.remoteFetches()).thenReturn(List.of(remoteFetch)); when(pendingRemoteFetches.isDone()).thenReturn(false); @@ -2179,7 +2179,7 @@ private Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLocalAndRemoteFetchRes -1L, OptionalLong.empty(), OptionalInt.empty(), - Optional.empty() + Errors.NONE )))); remoteReadTopicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult( REMOTE_FETCH_INFO, @@ -2191,7 +2191,7 @@ private Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLocalAndRemoteFetchRes -1L, OptionalLong.empty(), OptionalInt.empty(), - Optional.empty() + Errors.NONE )))); return CollectionConverters.asScala(logReadResults).toSeq(); } diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index 24a84bab64a9b..541719aea0678 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -51,7 +51,6 @@ import org.apache.kafka.common.utils.ImplicitLinkedHashCollection; import org.apache.kafka.common.utils.Time; import org.apache.kafka.coordinator.group.GroupConfigManager; -import org.apache.kafka.server.LogReadResult; import org.apache.kafka.server.common.ShareVersion; import org.apache.kafka.server.purgatory.DelayedOperationKey; import org.apache.kafka.server.purgatory.DelayedOperationPurgatory; @@ -84,6 +83,7 @@ import org.apache.kafka.server.util.timer.TimerTask; import org.apache.kafka.storage.internals.log.FetchDataInfo; import org.apache.kafka.storage.internals.log.LogOffsetMetadata; +import org.apache.kafka.storage.internals.log.LogReadResult; import org.apache.kafka.storage.internals.log.OffsetResultHolder; import org.apache.kafka.storage.log.metrics.BrokerTopicMetrics; import org.apache.kafka.storage.log.metrics.BrokerTopicStats; @@ -3226,7 +3226,7 @@ Compression.NONE, new SimpleRecord("test-key".getBytes(), "test-value".getBytes( -1L, OptionalLong.empty(), OptionalInt.empty(), - Optional.empty() + Errors.NONE )))); return CollectionConverters.asScala(logReadResults).toSeq(); } diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala index f10beb0086fa8..fa3b8465d651f 100644 --- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala +++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala @@ -25,9 +25,8 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEnd import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.MemoryRecords import org.apache.kafka.common.requests.FetchRequest -import org.apache.kafka.server.LogReadResult import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData} -import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogOffsetMetadata, 
LogOffsetSnapshot} +import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchPartitionStatus, LogOffsetMetadata, LogOffsetSnapshot, LogReadResult} import org.junit.jupiter.api.Test import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.params.ParameterizedTest @@ -48,9 +47,9 @@ class DelayedFetchTest { val currentLeaderEpoch = Optional.of[Integer](10) val replicaId = 1 - val fetchStatus = FetchPartitionStatus( - startOffsetMetadata = new LogOffsetMetadata(fetchOffset), - fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) + val fetchStatus = new FetchPartitionStatus( + new LogOffsetMetadata(fetchOffset), + new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500) var fetchResultOpt: Option[FetchPartitionData] = None @@ -94,9 +93,9 @@ class DelayedFetchTest { val currentLeaderEpoch = Optional.of[Integer](10) val replicaId = 1 - val fetchStatus = FetchPartitionStatus( - startOffsetMetadata = new LogOffsetMetadata(fetchOffset), - fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) + val fetchStatus = new FetchPartitionStatus( + new LogOffsetMetadata(fetchOffset), + new FetchRequest.PartitionData(topicIdPartition.topicId(), fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500) var fetchResultOpt: Option[FetchPartitionData] = None @@ -134,9 +133,9 @@ class DelayedFetchTest { val lastFetchedEpoch = Optional.of[Integer](9) val replicaId = 1 - val fetchStatus = FetchPartitionStatus( - startOffsetMetadata = new LogOffsetMetadata(fetchOffset), - fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch)) + val fetchStatus = new FetchPartitionStatus( + new LogOffsetMetadata(fetchOffset), + new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch, lastFetchedEpoch)) val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500) var fetchResultOpt: Option[FetchPartitionData] = None @@ -185,9 +184,9 @@ class DelayedFetchTest { val currentLeaderEpoch = Optional.of[Integer](10) val replicaId = 1 - val fetchStatus = FetchPartitionStatus( - startOffsetMetadata = new LogOffsetMetadata(fetchOffset), - fetchInfo = new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) + val fetchStatus = new FetchPartitionStatus( + new LogOffsetMetadata(fetchOffset), + new FetchRequest.PartitionData(topicIdPartition.topicId, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500) var fetchResultOpt: Option[FetchPartitionData] = None @@ -265,7 +264,7 @@ class DelayedFetchTest { -1L, -1L, OptionalLong.empty(), - if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]()) + error) } } diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala deleted file mode 100644 index 23b4b32b0d744..0000000000000 --- a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala +++ /dev/null @@ -1,501 +0,0 @@ -/* - * 
Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.server - -import com.yammer.metrics.core.Meter -import kafka.cluster.Partition -import org.apache.kafka.common.errors.NotLeaderOrFollowerException -import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.record.MemoryRecords -import org.apache.kafka.common.requests.FetchRequest -import org.apache.kafka.common.{TopicIdPartition, Uuid} -import org.apache.kafka.server.LogReadResult -import org.apache.kafka.server.metrics.KafkaYammerMetrics -import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData} -import org.apache.kafka.storage.internals.log._ -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test -import org.mockito.ArgumentMatchers.anyBoolean -import org.mockito.Mockito.{mock, never, verify, when} - -import java.util.{Collections, Optional, OptionalLong} -import java.util.concurrent.{CompletableFuture, Future} -import scala.collection._ -import scala.jdk.CollectionConverters._ - -class DelayedRemoteFetchTest { - private val maxBytes = 1024 - private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager]) - private val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic") - private val topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic2") - private val fetchOffset = 500L - private val logStartOffset = 0L - private val currentLeaderEpoch = Optional.of[Integer](10) - private val remoteFetchMaxWaitMs = 500 - - private val fetchStatus = FetchPartitionStatus( - startOffsetMetadata = new LogOffsetMetadata(fetchOffset), - fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) - private val fetchParams = buildFetchParams(replicaId = -1, maxWaitMs = 500) - - @Test - def testFetch(): Unit = { - var actualTopicPartition: Option[TopicIdPartition] = None - var fetchResultOpt: Option[FetchPartitionData] = None - - def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { - assertEquals(1, responses.size) - actualTopicPartition = Some(responses.head._1) - fetchResultOpt = Some(responses.head._2) - } - - val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]() - future.complete(buildRemoteReadResult(Errors.NONE)) - val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null) - val highWatermark = 100 - val leaderLogStartOffset = 10 - val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset) - - val delayedRemoteFetch = new DelayedRemoteFetch( - java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](), - java.util.Collections.singletonMap(topicIdPartition, future), - 
java.util.Collections.singletonMap(topicIdPartition, fetchInfo), - remoteFetchMaxWaitMs, - Seq(topicIdPartition -> fetchStatus), - fetchParams, - Seq(topicIdPartition -> logReadInfo), - replicaManager, - callback) - - when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)) - .thenReturn(mock(classOf[Partition])) - - assertTrue(delayedRemoteFetch.tryComplete()) - assertTrue(delayedRemoteFetch.isCompleted) - assertTrue(actualTopicPartition.isDefined) - assertEquals(topicIdPartition, actualTopicPartition.get) - assertTrue(fetchResultOpt.isDefined) - - val fetchResult = fetchResultOpt.get - assertEquals(Errors.NONE, fetchResult.error) - assertEquals(highWatermark, fetchResult.highWatermark) - assertEquals(leaderLogStartOffset, fetchResult.logStartOffset) - } - - @Test - def testFollowerFetch(): Unit = { - var actualTopicPartition: Option[TopicIdPartition] = None - var fetchResultOpt: Option[FetchPartitionData] = None - - def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { - assertEquals(1, responses.size) - actualTopicPartition = Some(responses.head._1) - fetchResultOpt = Some(responses.head._2) - } - - val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]() - future.complete(buildRemoteReadResult(Errors.NONE)) - val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null) - val highWatermark = 100 - val leaderLogStartOffset = 10 - val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset) - val fetchParams = buildFetchParams(replicaId = 1, maxWaitMs = 500) - - assertThrows(classOf[IllegalStateException], () => new DelayedRemoteFetch( - java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](), - java.util.Collections.singletonMap(topicIdPartition, future), - java.util.Collections.singletonMap(topicIdPartition, fetchInfo), - remoteFetchMaxWaitMs, - Seq(topicIdPartition -> fetchStatus), - fetchParams, - Seq(topicIdPartition -> logReadInfo), - replicaManager, - callback)) - } - - @Test - def testNotLeaderOrFollower(): Unit = { - var actualTopicPartition: Option[TopicIdPartition] = None - var fetchResultOpt: Option[FetchPartitionData] = None - - def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { - assertEquals(1, responses.size) - actualTopicPartition = Some(responses.head._1) - fetchResultOpt = Some(responses.head._2) - } - - // throw exception while getPartition - when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)) - .thenThrow(new NotLeaderOrFollowerException(s"Replica for $topicIdPartition not available")) - - val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]() - val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null) - - val logReadInfo = buildReadResult(Errors.NONE) - - val delayedRemoteFetch = new DelayedRemoteFetch( - java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](), - java.util.Collections.singletonMap(topicIdPartition, future), - java.util.Collections.singletonMap(topicIdPartition, fetchInfo), - remoteFetchMaxWaitMs, - Seq(topicIdPartition -> fetchStatus), - fetchParams, - Seq(topicIdPartition -> logReadInfo), - replicaManager, - callback) - - // delayed remote fetch should still be able to complete - assertTrue(delayedRemoteFetch.tryComplete()) - assertTrue(delayedRemoteFetch.isCompleted) - assertEquals(topicIdPartition, actualTopicPartition.get) - 
assertTrue(fetchResultOpt.isDefined) - } - - @Test - def testErrorLogReadInfo(): Unit = { - var actualTopicPartition: Option[TopicIdPartition] = None - var fetchResultOpt: Option[FetchPartitionData] = None - - def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { - assertEquals(1, responses.size) - actualTopicPartition = Some(responses.head._1) - fetchResultOpt = Some(responses.head._2) - } - - when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)) - .thenReturn(mock(classOf[Partition])) - - val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]() - future.complete(buildRemoteReadResult(Errors.NONE)) - val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null) - - // build a read result with error - val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH) - - val delayedRemoteFetch = new DelayedRemoteFetch( - java.util.Collections.emptyMap[TopicIdPartition, Future[Void]](), - java.util.Collections.singletonMap(topicIdPartition, future), - java.util.Collections.singletonMap(topicIdPartition, fetchInfo), - remoteFetchMaxWaitMs, - Seq(topicIdPartition -> fetchStatus), - fetchParams, - Seq(topicIdPartition -> logReadInfo), - replicaManager, - callback) - - assertTrue(delayedRemoteFetch.tryComplete()) - assertTrue(delayedRemoteFetch.isCompleted) - assertEquals(topicIdPartition, actualTopicPartition.get) - assertTrue(fetchResultOpt.isDefined) - assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResultOpt.get.error) - } - - @Test - def testRequestExpiry(): Unit = { - val responses = mutable.Map[TopicIdPartition, FetchPartitionData]() - - def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { - responseSeq.foreach { case (tp, data) => - responses.put(tp, data) - } - } - - def expiresPerSecValue(): Double = { - val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala - val metric = allMetrics.find { case (n, _) => n.getMBeanName.endsWith("kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec") } - - if (metric.isEmpty) - 0 - else - metric.get._2.asInstanceOf[Meter].count - } - - val remoteFetchTaskExpired = mock(classOf[Future[Void]]) - val remoteFetchTask2 = mock(classOf[Future[Void]]) - // complete the 2nd task, and keep the 1st one expired - when(remoteFetchTask2.isDone).thenReturn(true) - - // Create futures - one completed, one not - val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]() - val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]() - // Only complete one remote fetch - future2.complete(buildRemoteReadResult(Errors.NONE)) - - val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null) - val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null) - - val highWatermark = 100 - val leaderLogStartOffset = 10 - - val logReadInfo1 = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset) - val logReadInfo2 = buildReadResult(Errors.NONE) - - val fetchStatus1 = FetchPartitionStatus( - startOffsetMetadata = new LogOffsetMetadata(fetchOffset), - fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) - val fetchStatus2 = FetchPartitionStatus( - startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100), - fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, 
maxBytes, currentLeaderEpoch)) - - // Set up maps for multiple partitions - val remoteFetchTasks = new java.util.HashMap[TopicIdPartition, Future[Void]]() - val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]() - val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]() - - remoteFetchTasks.put(topicIdPartition, remoteFetchTaskExpired) - remoteFetchTasks.put(topicIdPartition2, remoteFetchTask2) - remoteFetchResults.put(topicIdPartition, future1) - remoteFetchResults.put(topicIdPartition2, future2) - remoteFetchInfos.put(topicIdPartition, fetchInfo1) - remoteFetchInfos.put(topicIdPartition2, fetchInfo2) - - val delayedRemoteFetch = new DelayedRemoteFetch( - remoteFetchTasks, - remoteFetchResults, - remoteFetchInfos, - remoteFetchMaxWaitMs, - Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2), - fetchParams, - Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2), - replicaManager, - callback) - - when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)) - .thenReturn(mock(classOf[Partition])) - when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition)) - .thenReturn(mock(classOf[Partition])) - - // Verify that the ExpiresPerSec metric is zero before fetching - val existingMetricVal = expiresPerSecValue() - // Verify the delayedRemoteFetch is not completed yet - assertFalse(delayedRemoteFetch.isCompleted) - - // Force the delayed remote fetch to expire - delayedRemoteFetch.run() - - // Check that the expired task was cancelled and force-completed - verify(remoteFetchTaskExpired).cancel(anyBoolean()) - verify(remoteFetchTask2, never()).cancel(anyBoolean()) - assertTrue(delayedRemoteFetch.isCompleted) - - // Check that the ExpiresPerSec metric was incremented - assertTrue(expiresPerSecValue() > existingMetricVal) - - // Fetch results should include 2 results and the expired one should return local read results - assertEquals(2, responses.size) - assertTrue(responses.contains(topicIdPartition)) - assertTrue(responses.contains(topicIdPartition2)) - - assertEquals(Errors.NONE, responses(topicIdPartition).error) - assertEquals(highWatermark, responses(topicIdPartition).highWatermark) - assertEquals(leaderLogStartOffset, responses(topicIdPartition).logStartOffset) - - assertEquals(Errors.NONE, responses(topicIdPartition2).error) - } - - @Test - def testMultiplePartitions(): Unit = { - val responses = mutable.Map[TopicIdPartition, FetchPartitionData]() - - def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { - responseSeq.foreach { case (tp, data) => - responses.put(tp, data) - } - } - - // Create futures - one completed, one not - val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]() - val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]() - // Only complete one remote fetch - future1.complete(buildRemoteReadResult(Errors.NONE)) - - val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null) - val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null) - - val highWatermark1 = 100 - val leaderLogStartOffset1 = 10 - val highWatermark2 = 200 - val leaderLogStartOffset2 = 20 - - val logReadInfo1 = buildReadResult(Errors.NONE, 100, 10) - val logReadInfo2 = buildReadResult(Errors.NONE, 200, 20) - - val fetchStatus1 = FetchPartitionStatus( - startOffsetMetadata = new 
LogOffsetMetadata(fetchOffset), - fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) - val fetchStatus2 = FetchPartitionStatus( - startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100), - fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch)) - - // Set up maps for multiple partitions - val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]() - val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]() - - remoteFetchResults.put(topicIdPartition, future1) - remoteFetchResults.put(topicIdPartition2, future2) - remoteFetchInfos.put(topicIdPartition, fetchInfo1) - remoteFetchInfos.put(topicIdPartition2, fetchInfo2) - - val delayedRemoteFetch = new DelayedRemoteFetch( - Collections.emptyMap[TopicIdPartition, Future[Void]](), - remoteFetchResults, - remoteFetchInfos, - remoteFetchMaxWaitMs, - Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2), - fetchParams, - Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2), - replicaManager, - callback) - - when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)) - .thenReturn(mock(classOf[Partition])) - when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition)) - .thenReturn(mock(classOf[Partition])) - - // Should not complete since future2 is not done - assertFalse(delayedRemoteFetch.tryComplete()) - assertFalse(delayedRemoteFetch.isCompleted) - - // Complete future2 - future2.complete(buildRemoteReadResult(Errors.NONE)) - - // Now it should complete - assertTrue(delayedRemoteFetch.tryComplete()) - assertTrue(delayedRemoteFetch.isCompleted) - - // Verify both partitions were processed without error - assertEquals(2, responses.size) - assertTrue(responses.contains(topicIdPartition)) - assertTrue(responses.contains(topicIdPartition2)) - - assertEquals(Errors.NONE, responses(topicIdPartition).error) - assertEquals(highWatermark1, responses(topicIdPartition).highWatermark) - assertEquals(leaderLogStartOffset1, responses(topicIdPartition).logStartOffset) - - assertEquals(Errors.NONE, responses(topicIdPartition2).error) - assertEquals(highWatermark2, responses(topicIdPartition2).highWatermark) - assertEquals(leaderLogStartOffset2, responses(topicIdPartition2).logStartOffset) - } - - @Test - def testMultiplePartitionsWithFailedResults(): Unit = { - val responses = mutable.Map[TopicIdPartition, FetchPartitionData]() - - def callback(responseSeq: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = { - responseSeq.foreach { case (tp, data) => - responses.put(tp, data) - } - } - - // Create futures - one successful, one with error - val future1: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]() - val future2: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]() - - // Created 1 successful result and 1 failed result - future1.complete(buildRemoteReadResult(Errors.NONE)) - future2.complete(buildRemoteReadResult(Errors.UNKNOWN_SERVER_ERROR)) - - val fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null) - val fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null) - - val logReadInfo1 = buildReadResult(Errors.NONE, 100, 10) - val logReadInfo2 = buildReadResult(Errors.NONE, 200, 20) - - val fetchStatus1 = FetchPartitionStatus( - 
startOffsetMetadata = new LogOffsetMetadata(fetchOffset), - fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)) - val fetchStatus2 = FetchPartitionStatus( - startOffsetMetadata = new LogOffsetMetadata(fetchOffset + 100), - fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch)) - - // Set up maps for multiple partitions - val remoteFetchResults = new java.util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]() - val remoteFetchInfos = new java.util.HashMap[TopicIdPartition, RemoteStorageFetchInfo]() - - remoteFetchResults.put(topicIdPartition, future1) - remoteFetchResults.put(topicIdPartition2, future2) - remoteFetchInfos.put(topicIdPartition, fetchInfo1) - remoteFetchInfos.put(topicIdPartition2, fetchInfo2) - - val delayedRemoteFetch = new DelayedRemoteFetch( - Collections.emptyMap[TopicIdPartition, Future[Void]](), - remoteFetchResults, - remoteFetchInfos, - remoteFetchMaxWaitMs, - Seq(topicIdPartition -> fetchStatus1, topicIdPartition2 -> fetchStatus2), - fetchParams, - Seq(topicIdPartition -> logReadInfo1, topicIdPartition2 -> logReadInfo2), - replicaManager, - callback) - - when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition)) - .thenReturn(mock(classOf[Partition])) - when(replicaManager.getPartitionOrException(topicIdPartition2.topicPartition)) - .thenReturn(mock(classOf[Partition])) - - assertTrue(delayedRemoteFetch.tryComplete()) - assertTrue(delayedRemoteFetch.isCompleted) - - // Verify both partitions were processed - assertEquals(2, responses.size) - assertTrue(responses.contains(topicIdPartition)) - assertTrue(responses.contains(topicIdPartition2)) - - // First partition should be successful - val fetchResult1 = responses(topicIdPartition) - assertEquals(Errors.NONE, fetchResult1.error) - - // Second partition should have an error due to remote fetch failure - val fetchResult2 = responses(topicIdPartition2) - assertEquals(Errors.UNKNOWN_SERVER_ERROR, fetchResult2.error) - } - - private def buildFetchParams(replicaId: Int, - maxWaitMs: Int): FetchParams = { - new FetchParams( - replicaId, - 1, - maxWaitMs, - 1, - maxBytes, - FetchIsolation.LOG_END, - Optional.empty() - ) - } - - private def buildReadResult(error: Errors, - highWatermark: Int = 0, - leaderLogStartOffset: Int = 0): LogReadResult = { - new LogReadResult( - new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY, false, Optional.empty(), - Optional.of(mock(classOf[RemoteStorageFetchInfo]))), - Optional.empty(), - highWatermark, - leaderLogStartOffset, - -1L, - -1L, - -1L, - OptionalLong.empty(), - if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]()) - } - - private def buildRemoteReadResult(error: Errors): RemoteLogReadResult = { - new RemoteLogReadResult( - Optional.of(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY)), - if (error != Errors.NONE) Optional.of[Throwable](error.exception) else Optional.empty[Throwable]()) - } -} diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index 8f10811091d70..366645344ae1b 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -33,7 +33,7 
@@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.metadata.MetadataCache import org.apache.kafka.server.common.RequestLocal -import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, TopicPartitionOperationKey} +import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets, TopicPartitionOperationKey} import org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation import org.apache.kafka.server.util.timer.{MockTimer, Timer} import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler} diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala index a7948ae901f14..307afad4f5f45 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala @@ -33,7 +33,7 @@ import org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.server.common.KRaftVersion import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams} import org.apache.kafka.server.util.{KafkaScheduler, MockTime} -import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogOffsetSnapshot, UnifiedLog} +import org.apache.kafka.storage.internals.log.{FetchDataInfo, FetchPartitionStatus, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogOffsetSnapshot, UnifiedLog} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} import org.mockito.ArgumentMatchers.{any, anyBoolean, anyInt, anyLong} @@ -171,7 +171,7 @@ class ReplicaManagerQuotasTest { when(partition.getReplica(1)).thenReturn(None) val tp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("t1", 0)) - val fetchPartitionStatus = FetchPartitionStatus( + val fetchPartitionStatus = new FetchPartitionStatus( new LogOffsetMetadata(50L, 0L, 250), new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty())) val fetchParams = new FetchParams( @@ -222,7 +222,7 @@ class ReplicaManagerQuotasTest { .thenReturn(partition) val tidp = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("t1", 0)) - val fetchPartitionStatus = FetchPartitionStatus( + val fetchPartitionStatus = new FetchPartitionStatus( new LogOffsetMetadata(50L, 0L, 250), new PartitionData(Uuid.ZERO_UUID, 50, 0, 1, Optional.empty())) val fetchParams = new FetchParams( diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index fa43b8bc1a4c1..d0ebe4b202580 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -63,8 +63,8 @@ import org.apache.kafka.server.log.remote.TopicPartitionLog import org.apache.kafka.server.log.remote.storage._ import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics} import org.apache.kafka.server.network.BrokerEndPoint -import org.apache.kafka.server.{LogReadResult, PartitionFetchState} -import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets} +import org.apache.kafka.server.PartitionFetchState +import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, 
DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets} import org.apache.kafka.server.share.SharePartitionKey import org.apache.kafka.server.share.fetch.{DelayedShareFetchGroupKey, DelayedShareFetchKey, ShareFetch} import org.apache.kafka.server.share.metrics.ShareGroupMetrics @@ -76,7 +76,7 @@ import org.apache.kafka.server.util.timer.{MockTimer, SystemTimer} import org.apache.kafka.server.util.{MockScheduler, MockTime, Scheduler} import org.apache.kafka.storage.internals.checkpoint.LazyOffsetCheckpoints import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache -import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard} +import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, FetchDataInfo, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadResult, LogSegments, ProducerStateManager, ProducerStateManagerConfig, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard} import org.apache.kafka.storage.log.metrics.BrokerTopicStats import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeEach, Test} @@ -3575,13 +3575,13 @@ class ReplicaManagerTest { mock(classOf[FetchDataInfo]) }).when(spyRLM).read(any()) - val curExpiresPerSec = DelayedRemoteFetchMetrics.expiredRequestMeter.count() + val curExpiresPerSec = DelayedRemoteFetch.expiredRequestCount() replicaManager.fetchMessages(params, Seq(tidp0 -> new PartitionData(topicId, fetchOffset, 0, 100000, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UNBOUNDED_QUOTA, fetchCallback) // advancing the clock to expire the delayed remote fetch timer.advanceClock(2000L) // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is called since the delayed remote fetch is expired - TestUtils.waitUntilTrue(() => (curExpiresPerSec + 1) == DelayedRemoteFetchMetrics.expiredRequestMeter.count(), "DelayedRemoteFetchMetrics.expiredRequestMeter.count() should be 1, but got: " + DelayedRemoteFetchMetrics.expiredRequestMeter.count(), 10000L) + TestUtils.waitUntilTrue(() => (curExpiresPerSec + 1) == DelayedRemoteFetch.expiredRequestCount(), "DelayedRemoteFetchMetrics.expiredRequestMeter.count() should be 1, but got: " + DelayedRemoteFetch.expiredRequestCount(), 10000L) latch.countDown() } finally { Utils.tryAll(util.Arrays.asList[Callable[Void]]( diff --git a/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteFetch.java b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteFetch.java new file mode 100644 index 0000000000000..8c872811eb671 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/server/purgatory/DelayedRemoteFetch.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.apache.kafka.server.storage.log.FetchParams;
+import org.apache.kafka.server.storage.log.FetchPartitionData;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.FetchPartitionStatus;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.LogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+
+import com.yammer.metrics.core.Meter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * A remote fetch operation that can be created by the replica manager and watched
+ * in the remote fetch operation purgatory
+ */
+public class DelayedRemoteFetch extends DelayedOperation {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DelayedRemoteFetch.class);
+
+    // For compatibility, metrics are defined to be under `kafka.server.DelayedRemoteFetchMetrics` class
+    private static final KafkaMetricsGroup METRICS_GROUP = new KafkaMetricsGroup("kafka.server", "DelayedRemoteFetchMetrics");
+
+    private static final Meter EXPIRED_REQUEST_METER = METRICS_GROUP.newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS);
+
+    private final Map<TopicIdPartition, Future<Void>> remoteFetchTasks;
+    private final Map<TopicIdPartition, CompletableFuture<RemoteLogReadResult>> remoteFetchResults;
+    private final Map<TopicIdPartition, RemoteStorageFetchInfo> remoteFetchInfos;
+    private final Map<TopicIdPartition, FetchPartitionStatus> fetchPartitionStatus;
+    private final FetchParams fetchParams;
+    private final Map<TopicIdPartition, LogReadResult> localReadResults;
+    private final Consumer<TopicPartition> partitionOrException;
+    private final Consumer<Map<TopicIdPartition, FetchPartitionData>> responseCallback;
+
+    public DelayedRemoteFetch(Map<TopicIdPartition, Future<Void>> remoteFetchTasks,
+                              Map<TopicIdPartition, CompletableFuture<RemoteLogReadResult>> remoteFetchResults,
+                              Map<TopicIdPartition, RemoteStorageFetchInfo> remoteFetchInfos,
+                              long remoteFetchMaxWaitMs,
+                              Map<TopicIdPartition, FetchPartitionStatus> fetchPartitionStatus,
+                              FetchParams fetchParams,
+                              Map<TopicIdPartition, LogReadResult> localReadResults,
+                              Consumer<TopicPartition> partitionOrException,
+                              Consumer<Map<TopicIdPartition, FetchPartitionData>> responseCallback) {
+        super(remoteFetchMaxWaitMs);
+        this.remoteFetchTasks = remoteFetchTasks;
+        this.remoteFetchResults = remoteFetchResults;
+        this.remoteFetchInfos = remoteFetchInfos;
+        this.fetchPartitionStatus = fetchPartitionStatus;
+        this.fetchParams = fetchParams;
+        this.localReadResults = localReadResults;
+        this.partitionOrException = partitionOrException;
+        this.responseCallback = responseCallback;
+
+        if (fetchParams.isFromFollower()) {
+            throw new IllegalStateException("The follower should not invoke remote fetch. Fetch params are: " + fetchParams);
Fetch params are: " + fetchParams); + } + } + + /** + * The operation can be completed if: + *

+ * Case a: This broker is no longer the leader of the partition it tries to fetch + *

+ * Case b: This broker does not know the partition it tries to fetch + *

+ * Case c: All the remote storage read requests completed (succeeded or failed) + *

+ * Case d: The partition is in an offline log directory on this broker + * + * Upon completion, should return whatever data is available for each valid partition + */ + @Override + public boolean tryComplete() { + for (Map.Entry entry : fetchPartitionStatus.entrySet()) { + TopicIdPartition topicPartition = entry.getKey(); + FetchPartitionStatus fetchStatus = entry.getValue(); + LogOffsetMetadata fetchOffset = fetchStatus.startOffsetMetadata(); + try { + if (!fetchOffset.equals(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA)) { + partitionOrException.accept(topicPartition.topicPartition()); + } + } catch (KafkaStorageException e) { // Case d + LOG.debug("Partition {} is in an offline log directory, satisfy {} immediately.", topicPartition, fetchParams); + return forceComplete(); + } catch (UnknownTopicOrPartitionException e) { // Case b + LOG.debug("Broker no longer knows of partition {}, satisfy {} immediately", topicPartition, fetchParams); + return forceComplete(); + } catch (NotLeaderOrFollowerException e) { // Case a + LOG.debug("Broker is no longer the leader or follower of {}, satisfy {} immediately", topicPartition, fetchParams); + return forceComplete(); + } + } + + // Case c + if (remoteFetchResults.values().stream().allMatch(CompletableFuture::isDone)) { + return forceComplete(); + } + return false; + } + + @Override + public void onExpiration() { + // cancel the remote storage read task, if it has not been executed yet and + // avoid interrupting the task if it is already running as it may force closing opened/cached resources as transaction index. + remoteFetchTasks.forEach((topicIdPartition, task) -> { + if (task != null && !task.isDone() && !task.cancel(false)) { + LOG.debug("Remote fetch task for remoteFetchInfo: {} could not be cancelled.", remoteFetchInfos.get(topicIdPartition)); + } + }); + + EXPIRED_REQUEST_METER.mark(); + } + + /** + * Upon completion, read whatever data is available and pass to the complete callback + */ + @Override + public void onComplete() { + Map fetchPartitionData = new LinkedHashMap<>(); + localReadResults.forEach((tpId, result) -> { + CompletableFuture remoteFetchResult = remoteFetchResults.get(tpId); + if (remoteFetchResults.containsKey(tpId) + && remoteFetchResult.isDone() + && result.error() == Errors.NONE + && result.info().delayedRemoteStorageFetch.isPresent()) { + + if (remoteFetchResult.join().error().isPresent()) { + fetchPartitionData.put(tpId, + new LogReadResult(Errors.forException(remoteFetchResult.join().error().get())).toFetchPartitionData(false)); + } else { + FetchDataInfo info = remoteFetchResult.join().fetchDataInfo().get(); + fetchPartitionData.put(tpId, + new FetchPartitionData( + result.error(), + result.highWatermark(), + result.leaderLogStartOffset(), + info.records, + Optional.empty(), + result.lastStableOffset(), + info.abortedTransactions, + result.preferredReadReplica(), + false)); + } + } else { + fetchPartitionData.put(tpId, result.toFetchPartitionData(false)); + } + }); + + responseCallback.accept(fetchPartitionData); + } + + // Visible for testing + public static long expiredRequestCount() { + return EXPIRED_REQUEST_METER.count(); + } +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchPartitionStatus.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchPartitionStatus.java new file mode 100644 index 0000000000000..5a060013b5e75 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchPartitionStatus.java @@ -0,0 +1,29 @@ +/* + * Licensed to 
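Reviewer aside: now that the operation lives in `org.apache.kafka.server.purgatory`, wiring it up is plain Java. Below is a minimal call-site sketch, assuming the existing `DelayedOperationPurgatory` and `TopicPartitionOperationKey` APIs plus `java.util.List` and `java.util.stream.Collectors`; the helper name `watchRemoteFetch` and all variable names are illustrative, not part of this patch (the real integration point is ReplicaManager).

    // Hypothetical sketch, not part of this change.
    static void watchRemoteFetch(DelayedOperationPurgatory<DelayedRemoteFetch> purgatory,
                                 Map<TopicIdPartition, Future<Void>> tasks,
                                 Map<TopicIdPartition, CompletableFuture<RemoteLogReadResult>> results,
                                 Map<TopicIdPartition, RemoteStorageFetchInfo> infos,
                                 long remoteFetchMaxWaitMs,
                                 Map<TopicIdPartition, FetchPartitionStatus> statuses,
                                 FetchParams params,
                                 Map<TopicIdPartition, LogReadResult> localReads,
                                 Consumer<TopicPartition> partitionOrException,
                                 Consumer<Map<TopicIdPartition, FetchPartitionData>> respond) {
        DelayedRemoteFetch op = new DelayedRemoteFetch(tasks, results, infos, remoteFetchMaxWaitMs,
            statuses, params, localReads, partitionOrException, respond);
        // One watch key per fetched partition, so a leadership change or log-dir failure
        // on any partition can complete the operation before the timeout.
        List<DelayedOperationKey> keys = statuses.keySet().stream()
            .map(tp -> new TopicPartitionOperationKey(tp.topicPartition()))
            .collect(Collectors.toList());
        purgatory.tryCompleteElseWatch(op, keys);
    }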
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchPartitionStatus.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchPartitionStatus.java
new file mode 100644
index 0000000000000..5a060013b5e75
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchPartitionStatus.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+
+/**
+ * A class containing log offset metadata and fetch info for a topic partition.
+ */
+public record FetchPartitionStatus(
+    LogOffsetMetadata startOffsetMetadata,
+    PartitionData fetchInfo
+) {
+}
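The record is a one-for-one port of the removed Scala case class, with generated accessors in place of case-class fields. An illustrative construction, borrowing the fixture values used by DelayedRemoteFetchTest further below (assumes the obvious imports; values are arbitrary):

    FetchPartitionStatus status = new FetchPartitionStatus(
        new LogOffsetMetadata(500L),
        new FetchRequest.PartitionData(Uuid.ZERO_UUID, 500L, 0L, 1024, Optional.of(10)));
    LogOffsetMetadata start = status.startOffsetMetadata(); // record accessor replaces the Scala field access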
diff --git a/server/src/main/java/org/apache/kafka/server/LogReadResult.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadResult.java
similarity index 59%
rename from server/src/main/java/org/apache/kafka/server/LogReadResult.java
rename to storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadResult.java
index 203654391af29..057a66a25c10a 100644
--- a/server/src/main/java/org/apache/kafka/server/LogReadResult.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogReadResult.java
@@ -14,12 +14,12 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
-package org.apache.kafka.server;
+package org.apache.kafka.storage.internals.log;

import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.server.storage.log.FetchPartitionData;
-import org.apache.kafka.storage.internals.log.FetchDataInfo;

import java.util.Optional;
import java.util.OptionalInt;
@@ -38,7 +38,7 @@
 * @param fetchTimeMs The time the fetch was received
 * @param lastStableOffset Current LSO or None if the result has an exception
 * @param preferredReadReplica the preferred read replica to be used for future fetches
- * @param exception Exception if error encountered while reading from the log
+ * @param error Error code if an error was encountered while reading from the log
 */
public record LogReadResult(
    FetchDataInfo info,
@@ -50,21 +50,8 @@ public record LogReadResult(
    long fetchTimeMs,
    OptionalLong lastStableOffset,
    OptionalInt preferredReadReplica,
-    Optional<Throwable> exception
+    Errors error
) {
-    public LogReadResult(
-        FetchDataInfo info,
-        Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
-        long highWatermark,
-        long leaderLogStartOffset,
-        long leaderLogEndOffset,
-        long followerLogStartOffset,
-        long fetchTimeMs,
-        OptionalLong lastStableOffset) {
-        this(info, divergingEpoch, highWatermark, leaderLogStartOffset, leaderLogEndOffset, followerLogStartOffset,
-            fetchTimeMs, lastStableOffset, OptionalInt.empty(), Optional.empty());
-    }
-
    public LogReadResult(
        FetchDataInfo info,
        Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
@@ -74,44 +61,21 @@ public LogReadResult(
        long followerLogStartOffset,
        long fetchTimeMs,
        OptionalLong lastStableOffset,
-        Optional<Throwable> exception) {
-        this(info, divergingEpoch, highWatermark, leaderLogStartOffset, leaderLogEndOffset, followerLogStartOffset,
-            fetchTimeMs, lastStableOffset, OptionalInt.empty(), exception);
-    }
-
-    public LogReadResult(
-        FetchDataInfo info,
-        Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
-        long highWatermark,
-        long leaderLogStartOffset,
-        long leaderLogEndOffset,
-        long followerLogStartOffset,
-        long fetchTimeMs,
-        OptionalLong lastStableOffset,
-        OptionalInt preferredReadReplica) {
+        Errors error) {
        this(info, divergingEpoch, highWatermark, leaderLogStartOffset, leaderLogEndOffset, followerLogStartOffset,
-            fetchTimeMs, lastStableOffset, preferredReadReplica, Optional.empty());
-    }
-
-    public Errors error() {
-        if (exception.isPresent()) {
-            return Errors.forException(exception.get());
-        }
-        return Errors.NONE;
+            fetchTimeMs, lastStableOffset, OptionalInt.empty(), error);
    }

-    @Override
-    public String toString() {
-        return "LogReadResult(info=" + info +
-            ", divergingEpoch=" + divergingEpoch +
-            ", highWatermark=" + highWatermark +
-            ", leaderLogStartOffset" + leaderLogStartOffset +
-            ", leaderLogEndOffset" + leaderLogEndOffset +
-            ", followerLogStartOffset" + followerLogStartOffset +
-            ", fetchTimeMs=" + fetchTimeMs +
-            ", preferredReadReplica=" + preferredReadReplica +
-            ", lastStableOffset=" + lastStableOffset +
-            ", error=" + error() + ")";
+    public LogReadResult(Errors error) {
+        this(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
+            Optional.empty(),
+            UnifiedLog.UNKNOWN_OFFSET,
+            UnifiedLog.UNKNOWN_OFFSET,
+            UnifiedLog.UNKNOWN_OFFSET,
+            UnifiedLog.UNKNOWN_OFFSET,
+            -1L,
+            OptionalLong.empty(),
+            error);
    }

    public FetchPartitionData toFetchPartitionData(boolean isReassignmentFetch) {
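With the `exception` component replaced by an `Errors` code, error paths no longer hand-build a fully populated result. A hedged sketch of the new error-only constructor in use (placeholder semantics taken from the constructor body above; the surrounding code is illustrative):

    // The error-only constructor fills placeholder offsets (UnifiedLog.UNKNOWN_OFFSET) and empty records.
    LogReadResult failed = new LogReadResult(Errors.NOT_LEADER_OR_FOLLOWER);
    FetchPartitionData data = failed.toFetchPartitionData(false);
    // Expected: data.error is Errors.NOT_LEADER_OR_FOLLOWER and data.records is MemoryRecords.EMPTY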
diff --git a/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteFetchTest.java b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteFetchTest.java
new file mode 100644
index 0000000000000..d4fc0c3ef27a4
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/purgatory/DelayedRemoteFetchTest.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.purgatory;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.apache.kafka.server.storage.log.FetchIsolation;
+import org.apache.kafka.server.storage.log.FetchParams;
+import org.apache.kafka.server.storage.log.FetchPartitionData;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.FetchPartitionStatus;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.LogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+
+import com.yammer.metrics.core.Meter;
+import com.yammer.metrics.core.Metric;
+import com.yammer.metrics.core.MetricName;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class DelayedRemoteFetchTest {
+    private final int maxBytes = 1024;
+    private final Consumer<TopicPartition> partitionOrException = mock(Consumer.class);
+    private final TopicIdPartition topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic");
+    private final TopicIdPartition topicIdPartition2 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic2");
+    private final long fetchOffset = 500L;
+    private final long logStartOffset = 0L;
+    private final Optional<Integer> currentLeaderEpoch = Optional.of(10);
+    private final int remoteFetchMaxWaitMs = 500;
+
+    private final FetchPartitionStatus fetchStatus = new FetchPartitionStatus(
+        new LogOffsetMetadata(fetchOffset),
+        new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch)
+    );
+    private final FetchParams fetchParams = buildFetchParams(-1, 500);
+
+    @Test
+    public void testFetch() {
+        AtomicReference<TopicIdPartition> actualTopicPartition = new AtomicReference<>();
+        AtomicReference<FetchPartitionData> fetchResultOpt = new AtomicReference<>();
+
+        Consumer<Map<TopicIdPartition, FetchPartitionData>> callback = responses -> {
+            assertEquals(1, responses.size());
+            Map.Entry<TopicIdPartition, FetchPartitionData> entry = responses.entrySet().iterator().next();
+            actualTopicPartition.set(entry.getKey());
+            fetchResultOpt.set(entry.getValue());
+        };
+
+        CompletableFuture<RemoteLogReadResult> future = new CompletableFuture<>();
+        future.complete(buildRemoteReadResult(Errors.NONE));
+
+        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null);
+        long highWatermark = 100L;
+        long leaderLogStartOffset = 10L;
+        LogReadResult logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset);
+
+        DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+            Map.of(),
+            Map.of(topicIdPartition, future),
+            Map.of(topicIdPartition, fetchInfo),
+            remoteFetchMaxWaitMs,
+            Map.of(topicIdPartition, fetchStatus),
+            fetchParams,
+            Map.of(topicIdPartition, logReadInfo),
+            partitionOrException,
+            callback
+        );
+
+        assertTrue(delayedRemoteFetch.tryComplete());
+        assertTrue(delayedRemoteFetch.isCompleted());
+        assertNotNull(actualTopicPartition.get());
+        assertEquals(topicIdPartition, actualTopicPartition.get());
+        assertNotNull(fetchResultOpt.get());
+
+        FetchPartitionData fetchResult = fetchResultOpt.get();
+        assertEquals(Errors.NONE, fetchResult.error);
+        assertEquals(highWatermark, fetchResult.highWatermark);
+        assertEquals(leaderLogStartOffset, fetchResult.logStartOffset);
+    }
+
+    @Test
+    public void testFollowerFetch() {
+        Consumer<Map<TopicIdPartition, FetchPartitionData>> callback = responses -> {
+            assertEquals(1, responses.size());
+        };
+
+        CompletableFuture<RemoteLogReadResult> future = new CompletableFuture<>();
+        future.complete(buildRemoteReadResult(Errors.NONE));
+        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null);
+        LogReadResult logReadInfo = buildReadResult(Errors.NONE, 100L, 10L);
+
+        assertThrows(IllegalStateException.class, () ->
+            new DelayedRemoteFetch(
+                Map.of(),
+                Map.of(topicIdPartition, future),
+                Map.of(topicIdPartition, fetchInfo),
+                remoteFetchMaxWaitMs,
+                Map.of(topicIdPartition, fetchStatus),
+                buildFetchParams(1, 500),
+                Map.of(topicIdPartition, logReadInfo),
+                partitionOrException,
+                callback
+            ));
+    }
+
+    @Test
+    public void testNotLeaderOrFollower() {
+        AtomicReference<TopicIdPartition> actualTopicPartition = new AtomicReference<>();
+        AtomicReference<FetchPartitionData> fetchResultOpt = new AtomicReference<>();
+
+        Consumer<Map<TopicIdPartition, FetchPartitionData>> callback = responses -> {
+            assertEquals(1, responses.size());
+            Map.Entry<TopicIdPartition, FetchPartitionData> entry = responses.entrySet().iterator().next();
+            actualTopicPartition.set(entry.getKey());
+            fetchResultOpt.set(entry.getValue());
+        };
+
+        // Throw an exception from partitionOrException to simulate a leadership change on the partition
+        doThrow(new NotLeaderOrFollowerException(String.format("Replica for %s not available", topicIdPartition)))
+            .when(partitionOrException).accept(topicIdPartition.topicPartition());
+
+        CompletableFuture<RemoteLogReadResult> future = new CompletableFuture<>();
+        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null);
+
+        LogReadResult logReadInfo = buildReadResult(Errors.NONE);
+
+        DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+            Map.of(),
+            Map.of(topicIdPartition, future),
+            Map.of(topicIdPartition, fetchInfo),
+            remoteFetchMaxWaitMs,
+            Map.of(topicIdPartition, fetchStatus),
+            fetchParams,
+            Map.of(topicIdPartition, logReadInfo),
+            partitionOrException,
+            callback
+        );
+
+        // The delayed remote fetch should still be able to complete
+        assertTrue(delayedRemoteFetch.tryComplete());
+        assertTrue(delayedRemoteFetch.isCompleted());
+        assertEquals(topicIdPartition, actualTopicPartition.get());
+        assertNotNull(fetchResultOpt.get());
+    }
+
+    @Test
+    public void testErrorLogReadInfo() {
+        AtomicReference<TopicIdPartition> actualTopicPartition = new AtomicReference<>();
+        AtomicReference<FetchPartitionData> fetchResultOpt = new AtomicReference<>();
+
+        Consumer<Map<TopicIdPartition, FetchPartitionData>> callback = responses -> {
+            assertEquals(1, responses.size());
+            Map.Entry<TopicIdPartition, FetchPartitionData> entry = responses.entrySet().iterator().next();
+            actualTopicPartition.set(entry.getKey());
+            fetchResultOpt.set(entry.getValue());
+        };
+
+        CompletableFuture<RemoteLogReadResult> future = new CompletableFuture<>();
+        future.complete(buildRemoteReadResult(Errors.NONE));
+
+        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null);
+
+        // Build a read result with an error
+        LogReadResult logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH);
+
+        DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+            Map.of(),
+            Map.of(topicIdPartition, future),
+            Map.of(topicIdPartition, fetchInfo),
+            remoteFetchMaxWaitMs,
+            Map.of(topicIdPartition, fetchStatus),
+            fetchParams,
+            Map.of(topicIdPartition, logReadInfo),
+            partitionOrException,
+            callback
+        );
+
+        assertTrue(delayedRemoteFetch.tryComplete());
+        assertTrue(delayedRemoteFetch.isCompleted());
+        assertEquals(topicIdPartition, actualTopicPartition.get());
+        assertNotNull(fetchResultOpt.get());
+        assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResultOpt.get().error);
+    }
+
+    private long expiresPerSecValue() {
+        Map<MetricName, Metric> allMetrics = KafkaYammerMetrics.defaultRegistry().allMetrics();
+        return allMetrics.entrySet()
+            .stream()
+            .filter(e -> e.getKey().getMBeanName().endsWith("kafka.server:type=DelayedRemoteFetchMetrics,name=ExpiresPerSec"))
+            .findFirst()
+            .map(Map.Entry::getValue)
+            .filter(Meter.class::isInstance)
+            .map(Meter.class::cast)
+            .map(Meter::count)
+            .orElse(0L);
+    }
+
+    @Test
+    public void testRequestExpiry() {
+        Map<TopicIdPartition, FetchPartitionData> responses = new HashMap<>();
+
+        Consumer<Map<TopicIdPartition, FetchPartitionData>> callback = responses::putAll;
+
+        Future<Void> remoteFetchTaskExpired = mock(Future.class);
+        Future<Void> remoteFetchTask2 = mock(Future.class);
+        // Mark the 2nd task as done and leave the 1st one to expire
+        when(remoteFetchTask2.isDone()).thenReturn(true);
+
+        // Create futures - one completed, one not
+        CompletableFuture<RemoteLogReadResult> future1 = new CompletableFuture<>();
+        CompletableFuture<RemoteLogReadResult> future2 = new CompletableFuture<>();
+        // Only complete one remote fetch
+        future2.complete(buildRemoteReadResult(Errors.NONE));
+
+        RemoteStorageFetchInfo fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null);
+        RemoteStorageFetchInfo fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null);
+
+        long highWatermark = 100L;
+        long leaderLogStartOffset = 10L;
+
+        LogReadResult logReadInfo1 = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset);
+        LogReadResult logReadInfo2 = buildReadResult(Errors.NONE);
+
+        FetchPartitionStatus fetchStatus1 = new FetchPartitionStatus(
+            new LogOffsetMetadata(fetchOffset),
+            new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch));
+
+        FetchPartitionStatus fetchStatus2 = new FetchPartitionStatus(
+            new LogOffsetMetadata(fetchOffset + 100L),
+            new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100L, logStartOffset, maxBytes, currentLeaderEpoch));
+
+        // Set up maps for multiple partitions
+        Map<TopicIdPartition, Future<Void>> remoteFetchTasks = Map.of(topicIdPartition, remoteFetchTaskExpired, topicIdPartition2, remoteFetchTask2);
+        Map<TopicIdPartition, CompletableFuture<RemoteLogReadResult>> remoteFetchResults = Map.of(topicIdPartition, future1, topicIdPartition2, future2);
+        Map<TopicIdPartition, RemoteStorageFetchInfo> remoteFetchInfos = Map.of(topicIdPartition, fetchInfo1, topicIdPartition2, fetchInfo2);
+
+        DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+            remoteFetchTasks,
+            remoteFetchResults,
+            remoteFetchInfos,
+            remoteFetchMaxWaitMs,
+            Map.of(topicIdPartition, fetchStatus1, topicIdPartition2, fetchStatus2),
+            fetchParams,
+            Map.of(topicIdPartition, logReadInfo1, topicIdPartition2, logReadInfo2),
+            partitionOrException,
+            callback
+        );
+
+        // Capture the current ExpiresPerSec metric value before expiring the fetch
+        long existingMetricVal = expiresPerSecValue();
+        // Verify the delayedRemoteFetch is not completed yet
+        assertFalse(delayedRemoteFetch.isCompleted());
+
+        // Force the delayed remote fetch to expire
+        delayedRemoteFetch.run();
+
+        // Check that the expired task was cancelled and force-completed
+        verify(remoteFetchTaskExpired).cancel(anyBoolean());
+        verify(remoteFetchTask2, never()).cancel(anyBoolean());
+        assertTrue(delayedRemoteFetch.isCompleted());
+
+        // Check that the ExpiresPerSec metric was incremented
+        assertTrue(expiresPerSecValue() > existingMetricVal);
+
+        // Fetch results should include 2 results and the expired one should return local read results
+        assertEquals(2, responses.size());
+        assertTrue(responses.containsKey(topicIdPartition));
+        assertTrue(responses.containsKey(topicIdPartition2));
+
+        assertEquals(Errors.NONE, responses.get(topicIdPartition).error);
+        assertEquals(highWatermark, responses.get(topicIdPartition).highWatermark);
+        assertEquals(leaderLogStartOffset, responses.get(topicIdPartition).logStartOffset);
+
+        assertEquals(Errors.NONE, responses.get(topicIdPartition2).error);
+    }
+
+    @Test
+    public void testMultiplePartitions() {
+        Map<TopicIdPartition, FetchPartitionData> responses = new HashMap<>();
+
+        Consumer<Map<TopicIdPartition, FetchPartitionData>> callback = responses::putAll;
+
+        // Create futures - one completed, one not
+        CompletableFuture<RemoteLogReadResult> future1 = new CompletableFuture<>();
+        CompletableFuture<RemoteLogReadResult> future2 = new CompletableFuture<>();
+        // Only complete one remote fetch
+        future1.complete(buildRemoteReadResult(Errors.NONE));
+
+        RemoteStorageFetchInfo fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null);
+        RemoteStorageFetchInfo fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null);
+
+        long highWatermark1 = 100L;
+        long leaderLogStartOffset1 = 10L;
+        long highWatermark2 = 200L;
+        long leaderLogStartOffset2 = 20L;
+
+        LogReadResult logReadInfo1 = buildReadResult(Errors.NONE, highWatermark1, leaderLogStartOffset1);
+        LogReadResult logReadInfo2 = buildReadResult(Errors.NONE, highWatermark2, leaderLogStartOffset2);
+
+        FetchPartitionStatus fetchStatus1 = new FetchPartitionStatus(
+            new LogOffsetMetadata(fetchOffset),
+            new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch));
+
+        FetchPartitionStatus fetchStatus2 = new FetchPartitionStatus(
+            new LogOffsetMetadata(fetchOffset + 100L),
+            new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100L, logStartOffset, maxBytes, currentLeaderEpoch));
+
+        DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+            Map.of(),
+            Map.of(topicIdPartition, future1, topicIdPartition2, future2),
+            Map.of(topicIdPartition, fetchInfo1, topicIdPartition2, fetchInfo2),
+            remoteFetchMaxWaitMs,
+            Map.of(topicIdPartition, fetchStatus1, topicIdPartition2, fetchStatus2),
+            fetchParams,
+            Map.of(topicIdPartition, logReadInfo1, topicIdPartition2, logReadInfo2),
+            partitionOrException,
+            callback
+        );
+
+        // Should not complete since future2 is not done
+        assertFalse(delayedRemoteFetch.tryComplete());
+        assertFalse(delayedRemoteFetch.isCompleted());
+
+        // Complete future2
+        future2.complete(buildRemoteReadResult(Errors.NONE));
+
+        // Now it should complete
+        assertTrue(delayedRemoteFetch.tryComplete());
+        assertTrue(delayedRemoteFetch.isCompleted());
+
+        // Verify both partitions were processed without error
+        assertEquals(2, responses.size());
+        assertTrue(responses.containsKey(topicIdPartition));
+        assertTrue(responses.containsKey(topicIdPartition2));
+
+        assertEquals(Errors.NONE, responses.get(topicIdPartition).error);
+        assertEquals(highWatermark1, responses.get(topicIdPartition).highWatermark);
+        assertEquals(leaderLogStartOffset1, responses.get(topicIdPartition).logStartOffset);
+
+        assertEquals(Errors.NONE, responses.get(topicIdPartition2).error);
+        assertEquals(highWatermark2, responses.get(topicIdPartition2).highWatermark);
+        assertEquals(leaderLogStartOffset2, responses.get(topicIdPartition2).logStartOffset);
+    }
+
+    @Test
+    public void testMultiplePartitionsWithFailedResults() {
+        Map<TopicIdPartition, FetchPartitionData> responses = new HashMap<>();
+
+        Consumer<Map<TopicIdPartition, FetchPartitionData>> callback = responses::putAll;
+
+        // Create futures - one successful, one with error
+        CompletableFuture<RemoteLogReadResult> future1 = new CompletableFuture<>();
+        CompletableFuture<RemoteLogReadResult> future2 = new CompletableFuture<>();
+
+        // Create one successful result and one failed result
+        future1.complete(buildRemoteReadResult(Errors.NONE));
+        future2.complete(buildRemoteReadResult(Errors.UNKNOWN_SERVER_ERROR));
+
+        RemoteStorageFetchInfo fetchInfo1 = new RemoteStorageFetchInfo(0, false, topicIdPartition, null, null);
+        RemoteStorageFetchInfo fetchInfo2 = new RemoteStorageFetchInfo(0, false, topicIdPartition2, null, null);
+
+        LogReadResult logReadInfo1 = buildReadResult(Errors.NONE, 100, 10);
+        LogReadResult logReadInfo2 = buildReadResult(Errors.NONE, 100, 10);
+
+        FetchPartitionStatus fetchStatus1 = new FetchPartitionStatus(
+            new LogOffsetMetadata(fetchOffset),
+            new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch));
+
+        FetchPartitionStatus fetchStatus2 = new FetchPartitionStatus(
+            new LogOffsetMetadata(fetchOffset + 100),
+            new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset + 100, logStartOffset, maxBytes, currentLeaderEpoch));
+
+        DelayedRemoteFetch delayedRemoteFetch = new DelayedRemoteFetch(
+            Map.of(),
+            Map.of(topicIdPartition, future1, topicIdPartition2, future2),
+            Map.of(topicIdPartition, fetchInfo1, topicIdPartition2, fetchInfo2),
+            remoteFetchMaxWaitMs,
+            Map.of(topicIdPartition, fetchStatus1, topicIdPartition2, fetchStatus2),
+            fetchParams,
+            Map.of(topicIdPartition, logReadInfo1, topicIdPartition2, logReadInfo2),
+            partitionOrException,
+            callback
+        );
+
+        assertTrue(delayedRemoteFetch.tryComplete());
+        assertTrue(delayedRemoteFetch.isCompleted());
+
+        // Verify both partitions were processed
+        assertEquals(2, responses.size());
+        assertTrue(responses.containsKey(topicIdPartition));
+        assertTrue(responses.containsKey(topicIdPartition2));
+
+        // First partition should be successful
+        FetchPartitionData fetchResult1 = responses.get(topicIdPartition);
+        assertEquals(Errors.NONE, fetchResult1.error);
+
+        // Second partition should have an error due to the remote fetch failure
+        FetchPartitionData fetchResult2 = responses.get(topicIdPartition2);
+        assertEquals(Errors.UNKNOWN_SERVER_ERROR, fetchResult2.error);
+    }
+
+    private FetchParams buildFetchParams(int replicaId, int maxWaitMs) {
+        return new FetchParams(
+            replicaId,
+            1,
+            maxWaitMs,
+            1,
+            maxBytes,
+            FetchIsolation.LOG_END,
+            Optional.empty()
+        );
+    }
+
+    private LogReadResult buildReadResult(Errors error) {
+        return buildReadResult(error, 0, 0);
+    }
+
+    private LogReadResult buildReadResult(Errors error, long highWatermark, long leaderLogStartOffset) {
+        return new LogReadResult(
+            new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY, false, Optional.empty(),
+                Optional.of(mock(RemoteStorageFetchInfo.class))),
+            Optional.empty(),
+            highWatermark,
+            leaderLogStartOffset,
+            -1L,
+            -1L,
+            -1L,
+            OptionalLong.empty(),
+            error);
+    }
+
+    private RemoteLogReadResult buildRemoteReadResult(Errors error) {
+        return new RemoteLogReadResult(
+            Optional.of(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY)),
+            error != Errors.NONE ? Optional.of(error.exception()) : Optional.empty());
+    }
+}
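Reviewer aside: `ExpiresPerSec` is backed by a static meter, so its count is cumulative across all tests in the same JVM. Assertions should therefore diff against a baseline rather than expect an absolute value, which is the pattern `testRequestExpiry` above follows:

    // Baseline-diffing keeps the assertion robust to other tests marking the same meter.
    long before = DelayedRemoteFetch.expiredRequestCount();
    delayedRemoteFetch.run(); // force completion on expiry
    assertTrue(DelayedRemoteFetch.expiredRequestCount() > before);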