diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index a29fab9734eb42..11020100da9c67 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -88,6 +88,7 @@ import java.nio.channels.ClosedChannelException; import java.util.Arrays; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION; @@ -106,7 +107,9 @@ class DataXceiver extends Receiver implements Runnable { public static final Logger LOG = DataNode.LOG; static final Logger CLIENT_TRACE_LOG = DataNode.CLIENT_TRACE_LOG; - + static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile( + ".*Premature EOF.*", Pattern.CASE_INSENSITIVE); + private Peer peer; private final String remoteAddress; // address of remote side private final String remoteAddressWithoutPort; // only the address, no port @@ -315,9 +318,9 @@ public void run() { } else { LOG.info("{}; {}", s, t.toString()); } - } else if (op == Op.READ_BLOCK && t instanceof SocketTimeoutException) { + } else if (isIgnorableClientDisconnect(op, t)) { String s1 = - "Likely the client has stopped reading, disconnecting it"; + "Likely the client has stopped, disconnecting it"; s1 += " (" + s + ")"; if (LOG.isTraceEnabled()) { LOG.trace(s1, t); @@ -347,6 +350,13 @@ public void run() { } } + @VisibleForTesting + static boolean isIgnorableClientDisconnect(Op op, Throwable t) { + return (op == Op.READ_BLOCK && t instanceof SocketTimeoutException) || + (op == Op.WRITE_BLOCK && t instanceof IOException && + IGNORABLE_ERROR_MESSAGE.matcher(String.valueOf(t.getMessage())).matches()); + } + /** * In this short living thread, any local states should be collected before * the thread dies away. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java index eea64bef1f8591..4a941c0f7354b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverBackwardsCompat.java @@ -38,8 +38,14 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.ArgumentCaptor; +import java.net.SocketTimeoutException; +import java.util.stream.Stream; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -49,6 +55,7 @@ import java.net.Socket; import java.util.UUID; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -223,4 +230,24 @@ public void testBackwardsCompat() throws Exception { } } } + + static Stream ignorableClientDisconnectData() { + return Stream.of( + Arguments.of(Op.READ_BLOCK, new SocketTimeoutException("timeout"), true), + Arguments.of(Op.READ_BLOCK, new IOException("Connection reset"), false), + Arguments.of(Op.WRITE_BLOCK, new IOException("Premature EOF from inputStream"), true), + Arguments.of(Op.WRITE_BLOCK, new IOException("premature eof"), true), + Arguments.of(Op.WRITE_BLOCK, new IOException("Connection reset"), false), + Arguments.of(Op.WRITE_BLOCK, new SocketTimeoutException("timeout"), false), + Arguments.of(Op.WRITE_BLOCK, new IOException((String) null), false), + Arguments.of(Op.TRANSFER_BLOCK, new SocketTimeoutException("timeout"), false), + Arguments.of(Op.TRANSFER_BLOCK, new IOException("Premature EOF"), false) + ); + } + + @ParameterizedTest(name = "{index}: op={0}, exception={1}, expected={2}") + @MethodSource("ignorableClientDisconnectData") + void testIsIgnorableClientDisconnect(Op op, Throwable t, boolean expected) { + assertEquals(expected, DataXceiver.isIgnorableClientDisconnect(op, t)); + } }