Skip to content

Commit

Permalink
KAFKA-17509: Introduce a delayed action queue to complete purgatory actions outside purgatory. (apache#17177)
Browse files Browse the repository at this point in the history

Add purgatory actions to DelayedActionQueue when partition locks are released after fetch in forceComplete. 

Reviewers: David Arthur <[email protected]>, Apoorv Mittal <[email protected]>, Jun Rao <[email protected]>
  • Loading branch information
adixitconfluent authored and tedyu committed Jan 6, 2025
1 parent 497fd76 commit 36a604f
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import kafka.log.remote.RemoteLogManager;
import kafka.server.AddPartitionsToTxnManager;
import kafka.server.AlterPartitionManager;
import kafka.server.DelayedActionQueue;
import kafka.server.DelayedDeleteRecords;
import kafka.server.DelayedElectLeader;
import kafka.server.DelayedFetch;
Expand Down Expand Up @@ -216,6 +217,7 @@ public ReplicaManager build() {
OptionConverters.toScala(threadNamePrefix),
() -> brokerEpoch,
OptionConverters.toScala(addPartitionsToTxnManager),
directoryEventHandler);
directoryEventHandler,
new DelayedActionQueue());
}
}
20 changes: 19 additions & 1 deletion core/src/main/java/kafka/server/share/DelayedShareFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package kafka.server.share;

import kafka.server.ActionQueue;
import kafka.server.DelayedOperation;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.LogReadResult;
import kafka.server.QuotaFactory;
import kafka.server.ReplicaManager;
Expand Down Expand Up @@ -54,17 +56,23 @@ public class DelayedShareFetch extends DelayedOperation {
private final ShareFetchData shareFetchData;
private final ReplicaManager replicaManager;
private final Map<SharePartitionKey, SharePartition> partitionCacheMap;
private final ActionQueue delayedActionQueue;
private final DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory;

private Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionDataFromTryComplete;

DelayedShareFetch(
ShareFetchData shareFetchData,
ReplicaManager replicaManager,
Map<SharePartitionKey, SharePartition> partitionCacheMap) {
Map<SharePartitionKey, SharePartition> partitionCacheMap,
ActionQueue delayedActionQueue,
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
super(shareFetchData.fetchParams().maxWaitMs, Option.empty());
this.shareFetchData = shareFetchData;
this.replicaManager = replicaManager;
this.partitionCacheMap = partitionCacheMap;
this.delayedActionQueue = delayedActionQueue;
this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
this.topicPartitionDataFromTryComplete = new LinkedHashMap<>();
}

Expand Down Expand Up @@ -131,6 +139,16 @@ public void onComplete() {
}
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
// Once a fetch request has completed for a topic-partition and the locks for that partition have been
// released, we should check whether there is a pending share fetch request for the topic-partition and complete it.
// We add this check to the delayed actions queue instead of calling delayedShareFetchPurgatory.checkAndComplete
// directly, because a direct call could result in an infinite call stack.
delayedActionQueue.add(() -> {
result.keySet().forEach(topicIdPartition ->
delayedShareFetchPurgatory.checkAndComplete(
new DelayedShareFetchKey(shareFetchData.groupId(), topicIdPartition)));
return BoxedUnit.UNIT;
});
});

} catch (Exception e) {
Expand Down
16 changes: 14 additions & 2 deletions core/src/main/java/kafka/server/share/SharePartitionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package kafka.server.share;

import kafka.server.ActionQueue;
import kafka.server.DelayedOperationPurgatory;
import kafka.server.ReplicaManager;

Expand Down Expand Up @@ -147,6 +148,11 @@ public class SharePartitionManager implements AutoCloseable {
*/
private final DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory;

/**
* The delayed actions queue is used to complete any pending delayed share fetch actions.
*/
private final ActionQueue delayedActionsQueue;

public SharePartitionManager(
ReplicaManager replicaManager,
Time time,
Expand All @@ -156,6 +162,7 @@ public SharePartitionManager(
int maxInFlightMessages,
int shareFetchPurgatoryPurgeIntervalRequests,
Persister persister,
ActionQueue delayedActionsQueue,
Metrics metrics
) {
this(replicaManager,
Expand All @@ -167,6 +174,7 @@ public SharePartitionManager(
maxInFlightMessages,
shareFetchPurgatoryPurgeIntervalRequests,
persister,
delayedActionsQueue,
metrics
);
}
Expand All @@ -181,6 +189,7 @@ private SharePartitionManager(
int maxInFlightMessages,
int shareFetchPurgatoryPurgeIntervalRequests,
Persister persister,
ActionQueue delayedActionsQueue,
Metrics metrics
) {
this.replicaManager = replicaManager;
Expand All @@ -197,6 +206,7 @@ private SharePartitionManager(
this.persister = persister;
this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
this.delayedShareFetchPurgatory = new DelayedOperationPurgatory<>("ShareFetch", this.timer, this.replicaManager.localBrokerId(), shareFetchPurgatoryPurgeIntervalRequests, true, true);
this.delayedActionsQueue = delayedActionsQueue;
}

// Visible for testing.
Expand All @@ -212,7 +222,8 @@ private SharePartitionManager(
int maxInFlightMessages,
Persister persister,
Metrics metrics,
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory
DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory,
ActionQueue delayedActionsQueue
) {
this.replicaManager = replicaManager;
this.time = time;
Expand All @@ -227,6 +238,7 @@ private SharePartitionManager(
this.persister = persister;
this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
this.delayedActionsQueue = delayedActionsQueue;
}

/**
Expand Down Expand Up @@ -600,7 +612,7 @@ void maybeProcessFetchQueue() {
new DelayedShareFetchKey(shareFetchData.groupId(), topicIdPartition)));

// Add the share fetch to the delayed share fetch purgatory to process the fetch request.
addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, partitionCacheMap),
addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, partitionCacheMap, delayedActionsQueue, delayedShareFetchPurgatory),
delayedShareFetchWatchKeys);

// Release the lock so that other threads can process the queue.
Expand Down
9 changes: 8 additions & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,11 @@ class BrokerServer(
lifecycleManager.propagateDirectoryFailure(directoryId, config.logDirFailureTimeoutMs)
}

/**
* TODO: move this action queue to handle thread so we can simplify concurrency handling
*/
val defaultActionQueue = new DelayedActionQueue

this._replicaManager = new ReplicaManager(
config = config,
metrics = metrics,
Expand All @@ -338,7 +343,8 @@ class BrokerServer(
delayedRemoteFetchPurgatoryParam = None,
brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
addPartitionsToTxnManager = Some(addPartitionsToTxnManager),
directoryEventHandler = directoryEventHandler
directoryEventHandler = directoryEventHandler,
defaultActionQueue = defaultActionQueue
)

/* start token manager */
Expand Down Expand Up @@ -423,6 +429,7 @@ class BrokerServer(
config.shareGroupConfig.shareGroupPartitionMaxRecordLocks,
config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests,
persister,
defaultActionQueue,
new Metrics()
)

Expand Down
8 changes: 2 additions & 6 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ class ReplicaManager(val config: KafkaConfig,
threadNamePrefix: Option[String] = None,
val brokerEpochSupplier: () => Long = () => -1,
addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None,
val directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP
val directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP,
val defaultActionQueue: ActionQueue = new DelayedActionQueue
) extends Logging {
private val metricsGroup = new KafkaMetricsGroup(this.getClass)

Expand Down Expand Up @@ -741,11 +742,6 @@ class ReplicaManager(val config: KafkaConfig,
localLog(topicPartition).map(_.parentDir)
}

/**
* TODO: move this action queue to handle thread so we can simplify concurrency handling
*/
private val defaultActionQueue = new DelayedActionQueue

def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()

/**
Expand Down
Loading

0 comments on commit 36a604f

Please sign in to comment.