diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java index 3b1d5e55781dff..efee11e90ef684 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java @@ -285,6 +285,15 @@ public void routerFailureLocked() { } } + @Override + public void recordAsyncHandlerQueueSize(String nsId, int queueSize) { + if (nameserviceRPCMetricsMap != null) { + NameserviceRPCMetrics nsMetrics = nameserviceRPCMetricsMap.get(nsId); + if (nsMetrics != null) { + nsMetrics.setAsyncHandlerQueueSize(queueSize); + } + } + } /** * Get time between we receiving the operation and sending it to the Namenode. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NameserviceRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NameserviceRPCMetrics.java index 6874de0f602625..09ed6ce072c12c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NameserviceRPCMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/NameserviceRPCMetrics.java @@ -24,6 +24,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableRate; import java.util.concurrent.ThreadLocalRandom; @@ -55,6 +56,8 @@ public class NameserviceRPCMetrics implements NameserviceRPCMBean { private MutableCounterLong proxyOpPermitRejected; @Metric("Number of operations accepted to hit a namenode") private MutableCounterLong proxyOpPermitAccepted; + @Metric("Async handler queue size") + private MutableGaugeInt asyncHandlerQueueSize; public NameserviceRPCMetrics(Configuration conf, String nsId) { this.nsId = NAMESERVICE_RPC_METRICS_PREFIX + nsId; @@ -116,6 +119,10 @@ public long getProxyOpPermitAccepted() { return proxyOpPermitAccepted.value(); } + public void setAsyncHandlerQueueSize(int size) { + asyncHandlerQueueSize.set(size); + } + /** * Add the time to proxy an operation from the moment the Router sends it to * the Namenode until it replied. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index cedcddfdeb862d..33262ba123fd2e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -85,6 +85,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { public static final String DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY = FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "handler.count"; public static final int DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT = 10; + public static final String DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE = + FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "queue.size"; + public static final int DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT = 1000; public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY = FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count"; public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java index 27385a2686ce5c..69859bf861547b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java @@ -138,4 +138,12 @@ void init( * If a path is in a read only mount point. */ void routerFailureReadOnly(); + + /** + * Records the size of the async handler queue for the given namespace. + * + * @param nsId the namespace identifier + * @param queueSize the current size of the async queue + */ + void recordAsyncHandlerQueueSize(String nsId, int queueSize); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index c0f57336a7ac2e..672634b363b5e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -25,6 +25,8 @@ import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION; import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT; @@ -58,7 +60,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; -import java.util.HashSet; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -70,7 +72,9 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -293,11 +297,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, private RouterRenameOption routerRenameOption; /** Schedule the router federation rename jobs. */ private BalanceProcedureScheduler fedRenameScheduler; - private boolean enableAsync; - private Map nsAsyncHandlerCount = new ConcurrentHashMap<>(); - private Map asyncRouterHandlerExecutors = new ConcurrentHashMap<>(); + private final boolean enableAsync; + private final Map asyncRouterHandlerExecutors = + new ConcurrentHashMap<>(); + private ThreadPoolExecutor routerDefaultAsyncHandlerExecutor; private ExecutorService routerAsyncResponderExecutor; - private ExecutorService routerDefaultAsyncHandlerExecutor; /** * Construct a router RPC server. @@ -504,34 +508,59 @@ public RouterRpcServer(Configuration conf, Router router, * @param configuration the configuration. */ public void initAsyncThreadPools(Configuration configuration) { - LOG.info("Begin initialize asynchronous handler and responder thread pool."); - initNsAsyncHandlerCount(); Set allConfiguredNS = FederationUtil.getAllConfiguredNS(configuration); - Set unassignedNS = new HashSet<>(); allConfiguredNS.add(CONCURRENT_NS); - + Map nsAsyncActiveHandlerCount = parseNsAsyncHandlerCount(configuration); + initAsyncHandlerThreadPools(configuration, allConfiguredNS, nsAsyncActiveHandlerCount); + initAsyncResponderThreadPools(configuration); + } + + private void initAsyncHandlerThreadPools(Configuration configuration, + Set allConfiguredNS, Map nsAsyncHandlerCount) { + LOG.info("Initializing asynchronous handler thread pools"); + int asyncQueueSize = configuration.getInt(DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE, + DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT); + if (asyncQueueSize < 1) { + throw new IllegalArgumentException("Async queue size must be at least 1"); + } + int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, + DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT); for (String nsId : allConfiguredNS) { int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0); - LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId); - if (dedicatedHandlers > 0) { - initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers); - LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, nsId); + if (dedicatedHandlers <= 0) { + dedicatedHandlers = asyncHandlerCountDefault; + LOG.info("Use default async handler count {} for ns {} to init Executors.", + asyncHandlerCountDefault, nsId); } else { - unassignedNS.add(nsId); + LOG.info("Dedicated handlers {} for ns {} to init Executors", dedicatedHandlers, nsId); } - } - - int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, - DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT); - if (!unassignedNS.isEmpty()) { - LOG.warn("Async handler unassigned ns: {}", unassignedNS); - LOG.info("Use default async handler count {} for unassigned ns.", asyncHandlerCountDefault); - for (String nsId : unassignedNS) { - initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault); + if (dedicatedHandlers > 0) { + int finalDedicatedHandlers = dedicatedHandlers; + asyncRouterHandlerExecutors.computeIfAbsent(nsId, + id -> { + LinkedBlockingQueue queue = new LinkedBlockingQueue<>(asyncQueueSize); + return new ThreadPoolExecutor(finalDedicatedHandlers, finalDedicatedHandlers, + 0L, TimeUnit.MILLISECONDS, queue, + new AsyncThreadFactory("Router Async Handler for " + nsId + " #")); + }); + LOG.info("Assigned {} async handlers with queue size {} to nsId {}", dedicatedHandlers, + asyncQueueSize, nsId); } } + if (routerDefaultAsyncHandlerExecutor == null) { + LOG.info("Initializing router default async handler executor, count={}", + asyncHandlerCountDefault); + LinkedBlockingQueue queue = new LinkedBlockingQueue<>(asyncQueueSize); + routerDefaultAsyncHandlerExecutor = new ThreadPoolExecutor(asyncHandlerCountDefault, + asyncHandlerCountDefault, 0L, TimeUnit.MILLISECONDS, queue, + new AsyncThreadFactory("Router Default Async Handler #")); + } + } + + private void initAsyncResponderThreadPools(Configuration configuration) { + LOG.info("Initializing asynchronous responder thread pool."); int asyncResponderCount = configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT); if (routerAsyncResponderExecutor == null) { @@ -540,21 +569,17 @@ public void initAsyncThreadPools(Configuration configuration) { asyncResponderCount, new AsyncThreadFactory("Router Async Responder #")); } AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor); - - if (routerDefaultAsyncHandlerExecutor == null) { - LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault); - routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool( - asyncHandlerCountDefault, new AsyncThreadFactory("Router Async Default Handler #")); - } } - private void initNsAsyncHandlerCount() { - String configNsHandler = conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, + private Map parseNsAsyncHandlerCount(Configuration config) { + String configNsHandler = config.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY, DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT); + Map nsAsyncHandlerCount = new HashMap<>(); if (StringUtils.isEmpty(configNsHandler)) { - LOG.error( - "The value of config key: {} is empty. Will use default conf.", + LOG.debug("No per-namespace async handler counts configured ({}). " + + "Will use default handler count for all namespaces.", DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY); + return nsAsyncHandlerCount; } String[] nsHandlers = configNsHandler.split(","); for (String nsHandlerInfo : nsHandlers) { @@ -567,11 +592,19 @@ private void initNsAsyncHandlerCount() { } nsAsyncHandlerCount.put(nsHandlerItems[0], Integer.parseInt(nsHandlerItems[1])); } + return nsAsyncHandlerCount; } - private void initAsyncHandlerThreadPools4Ns(String nsId, int dedicatedHandlers) { - asyncRouterHandlerExecutors.computeIfAbsent(nsId, id -> Executors.newFixedThreadPool( - dedicatedHandlers, new AsyncThreadFactory("Router Async Handler for " + id + " #"))); + /** + * Returns the asynchronous handler executor for the specified namespace. + * If no executor is configured for the given namespace ID, returns the default executor. + * Requires async RPC pools to be initialized (async RPC enabled). + * + * @param nsId the namespace identifier + * @return the corresponding thread pool + */ + public ThreadPoolExecutor getAsyncExecutorForNamespace(String nsId) { + return asyncRouterHandlerExecutors.getOrDefault(nsId, routerDefaultAsyncHandlerExecutor); } /** @@ -683,6 +716,16 @@ protected void serviceStop() throws Exception { if (this.fedRenameScheduler != null) { fedRenameScheduler.shutDown(); } + for (ThreadPoolExecutor executor : asyncRouterHandlerExecutors.values()) { + executor.shutdownNow(); + } + asyncRouterHandlerExecutors.clear(); + if (routerDefaultAsyncHandlerExecutor != null) { + routerDefaultAsyncHandlerExecutor.shutdownNow(); + } + if (routerAsyncResponderExecutor != null) { + routerAsyncResponderExecutor.shutdownNow(); + } super.serviceStop(); } @@ -2490,14 +2533,6 @@ public boolean isAsync() { return this.enableAsync; } - public Map getAsyncRouterHandlerExecutors() { - return asyncRouterHandlerExecutors; - } - - public ExecutorService getRouterAsyncHandlerDefaultExecutor() { - return routerDefaultAsyncHandlerExecutor; - } - private static class AsyncThreadFactory implements ThreadFactory { private final String namePrefix; private final AtomicInteger threadNumber = new AtomicInteger(1); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java index 13f6dd3b952ebf..ebc5601c3bffea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/async/RouterAsyncRpcClient.java @@ -57,6 +57,8 @@ import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException; import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply; @@ -173,13 +175,19 @@ public Object invokeMethod( + router.getRouterId()); } String nsid = namenodes.get(0).getNameserviceId(); + final ThreadPoolExecutor nsExecutor = router.getRpcServer().getAsyncExecutorForNamespace(nsid); + if (rpcMonitor != null) { + rpcMonitor.recordAsyncHandlerQueueSize(nsid, nsExecutor.getQueue().size()); + } + // transfer threadLocalContext to worker threads of executor. ThreadLocalContext threadLocalContext = new ThreadLocalContext(); asyncComplete(null); + // Returns a CompletableFuture with RejectedExecutionException if nsExecutor is full. asyncApplyUseExecutor((AsyncApplyFunction) o -> { if (LOG.isDebugEnabled()) { - LOG.debug("Async invoke method : {}, {}, {}, {}", method.getName(), useObserver, - namenodes.toString(), params); + LOG.debug("Async invoke method : {}, {}, {}, {}", method.getName(), useObserver, namenodes, + params); } threadLocalContext.transfer(); RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); @@ -190,8 +198,23 @@ public Object invokeMethod( releasePermit(nsid, ugi, method, controller); return object; }); - }, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid, - router.getRpcServer().getRouterAsyncHandlerDefaultExecutor())); + }, nsExecutor); + + // Catch the RejectedExecutionException and convert it to StandbyException + asyncCatch((ret, e) -> { + int queueSize = nsExecutor.getQueue().size(); + if (nsExecutor.isShutdown()) { + LOG.warn("Async handler executor for namespace '{}' is shutting down; task not scheduled" + + " (queue size: {})", nsid, queueSize, e); + throw new StandbyException( + "Namespace '" + nsid + "' async handler is shutting down (queue size: " + queueSize + + ")"); + } + LOG.warn("Async handler executor rejected task for namespace '{}' (queue size: {}). {}", nsid, + queueSize, e.getMessage()); + throw new StandbyException( + "Namespace '" + nsid + "' is overloaded (queue size: " + queueSize + ")"); + }, RejectedExecutionException.class); return null; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 470dc61e8eb410..31dcbd9b8c602e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -66,6 +66,16 @@ + + dfs.federation.router.async.rpc.queue.size + 1000 + + The bounded queue capacity for each per-nameservice async handler thread + pool. When the queue is full, new requests are rejected with a + StandbyException to prevent unbounded memory growth. Must be at least 1. + + + dfs.federation.router.async.rpc.responder.count 10 diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncHandlerQueueOverflow.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncHandlerQueueOverflow.java new file mode 100644 index 00000000000000..ed2c1d70deba0b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/async/TestRouterAsyncHandlerQueueOverflow.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router.async; + +import java.util.EnumSet; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadPoolExecutor; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.mockito.Mockito; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.fairness.RouterAsyncRpcFairnessPolicyController; +import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod; +import org.apache.hadoop.hdfs.server.federation.router.RemoteParam; +import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapterMockitoUtil; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.LambdaTestUtils; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.DEFAULT_HEARTBEAT_INTERVAL_MS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; +import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.syncReturn; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; + +public class TestRouterAsyncHandlerQueueOverflow { + /** + * Federated HDFS cluster. + */ + private static MiniRouterDFSCluster cluster; + private static String ns0; + private static RouterRpcServer routerRpcServer; + private static RouterAsyncRpcClient asyncRpcClient; + + private final static int QUEUE_CAP = 2; + private static CountDownLatch testLatch; + + @BeforeAll + public static void setUpCluster() throws Exception { + cluster = new MiniRouterDFSCluster(true, 2, 3, DEFAULT_HEARTBEAT_INTERVAL_MS, 1000); + cluster.startCluster(); + + // Making one Namenode active per nameservice + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + cluster.switchToStandby(ns, NAMENODES[1]); + cluster.switchToObserver(ns, NAMENODES[2]); + } + } + // Start routers with only an RPC service + Configuration routerConf = new RouterConfigBuilder().metrics().rpc().build(); + + routerConf.setInt(DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE, QUEUE_CAP); + routerConf.setInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY, 1); + routerConf.setInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY, 1); + routerConf.set(DFS_ROUTER_MONITOR_NAMENODE, + cluster.getNameservices().get(0) + "," + cluster.getNameservices().get(1)); + routerConf.setClass(DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS, + RouterAsyncRpcFairnessPolicyController.class, RouterRpcFairnessPolicyController.class); + routerConf.setInt(DFS_ROUTER_FAIRNESS_ACQUIRE_TIMEOUT, 60000); + routerConf.setInt(DFS_ROUTER_ASYNC_RPC_MAX_ASYNCCALL_PERMIT_KEY, 1); + cluster.addRouterOverrides(routerConf); + cluster.startRouters(); + + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + cluster.waitActiveNamespaces(); + + testLatch = new CountDownLatch(1); + ns0 = cluster.getNameservices().get(0); + MiniRouterDFSCluster.NamenodeContext nn0 = cluster.getNamenode(ns0, null); + FSNamesystem spyNamesystem = NameNodeAdapterMockitoUtil.spyOnNamesystem(nn0.getNamenode()); + // Mock one slow operation. Any public interface from FSNamesystem will do. + Mockito.doAnswer(invocationOnMock -> { + String invokePath = invocationOnMock.getArgument(1); + if (invokePath.startsWith("/veryBigOperation")) { + testLatch.await(); + } else { + return invocationOnMock.callRealMethod(); + } + return null; + }).when(spyNamesystem).getFilesBlockingDecom(anyLong(), anyString()); + + MiniRouterDFSCluster.RouterContext router = cluster.getRandomRouter(); + routerRpcServer = router.getRouterRpcServer(); + routerRpcServer.initAsyncThreadPools(routerConf); + asyncRpcClient = new RouterAsyncRpcClient(routerConf, router.getRouter(), + routerRpcServer.getNamenodeResolver(), routerRpcServer.getRPCMonitor(), + routerRpcServer.getRouterStateIdContext()); + } + + @AfterAll + public static void shutdownCluster() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + @Timeout(value = 10) + public void testInvokeMethodQueueOverflow() throws Exception { + RemoteMethod method = + new RemoteMethod("listOpenFiles", new Class[] {long.class, EnumSet.class, String.class}, + 0, EnumSet.of(OpenFilesIterator.OpenFilesType.BLOCKING_DECOMMISSION), + new RemoteParam()); + UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); + Class protocol = method.getProtocol(); + String bigPath = "/veryBigOperation"; + Object[] params = + new Object[] {0, EnumSet.of(OpenFilesIterator.OpenFilesType.BLOCKING_DECOMMISSION), + bigPath}; + List namenodes = + asyncRpcClient.getOrderedNamenodes(ns0, true); + // Downstream namespace processing this huge request + asyncRpcClient.invokeMethod(ugi, namenodes, true, protocol, method.getMethod(), params); + ThreadPoolExecutor nsExecutor = routerRpcServer.getAsyncExecutorForNamespace(ns0); + Thread.sleep(500); + assertEquals(0, nsExecutor.getQueue().size()); + // Successfully sent this request downstream, but all subsequent ones will get stuck + assertEquals(1, nsExecutor.getCompletedTaskCount()); + + // Async handler handling, blocking at acquirePermit + asyncRpcClient.invokeMethod(ugi, namenodes, true, protocol, method.getMethod(), params); + Thread.sleep(500); + assertEquals(0, nsExecutor.getQueue().size()); + assertEquals(1, nsExecutor.getCompletedTaskCount()); + // Stuck in queue + asyncRpcClient.invokeMethod(ugi, namenodes, true, protocol, method.getMethod(), params); + assertEquals(1, nsExecutor.getQueue().size()); + // Insert one more call, also stuck in queue + asyncRpcClient.invokeMethod(ugi, namenodes, true, protocol, method.getMethod(), params); + assertEquals(2, nsExecutor.getQueue().size()); + // Queue full, rejected + asyncRpcClient.invokeMethod(ugi, namenodes, true, protocol, method.getMethod(), params); + assertEquals(2, nsExecutor.getQueue().size()); + String expectedMsg = "Namespace '" + ns0 + "' is overloaded (queue size: " + QUEUE_CAP + ")"; + LambdaTestUtils.intercept(StandbyException.class, expectedMsg, + () -> syncReturn(FileStatus.class)); + // Unstuck the namenode so we can terminate this test + testLatch.countDown(); + } +}