Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,15 @@ public void routerFailureLocked() {
}
}

/**
 * Publishes the async handler queue size of a nameservice to that
 * nameservice's RPC metrics. The call is a no-op when the per-nameservice
 * metrics map is absent or holds no entry for {@code nsId}.
 *
 * @param nsId the nameservice identifier used to look up the metrics
 * @param queueSize the queue size to publish
 */
@Override
public void recordAsyncHandlerQueueSize(String nsId, int queueSize) {
  if (nameserviceRPCMetricsMap == null) {
    return;
  }
  final NameserviceRPCMetrics metrics = nameserviceRPCMetricsMap.get(nsId);
  if (metrics == null) {
    // No metrics registered for this nameservice; nothing to update.
    return;
  }
  metrics.setAsyncHandlerQueueSize(queueSize);
}

/**
* Get the time between receiving the operation and sending it to the Namenode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.metrics2.lib.MutableRate;

import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -55,6 +56,8 @@ public class NameserviceRPCMetrics implements NameserviceRPCMBean {
private MutableCounterLong proxyOpPermitRejected;
@Metric("Number of operations accepted to hit a namenode")
private MutableCounterLong proxyOpPermitAccepted;
@Metric("Async handler queue size")
private MutableGaugeInt asyncHandlerQueueSize;

public NameserviceRPCMetrics(Configuration conf, String nsId) {
this.nsId = NAMESERVICE_RPC_METRICS_PREFIX + nsId;
Expand Down Expand Up @@ -116,6 +119,10 @@ public long getProxyOpPermitAccepted() {
return proxyOpPermitAccepted.value();
}

/**
 * Updates the "Async handler queue size" gauge of this nameservice.
 *
 * @param size the value to store in the gauge
 */
public void setAsyncHandlerQueueSize(int size) {
  this.asyncHandlerQueueSize.set(size);
}

/**
* Add the time to proxy an operation from the moment the Router sends it to
* the Namenode until it replied.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
public static final String DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "handler.count";
public static final int DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT = 10;
public static final String DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "queue.size";
public static final int DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT = 1000;
public static final String DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY =
FEDERATION_ROUTER_ASYNC_RPC_PREFIX + "responder.count";
public static final int DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,12 @@ void init(
* If a path is in a read only mount point.
*/
void routerFailureReadOnly();

/**
 * Records the current size of the async handler queue for the given
 * nameservice.
 *
 * @param nsId the nameservice identifier whose queue size is reported
 * @param queueSize the current size of the async handler queue; presumably
 *                  the number of tasks waiting in the handler executor's
 *                  queue (confirm against the caller)
 */
void recordAsyncHandlerQueueSize(String nsId, int queueSize);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION;
import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_OPTION_DEFAULT;
Expand Down Expand Up @@ -58,7 +60,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
Expand All @@ -70,7 +72,9 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -293,11 +297,11 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
private RouterRenameOption routerRenameOption;
/** Schedule the router federation rename jobs. */
private BalanceProcedureScheduler fedRenameScheduler;
private boolean enableAsync;
private Map<String, Integer> nsAsyncHandlerCount = new ConcurrentHashMap<>();
private Map<String, ExecutorService> asyncRouterHandlerExecutors = new ConcurrentHashMap<>();
private final boolean enableAsync;
private final Map<String, ThreadPoolExecutor> asyncRouterHandlerExecutors =
new ConcurrentHashMap<>();
private ThreadPoolExecutor routerDefaultAsyncHandlerExecutor;
private ExecutorService routerAsyncResponderExecutor;
private ExecutorService routerDefaultAsyncHandlerExecutor;

/**
* Construct a router RPC server.
Expand Down Expand Up @@ -504,34 +508,59 @@ public RouterRpcServer(Configuration conf, Router router,
* @param configuration the configuration.
*/
public void initAsyncThreadPools(Configuration configuration) {
LOG.info("Begin initialize asynchronous handler and responder thread pool.");
initNsAsyncHandlerCount();
Set<String> allConfiguredNS = FederationUtil.getAllConfiguredNS(configuration);
Set<String> unassignedNS = new HashSet<>();
allConfiguredNS.add(CONCURRENT_NS);

Map<String, Integer> nsAsyncActiveHandlerCount = parseNsAsyncHandlerCount(configuration);
initAsyncHandlerThreadPools(configuration, allConfiguredNS, nsAsyncActiveHandlerCount);
initAsyncResponderThreadPools(configuration);
}

private void initAsyncHandlerThreadPools(Configuration configuration,
Set<String> allConfiguredNS, Map<String, Integer> nsAsyncHandlerCount) {
LOG.info("Initializing asynchronous handler thread pools");
int asyncQueueSize = configuration.getInt(DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE,
DFS_ROUTER_ASYNC_RPC_QUEUE_SIZE_DEFAULT);
if (asyncQueueSize < 1) {
throw new IllegalArgumentException("Async queue size must be at least 1");
}
int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);
for (String nsId : allConfiguredNS) {
int dedicatedHandlers = nsAsyncHandlerCount.getOrDefault(nsId, 0);
LOG.info("Dedicated handlers {} for ns {} ", dedicatedHandlers, nsId);
if (dedicatedHandlers > 0) {
initAsyncHandlerThreadPools4Ns(nsId, dedicatedHandlers);
LOG.info("Assigned {} async handlers to nsId {} ", dedicatedHandlers, nsId);
if (dedicatedHandlers <= 0) {
dedicatedHandlers = asyncHandlerCountDefault;
LOG.info("Use default async handler count {} for ns {} to init Executors.",
asyncHandlerCountDefault, nsId);
} else {
unassignedNS.add(nsId);
LOG.info("Dedicated handlers {} for ns {} to init Executors", dedicatedHandlers, nsId);
}
}

int asyncHandlerCountDefault = configuration.getInt(DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_KEY,
DFS_ROUTER_ASYNC_RPC_HANDLER_COUNT_DEFAULT);

if (!unassignedNS.isEmpty()) {
LOG.warn("Async handler unassigned ns: {}", unassignedNS);
LOG.info("Use default async handler count {} for unassigned ns.", asyncHandlerCountDefault);
for (String nsId : unassignedNS) {
initAsyncHandlerThreadPools4Ns(nsId, asyncHandlerCountDefault);
if (dedicatedHandlers > 0) {
int finalDedicatedHandlers = dedicatedHandlers;
asyncRouterHandlerExecutors.computeIfAbsent(nsId,
id -> {
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(asyncQueueSize);
return new ThreadPoolExecutor(finalDedicatedHandlers, finalDedicatedHandlers,
0L, TimeUnit.MILLISECONDS, queue,
new AsyncThreadFactory("Router Async Handler for " + nsId + " #"));
});
LOG.info("Assigned {} async handlers with queue size {} to nsId {}", dedicatedHandlers,
asyncQueueSize, nsId);
}
}

if (routerDefaultAsyncHandlerExecutor == null) {
LOG.info("Initializing router default async handler executor, count={}",
asyncHandlerCountDefault);
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(asyncQueueSize);
routerDefaultAsyncHandlerExecutor = new ThreadPoolExecutor(asyncHandlerCountDefault,
asyncHandlerCountDefault, 0L, TimeUnit.MILLISECONDS, queue,
new AsyncThreadFactory("Router Default Async Handler #"));
}
}

private void initAsyncResponderThreadPools(Configuration configuration) {
LOG.info("Initializing asynchronous responder thread pool.");
int asyncResponderCount = configuration.getInt(DFS_ROUTER_ASYNC_RPC_RESPONDER_COUNT_KEY,
DFS_ROUTER_ASYNCRPC_RESPONDER_COUNT_DEFAULT);
if (routerAsyncResponderExecutor == null) {
Expand All @@ -540,21 +569,17 @@ public void initAsyncThreadPools(Configuration configuration) {
asyncResponderCount, new AsyncThreadFactory("Router Async Responder #"));
}
AsyncRpcProtocolPBUtil.setAsyncResponderExecutor(routerAsyncResponderExecutor);

if (routerDefaultAsyncHandlerExecutor == null) {
LOG.info("init router async default executor handler count: {}", asyncHandlerCountDefault);
routerDefaultAsyncHandlerExecutor = Executors.newFixedThreadPool(
asyncHandlerCountDefault, new AsyncThreadFactory("Router Async Default Handler #"));
}
}

private void initNsAsyncHandlerCount() {
String configNsHandler = conf.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
private Map<String, Integer> parseNsAsyncHandlerCount(Configuration config) {
String configNsHandler = config.get(DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY,
DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_DEFAULT);
Map<String, Integer> nsAsyncHandlerCount = new HashMap<>();
if (StringUtils.isEmpty(configNsHandler)) {
LOG.error(
"The value of config key: {} is empty. Will use default conf.",
LOG.debug("No per-namespace async handler counts configured ({}). "
+ "Will use default handler count for all namespaces.",
DFS_ROUTER_ASYNC_RPC_NS_HANDLER_COUNT_KEY);
return nsAsyncHandlerCount;
}
String[] nsHandlers = configNsHandler.split(",");
for (String nsHandlerInfo : nsHandlers) {
Expand All @@ -567,11 +592,19 @@ private void initNsAsyncHandlerCount() {
}
nsAsyncHandlerCount.put(nsHandlerItems[0], Integer.parseInt(nsHandlerItems[1]));
}
return nsAsyncHandlerCount;
}

private void initAsyncHandlerThreadPools4Ns(String nsId, int dedicatedHandlers) {
asyncRouterHandlerExecutors.computeIfAbsent(nsId, id -> Executors.newFixedThreadPool(
dedicatedHandlers, new AsyncThreadFactory("Router Async Handler for " + id + " #")));
/**
 * Looks up the asynchronous handler thread pool that serves a namespace.
 * When no pool is registered under the given namespace ID, the router
 * default async handler executor is returned instead.
 * Requires async RPC pools to be initialized (async RPC enabled).
 *
 * @param nsId the namespace identifier
 * @return the pool registered for {@code nsId}, or the default pool
 */
public ThreadPoolExecutor getAsyncExecutorForNamespace(String nsId) {
  final ThreadPoolExecutor nsExecutor = asyncRouterHandlerExecutors.get(nsId);
  if (nsExecutor != null) {
    return nsExecutor;
  }
  // Nothing registered for this namespace: fall back to the shared default.
  return routerDefaultAsyncHandlerExecutor;
}

/**
Expand Down Expand Up @@ -683,6 +716,16 @@ protected void serviceStop() throws Exception {
if (this.fedRenameScheduler != null) {
fedRenameScheduler.shutDown();
}
for (ThreadPoolExecutor executor : asyncRouterHandlerExecutors.values()) {
executor.shutdownNow();
}
asyncRouterHandlerExecutors.clear();
if (routerDefaultAsyncHandlerExecutor != null) {
routerDefaultAsyncHandlerExecutor.shutdownNow();
}
if (routerAsyncResponderExecutor != null) {
routerAsyncResponderExecutor.shutdownNow();
}
super.serviceStop();
}

Expand Down Expand Up @@ -2490,14 +2533,6 @@ public boolean isAsync() {
return this.enableAsync;
}

public Map<String, ExecutorService> getAsyncRouterHandlerExecutors() {
return asyncRouterHandlerExecutors;
}

public ExecutorService getRouterAsyncHandlerDefaultExecutor() {
return routerDefaultAsyncHandlerExecutor;
}

private static class AsyncThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger threadNumber = new AtomicInteger(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

import static org.apache.hadoop.hdfs.server.federation.router.async.utils.Async.warpCompletionException;
import static org.apache.hadoop.hdfs.server.federation.router.async.utils.AsyncUtil.asyncApply;
Expand Down Expand Up @@ -173,13 +175,19 @@ public Object invokeMethod(
+ router.getRouterId());
}
String nsid = namenodes.get(0).getNameserviceId();
final ThreadPoolExecutor nsExecutor = router.getRpcServer().getAsyncExecutorForNamespace(nsid);
if (rpcMonitor != null) {
rpcMonitor.recordAsyncHandlerQueueSize(nsid, nsExecutor.getQueue().size());
}

// transfer threadLocalContext to worker threads of executor.
ThreadLocalContext threadLocalContext = new ThreadLocalContext();
asyncComplete(null);
// Returns a CompletableFuture with RejectedExecutionException if nsExecutor is full.
asyncApplyUseExecutor((AsyncApplyFunction<Object, Object>) o -> {
if (LOG.isDebugEnabled()) {
LOG.debug("Async invoke method : {}, {}, {}, {}", method.getName(), useObserver,
namenodes.toString(), params);
LOG.debug("Async invoke method : {}, {}, {}, {}", method.getName(), useObserver, namenodes,
params);
}
threadLocalContext.transfer();
RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController();
Expand All @@ -190,8 +198,23 @@ public Object invokeMethod(
releasePermit(nsid, ugi, method, controller);
return object;
});
}, router.getRpcServer().getAsyncRouterHandlerExecutors().getOrDefault(nsid,
router.getRpcServer().getRouterAsyncHandlerDefaultExecutor()));
}, nsExecutor);

// Catch the RejectedExecutionException and convert it to StandbyException
asyncCatch((ret, e) -> {
int queueSize = nsExecutor.getQueue().size();
if (nsExecutor.isShutdown()) {
LOG.warn("Async handler executor for namespace '{}' is shutting down; task not scheduled"
+ " (queue size: {})", nsid, queueSize, e);
throw new StandbyException(
"Namespace '" + nsid + "' async handler is shutting down (queue size: " + queueSize
+ ")");
}
LOG.warn("Async handler executor rejected task for namespace '{}' (queue size: {}). {}", nsid,
queueSize, e.getMessage());
throw new StandbyException(
"Namespace '" + nsid + "' is overloaded (queue size: " + queueSize + ")");
}, RejectedExecutionException.class);
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@
</description>
</property>

<property>
<name>dfs.federation.router.async.rpc.queue.size</name>
<value>1000</value>
<description>
The bounded queue capacity for each async handler thread pool (the
per-nameservice pools and the default pool). When a queue is full, new
requests are rejected with a StandbyException to prevent unbounded memory
growth. Must be at least 1.
</description>
</property>

<property>
<name>dfs.federation.router.async.rpc.responder.count</name>
<value>10</value>
Expand Down
Loading
Loading