From 36a604fcf3f98ea314e5f623e7267d625ba04f7c Mon Sep 17 00:00:00 2001
From: Abhinav Dixit
Date: Fri, 4 Oct 2024 22:02:24 +0530
Subject: [PATCH] KAFKA-17509: Introduce a delayed action queue to complete
 purgatory actions outside purgatory. (#17177)

Add purgatory actions to DelayedActionQueue when partition locks are
released after fetch in forceComplete.

Reviewers: David Arthur, Apoorv Mittal, Jun Rao
---
 .../builders/ReplicaManagerBuilder.java       |   4 +-
 .../kafka/server/share/DelayedShareFetch.java |  20 ++-
 .../server/share/SharePartitionManager.java   |  16 +-
 .../scala/kafka/server/BrokerServer.scala     |   9 +-
 .../scala/kafka/server/ReplicaManager.scala   |   8 +-
 .../server/share/DelayedShareFetchTest.java   | 143 +++++++++++++++++-
 .../share/SharePartitionManagerTest.java      |  11 +-
 7 files changed, 193 insertions(+), 18 deletions(-)

diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 4ef1b871ec195..a694aa2a75ffb 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -21,6 +21,7 @@
 import kafka.log.remote.RemoteLogManager;
 import kafka.server.AddPartitionsToTxnManager;
 import kafka.server.AlterPartitionManager;
+import kafka.server.DelayedActionQueue;
 import kafka.server.DelayedDeleteRecords;
 import kafka.server.DelayedElectLeader;
 import kafka.server.DelayedFetch;
@@ -216,6 +217,7 @@ public ReplicaManager build() {
             OptionConverters.toScala(threadNamePrefix),
             () -> brokerEpoch,
             OptionConverters.toScala(addPartitionsToTxnManager),
-            directoryEventHandler);
+            directoryEventHandler,
+            new DelayedActionQueue());
     }
 }
diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
index 359753cc0a263..1b3f3a50133ff 100644
--- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java
+++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java
@@ -16,7 +16,9 @@
  */
 package kafka.server.share;
 
+import kafka.server.ActionQueue;
 import kafka.server.DelayedOperation;
+import kafka.server.DelayedOperationPurgatory;
 import kafka.server.LogReadResult;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
@@ -54,17 +56,23 @@ public class DelayedShareFetch extends DelayedOperation {
     private final ShareFetchData shareFetchData;
     private final ReplicaManager replicaManager;
     private final Map<SharePartitionKey, SharePartition> partitionCacheMap;
+    private final ActionQueue delayedActionQueue;
+    private final DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory;
 
     private Map<TopicIdPartition, FetchRequest.PartitionData> topicPartitionDataFromTryComplete;
 
     DelayedShareFetch(
             ShareFetchData shareFetchData,
             ReplicaManager replicaManager,
-            Map<SharePartitionKey, SharePartition> partitionCacheMap) {
+            Map<SharePartitionKey, SharePartition> partitionCacheMap,
+            ActionQueue delayedActionQueue,
+            DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
         super(shareFetchData.fetchParams().maxWaitMs, Option.empty());
         this.shareFetchData = shareFetchData;
         this.replicaManager = replicaManager;
         this.partitionCacheMap = partitionCacheMap;
+        this.delayedActionQueue = delayedActionQueue;
+        this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
         this.topicPartitionDataFromTryComplete = new LinkedHashMap<>();
     }
 
@@ -131,6 +139,16 @@ public void onComplete() {
                 }
                 // Releasing the lock to move ahead with the next request in queue.
                 releasePartitionLocks(shareFetchData.groupId(), topicPartitionData.keySet());
+                // Once a fetch request completes for a topic-partition and we have released the locks for that
+                // partition, we should check whether there is a pending share fetch request for the topic-partition
+                // and complete it.
+                // We add the action to the delayed actions queue instead of calling
+                // delayedShareFetchPurgatory.checkAndComplete directly, which could otherwise grow into an
+                // infinite call stack.
+                delayedActionQueue.add(() -> {
+                    result.keySet().forEach(topicIdPartition ->
+                        delayedShareFetchPurgatory.checkAndComplete(
+                            new DelayedShareFetchKey(shareFetchData.groupId(), topicIdPartition)));
+                    return BoxedUnit.UNIT;
+                });
             });
         } catch (Exception e) {
diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java
index 7b13a32c9c95d..e09931be65586 100644
--- a/core/src/main/java/kafka/server/share/SharePartitionManager.java
+++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java
@@ -16,6 +16,7 @@
  */
 package kafka.server.share;
 
+import kafka.server.ActionQueue;
 import kafka.server.DelayedOperationPurgatory;
 import kafka.server.ReplicaManager;
 
@@ -147,6 +148,11 @@ public class SharePartitionManager implements AutoCloseable {
      */
     private final DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory;
 
+    /**
+     * The delayed actions queue is used to complete any pending delayed share fetch actions.
+     */
+    private final ActionQueue delayedActionsQueue;
+
     public SharePartitionManager(
         ReplicaManager replicaManager,
         Time time,
@@ -156,6 +162,7 @@ public SharePartitionManager(
         int maxInFlightMessages,
         int shareFetchPurgatoryPurgeIntervalRequests,
         Persister persister,
+        ActionQueue delayedActionsQueue,
         Metrics metrics
     ) {
         this(replicaManager,
@@ -167,6 +174,7 @@ public SharePartitionManager(
             maxInFlightMessages,
             shareFetchPurgatoryPurgeIntervalRequests,
             persister,
+            delayedActionsQueue,
             metrics
         );
     }
@@ -181,6 +189,7 @@ private SharePartitionManager(
         int maxInFlightMessages,
         int shareFetchPurgatoryPurgeIntervalRequests,
         Persister persister,
+        ActionQueue delayedActionsQueue,
         Metrics metrics
     ) {
         this.replicaManager = replicaManager;
@@ -197,6 +206,7 @@ private SharePartitionManager(
         this.persister = persister;
         this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
         this.delayedShareFetchPurgatory = new DelayedOperationPurgatory<>("ShareFetch", this.timer, this.replicaManager.localBrokerId(), shareFetchPurgatoryPurgeIntervalRequests, true, true);
+        this.delayedActionsQueue = delayedActionsQueue;
     }
 
     // Visible for testing.
@@ -212,7 +222,8 @@ private SharePartitionManager(
         int maxInFlightMessages,
         Persister persister,
         Metrics metrics,
-        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory
+        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory,
+        ActionQueue delayedActionsQueue
     ) {
         this.replicaManager = replicaManager;
         this.time = time;
@@ -227,6 +238,7 @@ private SharePartitionManager(
         this.persister = persister;
         this.shareGroupMetrics = new ShareGroupMetrics(Objects.requireNonNull(metrics), time);
         this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
+        this.delayedActionsQueue = delayedActionsQueue;
     }
 
     /**
@@ -600,7 +612,7 @@ void maybeProcessFetchQueue() {
                 new DelayedShareFetchKey(shareFetchData.groupId(), topicIdPartition)));
 
             // Add the share fetch to the delayed share fetch purgatory to process the fetch request.
-            addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, partitionCacheMap),
+            addDelayedShareFetch(new DelayedShareFetch(shareFetchData, replicaManager, partitionCacheMap, delayedActionsQueue, delayedShareFetchPurgatory),
                 delayedShareFetchWatchKeys);
 
             // Release the lock so that other threads can process the queue.
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 90fd297ef0876..0d88a50bec49d 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -320,6 +320,11 @@ class BrokerServer(
       lifecycleManager.propagateDirectoryFailure(directoryId, config.logDirFailureTimeoutMs)
     }
 
+    /**
+     * TODO: move this action queue to handle thread so we can simplify concurrency handling
+     */
+    val defaultActionQueue = new DelayedActionQueue
+
     this._replicaManager = new ReplicaManager(
       config = config,
       metrics = metrics,
@@ -338,7 +343,8 @@ class BrokerServer(
       delayedRemoteFetchPurgatoryParam = None,
       brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
       addPartitionsToTxnManager = Some(addPartitionsToTxnManager),
-      directoryEventHandler = directoryEventHandler
+      directoryEventHandler = directoryEventHandler,
+      defaultActionQueue = defaultActionQueue
     )
 
     /* start token manager */
@@ -423,6 +429,7 @@ class BrokerServer(
       config.shareGroupConfig.shareGroupPartitionMaxRecordLocks,
      config.shareGroupConfig.shareFetchPurgatoryPurgeIntervalRequests,
       persister,
+      defaultActionQueue,
       new Metrics()
     )
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index fee70d8fe371b..58873cad3126a 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -290,7 +290,8 @@ class ReplicaManager(val config: KafkaConfig,
                      threadNamePrefix: Option[String] = None,
                      val brokerEpochSupplier: () => Long = () => -1,
                      addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None,
-                     val directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP
+                     val directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP,
+                     val defaultActionQueue: ActionQueue = new DelayedActionQueue
                      ) extends Logging {
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
@@ -741,11 +742,6 @@ class ReplicaManager(val config: KafkaConfig,
     localLog(topicPartition).map(_.parentDir)
   }
 
-  /**
-   * TODO: move this action queue to handle thread so we can simplify concurrency handling
-   */
-  private val defaultActionQueue = new DelayedActionQueue
-
   def tryCompleteActions(): Unit = defaultActionQueue.tryCompleteActions()
 
   /**
diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
index 3ddd2a79021e1..e4d0c84dfcd20 100644
--- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
+++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java
@@ -16,6 +16,8 @@
  */
 package kafka.server.share;
 
+import kafka.server.DelayedActionQueue;
+import kafka.server.DelayedOperationPurgatory;
 import kafka.server.ReplicaManager;
 import kafka.server.ReplicaQuota;
 
@@ -29,22 +31,35 @@
 import org.apache.kafka.server.share.fetch.ShareFetchData;
 import org.apache.kafka.server.storage.log.FetchIsolation;
 import org.apache.kafka.server.storage.log.FetchParams;
+import org.apache.kafka.server.util.timer.SystemTimer;
+import org.apache.kafka.server.util.timer.SystemTimerReaper;
+import org.apache.kafka.server.util.timer.Timer;
 
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 
+import scala.jdk.javaapi.CollectionConverters;
+
+import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL;
 import static kafka.server.share.SharePartitionManagerTest.PARTITION_MAX_BYTES;
+import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -53,6 +68,18 @@ public class DelayedShareFetchTest {
 
     private static final int MAX_WAIT_MS = 5000;
+    private static Timer mockTimer;
+
+    @BeforeEach
+    public void setUp() {
+        mockTimer = new SystemTimerReaper("DelayedShareFetchTestReaper",
+            new SystemTimer("DelayedShareFetchTestTimer"));
+    }
+
+    @AfterEach
+    public void tearDown() throws Exception {
+        mockTimer.close();
+    }
 
     @Test
     public void testDelayedShareFetchTryCompleteReturnsFalse() {
@@ -95,6 +122,7 @@ public void testDelayedShareFetchTryCompleteReturnsFalse() {
     public void testDelayedShareFetchTryCompleteReturnsTrue() {
         String groupId = "grp";
         Uuid topicId = Uuid.randomUuid();
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
         TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
         TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
         Map<TopicIdPartition, Integer> partitionMaxBytes = new HashMap<>();
@@ -118,9 +146,14 @@ public void testDelayedShareFetchTryCompleteReturnsTrue() {
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
+        when(sp0.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture(
+            Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))));
+        doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
         DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetchData)
             .withPartitionCacheMap(partitionCacheMap)
+            .withReplicaManager(replicaManager)
             .build();
         assertFalse(delayedShareFetch.isCompleted());
@@ -200,6 +233,9 @@ public void testReplicaManagerFetchShouldHappenOnComplete() {
         when(sp0.canAcquireRecords()).thenReturn(true);
         when(sp1.canAcquireRecords()).thenReturn(false);
+        when(sp0.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture(
+            Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))));
+        doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp0))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
         DelayedShareFetch delayedShareFetch = DelayedShareFetchBuilder.builder()
             .withShareFetchData(shareFetchData)
             .withReplicaManager(replicaManager)
             .build();
@@ -261,10 +297,99 @@ public void testToCompleteAnAlreadyCompletedFuture() {
         Mockito.verify(delayedShareFetch, times(1)).acquirablePartitions();
     }
 
+    @Test
+    public void testForceCompleteTriggersDelayedActionsQueue() {
+        String groupId = "grp";
+        Uuid topicId = Uuid.randomUuid();
+        ReplicaManager replicaManager = mock(ReplicaManager.class);
+        TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0));
+        TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1));
+        TopicIdPartition tp2 = new TopicIdPartition(topicId, new TopicPartition("foo", 2));
+        Map<TopicIdPartition, Integer> partitionMaxBytes1 = new HashMap<>();
+        partitionMaxBytes1.put(tp0, PARTITION_MAX_BYTES);
+        partitionMaxBytes1.put(tp1, PARTITION_MAX_BYTES);
+
+        SharePartition sp0 = mock(SharePartition.class);
+        SharePartition sp1 = mock(SharePartition.class);
+        SharePartition sp2 = mock(SharePartition.class);
+        // No share partition is available for acquiring initially.
+        when(sp0.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp1.maybeAcquireFetchLock()).thenReturn(false);
+        when(sp2.maybeAcquireFetchLock()).thenReturn(false);
+
+        Map<SharePartitionKey, SharePartition> partitionCacheMap = new ConcurrentHashMap<>();
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp0), sp0);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp1), sp1);
+        partitionCacheMap.put(new SharePartitionKey(groupId, tp2), sp2);
+
+        ShareFetchData shareFetchData1 = new ShareFetchData(
+            new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+                1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), partitionMaxBytes1);
+
+        DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = new DelayedOperationPurgatory<>(
+            "TestShareFetch", mockTimer, replicaManager.localBrokerId(),
+            DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, true, true);
+
+        Set<Object> delayedShareFetchWatchKeys = new HashSet<>();
+        partitionMaxBytes1.keySet().forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchKey(groupId, topicIdPartition)));
+
+        DelayedShareFetch delayedShareFetch1 = DelayedShareFetchTest.DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetchData1)
+            .withReplicaManager(replicaManager)
+            .withPartitionCacheMap(partitionCacheMap)
+            .build();
+
+        // We add a delayed share fetch entry to the purgatory which will be waiting for completion since neither of the
+        // partitions in the share fetch request can be acquired.
+        delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch1, CollectionConverters.asScala(delayedShareFetchWatchKeys).toSeq());
+
+        assertEquals(2, delayedShareFetchPurgatory.watched());
+        assertFalse(shareFetchData1.future().isDone());
+
+        Map<TopicIdPartition, Integer> partitionMaxBytes2 = new HashMap<>();
+        partitionMaxBytes2.put(tp1, PARTITION_MAX_BYTES);
+        partitionMaxBytes2.put(tp2, PARTITION_MAX_BYTES);
+        ShareFetchData shareFetchData2 = new ShareFetchData(
+            new FetchParams(ApiKeys.SHARE_FETCH.latestVersion(), FetchRequest.ORDINARY_CONSUMER_ID, -1, MAX_WAIT_MS,
+                1, 1024 * 1024, FetchIsolation.HIGH_WATERMARK, Optional.empty()), groupId, Uuid.randomUuid().toString(),
+            new CompletableFuture<>(), partitionMaxBytes2);
+
+        doAnswer(invocation -> buildLogReadResult(Collections.singleton(tp1))).when(replicaManager).readFromLog(any(), any(), any(ReplicaQuota.class), anyBoolean());
+
+        DelayedActionQueue delayedActionQueue = spy(new DelayedActionQueue());
+        DelayedShareFetch delayedShareFetch2 = DelayedShareFetchBuilder.builder()
+            .withShareFetchData(shareFetchData2)
+            .withReplicaManager(replicaManager)
+            .withPartitionCacheMap(partitionCacheMap)
+            .withDelayedActionQueue(delayedActionQueue)
+            .withDelayedShareFetchPurgatory(delayedShareFetchPurgatory)
+            .build();
+
+        // sp1 can be acquired now.
+        when(sp1.maybeAcquireFetchLock()).thenReturn(true);
+        when(sp1.canAcquireRecords()).thenReturn(true);
+        when(sp1.acquire(any(), any())).thenReturn(CompletableFuture.completedFuture(
+            Collections.singletonList(new ShareFetchResponseData.AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 1))));
+
+        // When forceComplete is called for delayedShareFetch2, since tp1 is common to both delayed share fetch
+        // requests, it should add a "check and complete" action for request key tp1 to the purgatory.
+        delayedShareFetch2.forceComplete();
+        assertTrue(delayedShareFetch2.isCompleted());
+        assertTrue(shareFetchData2.future().isDone());
+        Mockito.verify(replicaManager, times(1)).readFromLog(
+            any(), any(), any(ReplicaQuota.class), anyBoolean());
+        assertFalse(delayedShareFetch1.isCompleted());
+        Mockito.verify(delayedActionQueue, times(1)).add(any());
+        Mockito.verify(delayedActionQueue, times(0)).tryCompleteActions();
+    }
+
     static class DelayedShareFetchBuilder {
         ShareFetchData shareFetchData = mock(ShareFetchData.class);
         private ReplicaManager replicaManager = mock(ReplicaManager.class);
         private Map<SharePartitionKey, SharePartition> partitionCacheMap = new HashMap<>();
+        private DelayedActionQueue delayedActionsQueue = mock(DelayedActionQueue.class);
+        private DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class);
 
         DelayedShareFetchBuilder withShareFetchData(ShareFetchData shareFetchData) {
             this.shareFetchData = shareFetchData;
@@ -281,15 +406,27 @@ DelayedShareFetchBuilder withPartitionCacheMap(Map<SharePartitionKey, SharePartition> partitionCacheMap) {
             return this;
         }
 
+        DelayedShareFetchBuilder withDelayedActionQueue(DelayedActionQueue delayedActionQueue) {
+            this.delayedActionsQueue = delayedActionQueue;
+            return this;
+        }
+
+        DelayedShareFetchBuilder withDelayedShareFetchPurgatory(DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory) {
+            this.delayedShareFetchPurgatory = delayedShareFetchPurgatory;
+            return this;
+        }
+
         public static DelayedShareFetchBuilder builder() {
             return new DelayedShareFetchBuilder();
         }
 
         public DelayedShareFetch build() {
             return new DelayedShareFetch(
-                shareFetchData,
-                replicaManager,
-                partitionCacheMap);
+                shareFetchData,
+                replicaManager,
+                partitionCacheMap,
+                delayedActionsQueue,
+                delayedShareFetchPurgatory);
         }
     }
 }
diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
index da8328b8f261d..b1e1bc627f8bc 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java
@@ -16,6 +16,7 @@
  */
 package kafka.server.share;
 
+import kafka.server.DelayedActionQueue;
 import kafka.server.DelayedOperationPurgatory;
 import kafka.server.LogReadResult;
 import kafka.server.ReplicaManager;
@@ -130,10 +131,10 @@ public class SharePartitionManagerTest {
     private static final int RECORD_LOCK_DURATION_MS = 30000;
     private static final int MAX_DELIVERY_COUNT = 5;
     private static final short MAX_IN_FLIGHT_MESSAGES = 200;
-    static final int PARTITION_MAX_BYTES = 40000;
     private static final int DELAYED_SHARE_FETCH_MAX_WAIT_MS = 2000;
-    private static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000;
     private static final int DELAYED_SHARE_FETCH_TIMEOUT_MS = 3000;
+    static final int PARTITION_MAX_BYTES = 40000;
+    static final int DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL = 1000;
 
     private static Timer mockTimer;
 
@@ -2270,7 +2271,7 @@ private void assertErroneousAndValidTopicIdPartitions(
         assertEquals(expectedValidSet, actualValidPartitions);
     }
 
-    private Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(Set<TopicIdPartition> topicIdPartitions) {
+    static Seq<Tuple2<TopicIdPartition, LogReadResult>> buildLogReadResult(Set<TopicIdPartition> topicIdPartitions) {
         List<Tuple2<TopicIdPartition, LogReadResult>> logReadResults = new ArrayList<>();
         topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult(
             new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
@@ -2297,6 +2298,7 @@ private static class SharePartitionManagerBuilder {
         private Metrics metrics = new Metrics();
         private ConcurrentLinkedQueue<ShareFetchData> fetchQueue = new ConcurrentLinkedQueue<>();
         private DelayedOperationPurgatory<DelayedShareFetch> delayedShareFetchPurgatory = mock(DelayedOperationPurgatory.class);
+        private final DelayedActionQueue delayedActionsQueue = mock(DelayedActionQueue.class);
 
         private SharePartitionManagerBuilder withReplicaManager(ReplicaManager replicaManager) {
             this.replicaManager = replicaManager;
@@ -2359,7 +2361,8 @@ public SharePartitionManager build() {
                                                MAX_IN_FLIGHT_MESSAGES,
                                                persister,
                                                metrics,
-                                               delayedShareFetchPurgatory);
+                                               delayedShareFetchPurgatory,
+                                               delayedActionsQueue);
         }
     }
 }
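
The heart of this change is the deferral in DelayedShareFetch.onComplete: completing one
delayed share fetch releases partition locks that may unblock another entry watching the
same key, but calling delayedShareFetchPurgatory.checkAndComplete from inside onComplete
would re-enter the purgatory on the same call path, and chained completions could then
recurse without bound. The self-contained sketch below illustrates the pattern in
miniature; ActionQueue, Purgatory, and DelayedOp here are simplified stand-ins for
illustration, not Kafka's actual classes.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public final class DelayedActionQueueSketch {

    /** Simplified stand-in for kafka.server.ActionQueue. */
    static final class ActionQueue {
        private final Queue<Runnable> actions = new ArrayDeque<>();

        void add(Runnable action) {
            actions.offer(action);
        }

        /** Drains pending actions outside the purgatory's call path. */
        void tryCompleteActions() {
            Runnable action;
            while ((action = actions.poll()) != null) {
                action.run();
            }
        }
    }

    /** Simplified stand-in for a purgatory of delayed operations. */
    static final class Purgatory {
        private final List<DelayedOp> watched = new ArrayList<>();

        void watch(DelayedOp op) {
            watched.add(op);
        }

        void checkAndComplete() {
            for (DelayedOp op : new ArrayList<>(watched)) {
                if (watched.remove(op)) {
                    op.onComplete();
                }
            }
        }
    }

    /** Simplified stand-in for DelayedShareFetch. */
    static final class DelayedOp {
        private final Purgatory purgatory;
        private final ActionQueue actionQueue;

        DelayedOp(Purgatory purgatory, ActionQueue actionQueue) {
            this.purgatory = purgatory;
            this.actionQueue = actionQueue;
        }

        void onComplete() {
            // ... perform the fetch and release partition locks here ...
            // Deferring the follow-up check is the point of the patch: calling
            // purgatory.checkAndComplete() directly here could synchronously run
            // onComplete() on another op watching the same key, and so on, growing
            // the call stack with every chained completion.
            actionQueue.add(purgatory::checkAndComplete);
        }
    }

    public static void main(String[] args) {
        Purgatory purgatory = new Purgatory();
        ActionQueue actionQueue = new ActionQueue();
        purgatory.watch(new DelayedOp(purgatory, actionQueue));
        purgatory.watch(new DelayedOp(purgatory, actionQueue));

        purgatory.checkAndComplete();     // completes both ops; follow-ups are only queued
        actionQueue.tryCompleteActions(); // runs follow-ups one at a time, stack stays shallow
    }
}

This is also exactly what testForceCompleteTriggersDelayedActionsQueue verifies:
forceComplete must add() the follow-up action to the queue but never drain it itself,
which is why the test asserts one add() call and zero tryCompleteActions() calls.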
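Where the queue gets drained lies outside this diff: ReplicaManager.tryCompleteActions(),
which the patch keeps, is invoked on the request-handler path once a request has been
served and all locks are released. The loop below is a hypothetical, self-contained model
of that timing only, not broker code; it reuses nothing from the broker beyond the name
defaultActionQueue.

import java.util.concurrent.ConcurrentLinkedQueue;

public final class HandlerLoopSketch {
    public static void main(String[] args) {
        // Stands in for the broker-wide defaultActionQueue created in BrokerServer.
        ConcurrentLinkedQueue<Runnable> defaultActionQueue = new ConcurrentLinkedQueue<>();

        // Three simulated requests; each defers follow-up work while "holding locks".
        for (int request = 0; request < 3; request++) {
            int id = request;
            defaultActionQueue.add(() ->
                System.out.println("deferred purgatory check for request " + id));

            // Locks are released by this point; only now does the handler thread drain
            // the queue, mirroring a tryCompleteActions() call after request handling.
            Runnable action;
            while ((action = defaultActionQueue.poll()) != null) {
                action.run();
            }
        }
    }
}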