From e4331a6d94808448f27fd23626078c850afa34aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EC=9D=B4=EC=9C=A0=EB=B9=84?= Date: Tue, 24 Aug 2021 16:55:09 +0900 Subject: [PATCH 1/2] HADOOP-17861. improve YARN Registry DNS Server qps --- .../client/api/DNSOperationsFactory.java | 2 +- .../client/api/RegistryConstants.java | 7 + .../registry/server/dns/RegistryDNS.java | 168 +++++++++++------- 3 files changed, 115 insertions(+), 62 deletions(-) diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperationsFactory.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperationsFactory.java index 23467ebd15c8c1..f3854bc8e6b136 100644 --- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperationsFactory.java +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/api/DNSOperationsFactory.java @@ -63,7 +63,7 @@ public static DNSOperations createInstance(String name, DNSOperations operations = null; switch (impl) { case DNSJAVA: - operations = new RegistryDNS(name); + operations = new RegistryDNS(name, conf); break; default: diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java index f9c0fd77d1739e..2af1d0acb579f5 100644 --- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryConstants.java @@ -385,4 +385,11 @@ public interface RegistryConstants { * {@value}. */ String SUBPATH_COMPONENTS = "/components/"; + + + /** + * num of threads for serving dns query. + */ + String KEY_NUM_THREADS = DNS_PREFIX + "num-threads"; + } diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java index ed6c906cda90db..e3efb5eef7041a 100644 --- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java @@ -128,6 +128,7 @@ public class RegistryDNS extends AbstractService implements DNSOperations, static final int FLAG_DNSSECOK = 1; static final int FLAG_SIGONLY = 2; + private static final int DEFAULT_NUM_THREADS = 4; private static final Logger LOG = LoggerFactory.getLogger(RegistryDNS.class); @@ -169,7 +170,12 @@ public class RegistryDNS extends AbstractService implements DNSOperations, */ public RegistryDNS(String name) { super(name); - executor = HadoopExecutors.newCachedThreadPool( + int nThreads = Runtime.getRuntime().availableProcessors() / 4; + if (nThreads < DEFAULT_NUM_THREADS) { + nThreads = DEFAULT_NUM_THREADS; + } + executor = HadoopExecutors.newFixedThreadPool( + nThreads, new ThreadFactory() { private AtomicInteger counter = new AtomicInteger(1); @@ -182,6 +188,27 @@ public Thread newThread(Runnable r) { }); } + public RegistryDNS(String name, Configuration conf) { + super(name); + int nThreads = conf.getInt(KEY_NUM_THREADS, Runtime.getRuntime().availableProcessors() / 4); + if (nThreads < DEFAULT_NUM_THREADS) { + nThreads = DEFAULT_NUM_THREADS; + } + LOG.info("using {} threads for serving dns query", nThreads); + executor = HadoopExecutors.newFixedThreadPool( + nThreads, + new ThreadFactory() { + private AtomicInteger counter = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, + "RegistryDNS " + + counter.getAndIncrement()); + } + }); + } + public void initializeChannels(Configuration conf) throws Exception { if (channelsInitialized) { return; @@ -807,6 +834,65 @@ public void nioTCPClient(SocketChannel ch) throws IOException { } + /** + * Process a UDP request. + * + * @param channel the datagram channel for the request. + * @param remoteAddress the socket address of client. + * @param input the input ByteBuffer. + * @throws IOException if the udp processing generates an issue. + */ + public void nioUDPClient(DatagramChannel channel, SocketAddress remoteAddress, ByteBuffer input) throws IOException { + ByteBuffer output = ByteBuffer.allocate(4096); + byte[] in = null; + byte[] response = null; + Message query = null; + try { + try { + int position = input.position(); + in = new byte[position]; + input.flip(); + input.get(in); + query = new Message(in); + LOG.info("{}: received UDP query {}", remoteAddress, + query.getQuestion()); + response = generateReply(query, null); + if (response.length > output.capacity()) { + LOG.warn("{}: Response of UDP query {} exceeds limit {}", + remoteAddress, query.getQuestion(), output.limit()); + query.getHeader().setFlag(Flags.TC); + response = query.toWire(); + } + if (response == null) { + return; + } + } catch (IOException e) { + response = formErrorMessage(in); + if (response == null) { + LOG.debug("Error during create an error message." + + " Failed to parse a header", e); + return; + } + } + output.clear(); + output.put(response); + output.flip(); + + LOG.debug("{}: sending response", remoteAddress); + channel.send(output, remoteAddress); + } catch (Exception e) { + if (e instanceof IOException && remoteAddress != null) { + throw NetUtils.wrapException(channel.socket().getInetAddress().getHostName(), + channel.socket().getPort(), + ((InetSocketAddress) remoteAddress).getHostName(), + ((InetSocketAddress) remoteAddress).getPort(), + (IOException) e); + } else { + throw e; + } + } + } + /** * Calculate the inbound message length, which is related in the message as an * unsigned short value. @@ -834,9 +920,8 @@ private int getMessgeLength(ByteBuffer buf) throws EOFException { */ public void serveNIOTCP(ServerSocketChannel serverSocketChannel, InetAddress addr, int port) throws Exception { - try { - - while (true) { + while (true) { + try { final SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel != null) { executor.submit(new Callable() { @@ -850,10 +935,9 @@ public Boolean call() throws Exception { } else { Thread.sleep(500); } + } catch (Exception e) { + LOG.error("Error during accept", e); } - } catch (IOException e) { - throw NetUtils.wrapException(addr.getHostName(), port, - addr.getHostName(), port, e); } } @@ -870,7 +954,7 @@ private ServerSocketChannel openTCPChannel(InetAddress addr, int port) ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); try { serverSocketChannel.socket().bind(new InetSocketAddress(addr, port)); - serverSocketChannel.configureBlocking(false); + serverSocketChannel.configureBlocking(true); } catch (IOException e) { throw NetUtils.wrapException(null, 0, InetAddress.getLocalHost().getHostName(), @@ -919,7 +1003,7 @@ public void addNIOUDP(final InetAddress addr, final int port) @Override public Boolean call() throws Exception { try { - serveNIOUDP(udpChannel, addr, port); + serveNIOUDP(udpChannel); } catch (Exception e) { LOG.error("Error initializing DNS UDP listener", e); throw e; @@ -933,60 +1017,22 @@ public Boolean call() throws Exception { * Process an inbound UDP request. * * @param channel the UDP datagram channel. - * @param addr local host address. - * @param port local port. - * @throws IOException if the UDP processing fails. */ - private synchronized void serveNIOUDP(DatagramChannel channel, - InetAddress addr, int port) throws Exception { - SocketAddress remoteAddress = null; - try { - - ByteBuffer input = ByteBuffer.allocate(4096); - ByteBuffer output = ByteBuffer.allocate(4096); - byte[] in = null; - - while (true) { + private synchronized void serveNIOUDP(DatagramChannel channel) { + while (true) { + try { + ByteBuffer input = ByteBuffer.allocate(4096); input.clear(); - try { - remoteAddress = channel.receive(input); - } catch (IOException e) { - LOG.debug("Error during message receipt", e); - continue; - } - Message query; - byte[] response = null; - try { - int position = input.position(); - in = new byte[position]; - input.flip(); - input.get(in); - query = new Message(in); - LOG.info("{}: received UDP query {}", remoteAddress, - query.getQuestion()); - response = generateReply(query, null); - if (response == null) { - continue; + SocketAddress remoteAddress = channel.receive(input); + executor.submit(new Callable() { + @Override + public Boolean call() throws Exception { + nioUDPClient(channel, remoteAddress, input); + return true; } - } catch (IOException e) { - response = formErrorMessage(in); - } - output.clear(); - output.put(response); - output.flip(); - - LOG.debug("{}: sending response", remoteAddress); - channel.send(output, remoteAddress); - } - } catch (Exception e) { - if (e instanceof IOException && remoteAddress != null) { - throw NetUtils.wrapException(addr.getHostName(), - port, - ((InetSocketAddress) remoteAddress).getHostName(), - ((InetSocketAddress) remoteAddress).getPort(), - (IOException) e); - } else { - throw e; + }); + } catch (Exception e) { + LOG.debug("Error during message receive", e); } } } @@ -1394,7 +1440,7 @@ byte addAnswer(Message response, Name name, int type, int dclass, } } else if (sr.isSuccessful()) { List rrsets = sr.answers(); - LOG.info("found answers {}", rrsets); + LOG.debug("found answers {}", rrsets); for (RRset rrset : rrsets) { addRRset(name, response, rrset, Section.ANSWER, flags); } From 633004ed7e7c35e3524133b29c724b17031d04a1 Mon Sep 17 00:00:00 2001 From: Yubi Lee Date: Sun, 26 Apr 2026 08:41:51 +0900 Subject: [PATCH 2/2] HADOOP-17861. Address PR feedback --- .../registry/server/dns/RegistryDNS.java | 130 +++++++++++------- 1 file changed, 78 insertions(+), 52 deletions(-) diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java index e3efb5eef7041a..7b429bea74765f 100644 --- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java @@ -794,21 +794,12 @@ public void nioTCPClient(SocketChannel ch) throws IOException { buf.get(in, 0, messageLength); - Message query; - byte[] response; - try { - query = new Message(in); - LOG.info("received TCP query {}", query.getQuestion()); - response = generateReply(query, ch.socket()); - if (response == null) { - return; - } - } catch (IOException e) { - response = formErrorMessage(in); + byte[] response = generateTCPReply(in, ch.socket()); + if (response == null) { + return; } ByteBuffer out = ByteBuffer.allocate(response.length + 2); - out.clear(); byte[] data = new byte[2]; data[1] = (byte)(response.length & 0xFF); @@ -834,6 +825,24 @@ public void nioTCPClient(SocketChannel ch) throws IOException { } + /** + * Parse a TCP query and produce a reply, falling back to a FORMERR + * response if the query cannot be parsed. + * + * @param in the raw query bytes. + * @param socket the originating socket. + * @return the reply bytes, or {@code null} if no response should be sent. + */ + private byte[] generateTCPReply(byte[] in, Socket socket) { + try { + Message query = new Message(in); + LOG.info("received TCP query {}", query.getQuestion()); + return generateReply(query, socket); + } catch (IOException e) { + return formErrorMessage(in); + } + } + /** * Process a UDP request. * @@ -842,54 +851,71 @@ public void nioTCPClient(SocketChannel ch) throws IOException { * @param input the input ByteBuffer. * @throws IOException if the udp processing generates an issue. */ - public void nioUDPClient(DatagramChannel channel, SocketAddress remoteAddress, ByteBuffer input) throws IOException { - ByteBuffer output = ByteBuffer.allocate(4096); - byte[] in = null; - byte[] response = null; - Message query = null; + public void nioUDPClient(DatagramChannel channel, + SocketAddress remoteAddress, + ByteBuffer input) throws IOException { try { - try { - int position = input.position(); - in = new byte[position]; - input.flip(); - input.get(in); - query = new Message(in); - LOG.info("{}: received UDP query {}", remoteAddress, - query.getQuestion()); - response = generateReply(query, null); - if (response.length > output.capacity()) { - LOG.warn("{}: Response of UDP query {} exceeds limit {}", - remoteAddress, query.getQuestion(), output.limit()); - query.getHeader().setFlag(Flags.TC); - response = query.toWire(); - } - if (response == null) { - return; - } - } catch (IOException e) { - response = formErrorMessage(in); - if (response == null) { - LOG.debug("Error during create an error message." - + " Failed to parse a header", e); - return; - } + int position = input.position(); + byte[] in = new byte[position]; + input.flip(); + input.get(in); + + byte[] response = generateUDPReply(in, remoteAddress, 4096); + if (response == null) { + return; } - output.clear(); + + ByteBuffer output = ByteBuffer.allocate(4096); output.put(response); output.flip(); LOG.debug("{}: sending response", remoteAddress); channel.send(output, remoteAddress); - } catch (Exception e) { - if (e instanceof IOException && remoteAddress != null) { - throw NetUtils.wrapException(channel.socket().getInetAddress().getHostName(), - channel.socket().getPort(), - ((InetSocketAddress) remoteAddress).getHostName(), - ((InetSocketAddress) remoteAddress).getPort(), - (IOException) e); - } else { - throw e; + } catch (IOException e) { + if (remoteAddress instanceof InetSocketAddress) { + InetSocketAddress isa = (InetSocketAddress) remoteAddress; + throw NetUtils.wrapException( + channel.socket().getInetAddress().getHostName(), + channel.socket().getPort(), + isa.getHostName(), isa.getPort(), e); + } + throw e; + } + } + + /** + * Parse a UDP query and produce a reply, truncating with the TC flag if + * the response exceeds {@code maxSize}, or falling back to a FORMERR + * response if the query cannot be parsed. + * + * @param in the raw query bytes. + * @param remoteAddress the client address (for logging). + * @param maxSize the maximum allowed response size. + * @return the reply bytes, or {@code null} if no response should be sent. + */ + private byte[] generateUDPReply(byte[] in, SocketAddress remoteAddress, + int maxSize) { + try { + Message query = new Message(in); + LOG.info("{}: received UDP query {}", remoteAddress, + query.getQuestion()); + byte[] response = generateReply(query, null); + if (response == null) { + return null; + } + if (response.length > maxSize) { + LOG.warn("{}: Response of UDP query {} exceeds limit {}", + remoteAddress, query.getQuestion(), maxSize); + query.getHeader().setFlag(Flags.TC); + response = query.toWire(); + } + return response; + } catch (IOException e) { + byte[] err = formErrorMessage(in); + if (err == null) { + LOG.debug("Failed to parse header while creating error response", e); } + return err; } }