diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandler.java
index d0d0e74c9970b0..3eae966c64ec6e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandler.java
@@ -680,8 +680,24 @@ public void operationComplete(ChannelFuture future) throws Exception {
       LOG.trace("SendMap operation complete; mapsToWait='{}', channel='{}'",
           this.reduceContext.getMapsToWait().get(), future.channel().id());
       if (!future.isSuccess()) {
+        Throwable cause = future.cause();
+        if (cause instanceof ClosedChannelException) {
+          LOG.debug("Ignoring closed channel error in operationComplete. channel='{}'",
+              future.channel().id(), cause);
+          future.channel().close();
+          return;
+        }
+        if (cause instanceof IOException) {
+          String message = String.valueOf(cause.getMessage());
+          if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
+            LOG.debug("Ignoring client disconnect during sendMap. channel='{}' Cause: {}",
+                future.channel().id(), cause.getMessage());
+            future.channel().close();
+            return;
+          }
+        }
         LOG.error("Future is unsuccessful. channel='{}' Cause: ",
-            future.channel().id(), future.cause());
+            future.channel().id(), cause);
         future.channel().close();
         return;
       }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java
index 07ec12a42b8868..946f76247c70c9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java
@@ -22,6 +22,7 @@
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.FileRegion;
@@ -52,6 +53,7 @@
 import java.io.IOException;
 import java.net.URL;
 import java.nio.channels.Channels;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.WritableByteChannel;
 import java.nio.charset.StandardCharsets;
 import java.security.cert.X509Certificate;
@@ -71,6 +73,16 @@
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.X509TrustManager;
 
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
@@ -83,6 +95,11 @@ import org.apache.hadoop.security.token.Token;
 import org.eclipse.jetty.http.HttpHeader;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
 import org.slf4j.LoggerFactory;
 
 import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
@@ -255,6 +272,58 @@ public void testInvalidMapNoDataFile() {
     assertFalse(shuffle.isActive(), "closed");
   }
 
+  static Stream<Arguments> operationCompleteLoggingCases() {
+    return Stream.of(
+        Arguments.of(new ClosedChannelException(),
+            Level.DEBUG, "Ignoring closed channel error", false),
+        Arguments.of(new IOException("Connection reset by peer"),
+            Level.DEBUG, "Ignoring client disconnect", false),
+        Arguments.of(new IOException("Broken pipe"),
+            Level.DEBUG, "Ignoring client disconnect", false),
+        Arguments.of(new IOException("Disk full"),
+            Level.ERROR, "Future is unsuccessful", true)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource("operationCompleteLoggingCases")
+  public void testOperationCompleteLogging(
+      Throwable cause, Level expectedLevel, String expectedMessage, boolean expectError)
+      throws Exception {
+    Logger logger = (Logger) ShuffleHandler.LOG;
+    Level oldLevel = logger.getLevel();
+    ListAppender<ILoggingEvent> listAppender = new ListAppender<>();
+    listAppender.setContext(logger.getLoggerContext());
+    listAppender.start();
+    logger.setLevel(Level.DEBUG);
+    logger.addAppender(listAppender);
+
+    try {
+      ChannelFuture future = mock(ChannelFuture.class, RETURNS_DEEP_STUBS);
+      when(future.isSuccess()).thenReturn(false);
+      when(future.cause()).thenReturn(cause);
+
+      ShuffleChannelHandler.ReduceContext ctx =
+          mock(ShuffleChannelHandler.ReduceContext.class, RETURNS_DEEP_STUBS);
+
+      new ShuffleChannelHandler.ReduceMapFileCount(null, ctx).operationComplete(future);
+
+      verify(future.channel()).close();
+
+      boolean hasExpectedLog = listAppender.list.stream()
+          .anyMatch(e -> e.getLevel() == expectedLevel
+              && e.getFormattedMessage().contains(expectedMessage));
+      boolean hasErrorLog = listAppender.list.stream()
+          .anyMatch(e -> e.getLevel() == Level.ERROR);
+
+      assertTrue(hasExpectedLog, "Should log " + expectedLevel + " with '" + expectedMessage + "'");
+      assertEquals(expectError, hasErrorLog, "Error log presence mismatch");
+    } finally {
+      logger.detachAppender(listAppender);
+      logger.setLevel(oldLevel);
+    }
+  }
+
   private void drainChannel(EmbeddedChannel ch) {
     Object o;
     while((o = ch.readInbound())!=null) {