Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -680,8 +680,24 @@ public void operationComplete(ChannelFuture future) throws Exception {
LOG.trace("SendMap operation complete; mapsToWait='{}', channel='{}'",
this.reduceContext.getMapsToWait().get(), future.channel().id());
if (!future.isSuccess()) {
Throwable cause = future.cause();
if (cause instanceof ClosedChannelException) {
LOG.debug("Ignoring closed channel error in operationComplete. channel='{}'",
future.channel().id(), cause);
future.channel().close();
return;
}
if (cause instanceof IOException) {
String message = String.valueOf(cause.getMessage());
if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
LOG.debug("Ignoring client disconnect during sendMap. channel='{}' Cause: {}",
future.channel().id(), cause.getMessage());
future.channel().close();
return;
}
}
LOG.error("Future is unsuccessful. channel='{}' Cause: ",
future.channel().id(), future.cause());
future.channel().id(), cause);
future.channel().close();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.FileRegion;
Expand Down Expand Up @@ -52,6 +53,7 @@
import java.io.IOException;
import java.net.URL;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.security.cert.X509Certificate;
Expand All @@ -71,6 +73,16 @@
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
Expand All @@ -83,6 +95,11 @@
import org.apache.hadoop.security.token.Token;
import org.eclipse.jetty.http.HttpHeader;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.stream.Stream;
import org.slf4j.LoggerFactory;

import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
Expand Down Expand Up @@ -255,6 +272,58 @@ public void testInvalidMapNoDataFile() {
assertFalse(shuffle.isActive(), "closed");
}

static Stream<Arguments> operationCompleteLoggingCases() {
return Stream.of(
Arguments.of(new ClosedChannelException(),
Level.DEBUG, "Ignoring closed channel error", false),
Arguments.of(new IOException("Connection reset by peer"),
Level.DEBUG, "Ignoring client disconnect", false),
Arguments.of(new IOException("Broken pipe"),
Level.DEBUG, "Ignoring client disconnect", false),
Arguments.of(new IOException("Disk full"),
Level.ERROR, "Future is unsuccessful", true)
);
}

@ParameterizedTest
@MethodSource("operationCompleteLoggingCases")
public void testOperationCompleteLogging(
Throwable cause, Level expectedLevel, String expectedMessage, boolean expectError)
throws Exception {
Logger logger = (Logger) ShuffleHandler.LOG;
Level oldLevel = logger.getLevel();
ListAppender<ILoggingEvent> listAppender = new ListAppender<>();
listAppender.setContext(logger.getLoggerContext());
listAppender.start();
logger.setLevel(Level.DEBUG);
logger.addAppender(listAppender);

try {
ChannelFuture future = mock(ChannelFuture.class, RETURNS_DEEP_STUBS);
when(future.isSuccess()).thenReturn(false);
when(future.cause()).thenReturn(cause);

ShuffleChannelHandler.ReduceContext ctx =
mock(ShuffleChannelHandler.ReduceContext.class, RETURNS_DEEP_STUBS);

new ShuffleChannelHandler.ReduceMapFileCount(null, ctx).operationComplete(future);

verify(future.channel()).close();

boolean hasExpectedLog = listAppender.list.stream()
.anyMatch(e -> e.getLevel() == expectedLevel
&& e.getFormattedMessage().contains(expectedMessage));
boolean hasErrorLog = listAppender.list.stream()
.anyMatch(e -> e.getLevel() == Level.ERROR);

assertTrue(hasExpectedLog, "Should log " + expectedLevel + " with '" + expectedMessage + "'");
assertEquals(expectError, hasErrorLog, "Error log presence mismatch");
} finally {
logger.detachAppender(listAppender);
logger.setLevel(oldLevel);
}
}

private void drainChannel(EmbeddedChannel ch) {
Object o;
while((o = ch.readInbound())!=null) {
Expand Down
Loading