Merged · Changes from all commits · 24 commits
5 changes: 4 additions & 1 deletion checkstyle/import-control-storage.xml
@@ -84,6 +84,10 @@
</subpackage>
</subpackage>
</subpackage>

<subpackage name="purgatory">
<allow pkg="org.apache.kafka.server.storage.log" />
</subpackage>
</subpackage>

<subpackage name="storage.internals">
@@ -164,7 +168,6 @@
<allow pkg="org.apache.kafka.server.log.remote.storage" />
<allow pkg="scala.jdk.javaapi" />
<allow pkg="org.apache.kafka.test" />

</subpackage>

</import-control>
5 changes: 2 additions & 3 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -30,7 +30,6 @@
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.raft.errors.NotLeaderException;
import org.apache.kafka.server.LogReadResult;
import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.server.purgatory.DelayedOperation;
import org.apache.kafka.server.share.SharePartitionKey;
@@ -46,6 +45,7 @@
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
import org.apache.kafka.storage.internals.log.LogReadResult;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;

@@ -852,13 +852,12 @@ private void completeRemoteStorageShareFetchRequest() {
if (remoteFetch.remoteFetchResult().isDone()) {
RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get();
if (remoteLogReadResult.error().isPresent()) {
Throwable error = remoteLogReadResult.error().get();
// If there is any error for the remote fetch topic partition, we populate the error accordingly.
shareFetchPartitionData.add(
new ShareFetchPartitionData(
remoteFetch.topicIdPartition(),
partitionsAcquired.get(remoteFetch.topicIdPartition()),
ReplicaManager.createLogReadResult(error).toFetchPartitionData(false)
new LogReadResult(Errors.forException(remoteLogReadResult.error().get())).toFetchPartitionData(false)
Member:
Did we check that we didn't lose a useful error message by doing this?

Contributor (Author):
Thanks for pointing this out. I checked the usages of LogReadResult and found only calls to LogReadResult.error(), so in the previous implementation no code made use of the error message carried by the exception. Please correct me if I missed anything.

)
);
} else {
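To make the question in the thread above concrete, here is a minimal sketch (not from the PR; it assumes only the public `org.apache.kafka.common.protocol.Errors` API, and the exception message is invented) of what `Errors.forException` keeps and what it drops:

```scala
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.protocol.Errors

object ErrorMappingSketch extends App {
  // Hypothetical exception carrying a specific, per-incident message.
  val cause = new NotLeaderOrFollowerException("leader moved to broker 3")

  // Errors.forException maps the exception to its error code.
  val mapped = Errors.forException(cause)

  println(mapped)           // NOT_LEADER_OR_FOLLOWER: the error code survives
  println(mapped.message()) // only the generic, static description; "leader moved to broker 3" is dropped
}
```

If callers only ever read `LogReadResult.error()`, as the author reports, the per-incident message was already unused, so the mapping loses nothing observable.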
@@ -17,8 +17,8 @@
package kafka.server.share;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.server.LogReadResult;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.LogReadResult;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;

12 changes: 1 addition & 11 deletions core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -24,25 +24,15 @@ import java.util.concurrent.TimeUnit
import org.apache.kafka.common.TopicIdPartition
import org.apache.kafka.common.errors._
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.purgatory.DelayedOperation
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, FetchPartitionData}
import org.apache.kafka.storage.internals.log.LogOffsetMetadata
import org.apache.kafka.storage.internals.log.{FetchPartitionStatus, LogOffsetMetadata}

import scala.collection._
import scala.jdk.CollectionConverters._

case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionData) {

override def toString: String = {
"[startOffsetMetadata: " + startOffsetMetadata +
", fetchInfo: " + fetchInfo +
"]"
}
}

/**
* A delayed fetch operation that can be created by the replica manager and watched
* in the fetch operation purgatory
146 changes: 0 additions & 146 deletions core/src/main/scala/kafka/server/DelayedRemoteFetch.scala

This file was deleted.

56 changes: 26 additions & 30 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -56,16 +56,16 @@ import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager.TransactionSupportedOperation
import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, LogReadResult, common}
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats

import java.io.File
@@ -188,19 +188,7 @@ object ReplicaManager {
-1L,
-1L,
OptionalLong.empty(),
Optional.of(e))
}

def createLogReadResult(e: Throwable): LogReadResult = {
new LogReadResult(new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
Optional.empty(),
UnifiedLog.UNKNOWN_OFFSET,
UnifiedLog.UNKNOWN_OFFSET,
UnifiedLog.UNKNOWN_OFFSET,
UnifiedLog.UNKNOWN_OFFSET,
-1L,
OptionalLong.empty(),
Optional.of(e))
Errors.forException(e));
}

private[server] def isListOffsetsTimestampUnsupported(timestamp: JLong, version: Short): Boolean = {
@@ -1639,7 +1627,7 @@
private def processRemoteFetches(remoteFetchInfos: util.LinkedHashMap[TopicIdPartition, RemoteStorageFetchInfo],
params: FetchParams,
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
logReadResults: Seq[(TopicIdPartition, LogReadResult)],
logReadResults: util.LinkedHashMap[TopicIdPartition, LogReadResult],
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]
@@ -1651,8 +1639,15 @@
}

val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
fetchPartitionStatus, params, logReadResults, this, responseCallback)
val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks,
remoteFetchResults,
remoteFetchInfos,
remoteFetchMaxWaitMs,
fetchPartitionStatus.toMap.asJava,
Member:
We could initialize fetchPartitionStatus as a Map type to reduce unnecessary collection conversions.

Contributor:
Opened #20769 to address this comment. PTAL.

Contributor:
Didn't see that #20768 was also opened. We can merge either one.

Member:
Let's keep the FIFO policy and review #20768 together.
params,
logReadResults,
tp => getPartitionOrException(tp),
response => responseCallback(response.asScala.toSeq))

// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
val delayedFetchKeys = remoteFetchTasks.asScala.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
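Following up on the thread above, a small sketch (toy data, invented names; not from the PR) of why the choice of Map type interacts with the FIFO policy: `Seq#toMap` builds an unordered Scala Map, so `toMap.asJava` can lose insertion order, while a `java.util.LinkedHashMap` built up front preserves it.

```scala
import java.util
import scala.jdk.CollectionConverters._

object FifoOrderSketch extends App {
  val pairs = Seq("tp-0" -> 0, "tp-1" -> 1, "tp-2" -> 2, "tp-3" -> 3, "tp-4" -> 4)

  // Conversion path: Scala Seq -> unordered immutable Map -> Java Map.
  val viaToMap = pairs.toMap.asJava
  println(viaToMap.keySet()) // iteration order is not guaranteed to be FIFO

  // Direct path: LinkedHashMap iterates in insertion order.
  val direct = new util.LinkedHashMap[String, Int]()
  pairs.foreach { case (k, v) => direct.put(k, v) }
  println(direct.keySet())   // always [tp-0, tp-1, tp-2, tp-3, tp-4]
}
```

Initializing the collection as a LinkedHashMap from the start would both avoid the `toMap.asJava` conversion and keep the FIFO guarantee the reviewer wants to preserve.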
@@ -1681,7 +1676,7 @@

var hasDivergingEpoch = false
var hasPreferredReadReplica = false
val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
val logReadResultMap = new util.LinkedHashMap[TopicIdPartition, LogReadResult]

logReadResults.foreach { case (topicIdPartition, logReadResult) =>
brokerTopicStats.topicStats(topicIdPartition.topicPartition.topic).totalFetchRequestRate.mark()
@@ -1717,14 +1712,15 @@
// construct the fetch results from the read results
val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicIdPartition, FetchPartitionStatus)]
fetchInfos.foreach { case (topicIdPartition, partitionData) =>
logReadResultMap.get(topicIdPartition).foreach(logReadResult => {
val logReadResult = logReadResultMap.get(topicIdPartition)
if (logReadResult != null) {
val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
})
fetchPartitionStatus += (topicIdPartition -> new FetchPartitionStatus(logOffsetMetadata, partitionData))
}
}

if (!remoteFetchInfos.isEmpty) {
processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResults, fetchPartitionStatus.toSeq)
processRemoteFetches(remoteFetchInfos, params, responseCallback, logReadResultMap, fetchPartitionStatus.toSeq)
} else {
// If there is not enough data to respond and there is no remote data, we will let the fetch request
// wait for new data.
@@ -1812,7 +1808,7 @@
-1L,
OptionalLong.of(offsetSnapshot.lastStableOffset.messageOffset),
if (preferredReadReplica.isDefined) OptionalInt.of(preferredReadReplica.get) else OptionalInt.empty(),
Optional.empty())
Errors.NONE)
} else {
log = partition.localLogWithEpochOrThrow(fetchInfo.currentLeaderEpoch, params.fetchOnlyLeader())

@@ -1836,7 +1832,7 @@
fetchTimeMs,
OptionalLong.of(readInfo.lastStableOffset),
if (preferredReadReplica.isDefined) OptionalInt.of(preferredReadReplica.get) else OptionalInt.empty(),
Optional.empty()
Errors.NONE
)
}
} catch {
@@ -1849,7 +1845,7 @@
_: ReplicaNotAvailableException |
_: KafkaStorageException |
_: InconsistentTopicIdException) =>
createLogReadResult(e)
new LogReadResult(Errors.forException(e))
case e: OffsetOutOfRangeException =>
handleOffsetOutOfRangeError(tp, params, fetchInfo, adjustedMaxBytes, minOneMessage, log, fetchTimeMs, e)
case e: Throwable =>
@@ -1868,7 +1864,7 @@
UnifiedLog.UNKNOWN_OFFSET,
-1L,
OptionalLong.empty(),
Optional.of(e)
Errors.forException(e)
)
}
}
@@ -1949,10 +1945,10 @@
fetchInfo.logStartOffset,
fetchTimeMs,
OptionalLong.of(log.lastStableOffset),
Optional.empty[Throwable]())
Errors.NONE)
}
} else {
createLogReadResult(exception)
new LogReadResult(Errors.forException(exception))
}
}
