diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java
new file mode 100644
index 0000000000000..4c6b9decb0b4d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.EOFException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Status code 416, range not satisfiable.
+ * Subclass of {@link EOFException} so that any code which expects that to
+ * be the outcome of a 416 failure will continue to work.
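+ * <p>
+ * Illustrative use by a caller which treats any EOFException as
+ * end-of-file (a sketch only, not code from this patch):
+ * <pre>
+ *   try {
+ *     in.readFully(pos, buffer);
+ *   } catch (EOFException e) {
+ *     // also catches RangeNotSatisfiableEOFException from a 416 response
+ *   }
+ * </pre>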
+ */
+@InterfaceAudience.Private
+public class RangeNotSatisfiableEOFException extends EOFException {
+
+ public RangeNotSatisfiableEOFException(
+ String operation,
+ Exception cause) {
+ super(operation);
+ initCause(cause);
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index d87bb7fc94649..d8ba36500f7c3 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -99,6 +99,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
public static final String OPERATION_OPEN = "open";
public static final String OPERATION_REOPEN = "re-open";
+  /**
+   * Switch for behavior when {@code wrappedStream.read()}
+   * returns -1 or raises an EOFException; the original semantics
+   * were that the stream was kept open.
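+   * Closing the stream on a -1 means that the next read will have to
+   * re-open the connection at the current position; the cost is an
+   * extra GET, the benefit is that a possibly-broken connection is
+   * not reused.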
+ * Value {@value}.
+ */
+ private static final boolean CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ = true;
+
/**
* This is the maximum temporary buffer size we use while
* populating the data in direct byte buffers during a vectored IO
@@ -435,16 +443,23 @@ public boolean seekToNewSource(long targetPos) throws IOException {
/**
* Perform lazy seek and adjust stream to correct position for reading.
-   *
+   * If an EOFException is raised there are two possibilities:
+   * <ol>
+   *   <li>the stream is at the end of the file</li>
+   *   <li>something went wrong with the network connection</li>
+   * </ol>
+   * This method does not attempt to distinguish; it assumes that an EOF
+   * exception is always "end of file".
* @param targetPos position from where data should be read
* @param len length of the content that needs to be read
+ * @throws RangeNotSatisfiableEOFException GET is out of range
+ * @throws IOException anything else.
*/
@Retries.RetryTranslated
private void lazySeek(long targetPos, long len) throws IOException {
Invoker invoker = context.getReadInvoker();
- invoker.maybeRetry(streamStatistics.getOpenOperations() == 0,
- "lazySeek", pathStr, true,
+ invoker.retry("lazySeek to " + targetPos, pathStr, true,
() -> {
//For lazy seek
seekInStream(targetPos, len);
@@ -478,7 +493,9 @@ public synchronized int read() throws IOException {
try {
lazySeek(nextReadPos, 1);
- } catch (EOFException e) {
+ } catch (RangeNotSatisfiableEOFException e) {
+ // attempt to GET beyond the end of the object
+      LOG.debug("Downgrading 416 response for read at {} to -1 response", nextReadPos);
return -1;
}
@@ -494,8 +511,6 @@ public synchronized int read() throws IOException {
}
try {
b = wrappedStream.read();
- } catch (EOFException e) {
- return -1;
} catch (SocketTimeoutException e) {
onReadFailure(e, true);
throw e;
@@ -509,10 +524,9 @@ public synchronized int read() throws IOException {
if (byteRead >= 0) {
pos++;
nextReadPos++;
- }
-
- if (byteRead >= 0) {
incrementBytesRead(1);
+ } else {
+ streamReadResultNegative();
}
return byteRead;
}
@@ -537,6 +551,18 @@ private void onReadFailure(IOException ioe, boolean forceAbort) {
closeStream("failure recovery", forceAbort, false);
}
+ /**
+   * The read() call returned -1.
+   * This means either "the connection has gone past the end of the object"
+   * or the stream has broken for some reason.
+   * Either way: close the stream (without an abort).
+ */
+ private void streamReadResultNegative() {
+ if (CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ) {
+ closeStream("wrappedStream.read() returned -1", false, false);
+ }
+ }
+
/**
* {@inheritDoc}
*
@@ -562,8 +588,8 @@ public synchronized int read(byte[] buf, int off, int len)
try {
lazySeek(nextReadPos, len);
- } catch (EOFException e) {
- // the end of the file has moved
+ } catch (RangeNotSatisfiableEOFException e) {
+ // attempt to GET beyond the end of the object
return -1;
}
@@ -581,12 +607,12 @@ public synchronized int read(byte[] buf, int off, int len)
}
try {
bytes = wrappedStream.read(buf, off, len);
- } catch (EOFException e) {
- // the base implementation swallows EOFs.
- return -1;
} catch (SocketTimeoutException e) {
onReadFailure(e, true);
throw e;
+ } catch (EOFException e) {
+ // the base implementation swallows EOFs.
+ return -1;
} catch (IOException e) {
onReadFailure(e, false);
throw e;
@@ -597,8 +623,10 @@ public synchronized int read(byte[] buf, int off, int len)
if (bytesRead > 0) {
pos += bytesRead;
nextReadPos += bytesRead;
+ incrementBytesRead(bytesRead);
+ } else {
+ streamReadResultNegative();
}
- incrementBytesRead(bytesRead);
streamStatistics.readOperationCompleted(len, bytesRead);
return bytesRead;
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
index 528a99f5e0966..a648421d5041f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
@@ -22,7 +22,10 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.net.BindException;
+import java.net.ConnectException;
import java.net.NoRouteToHostException;
+import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.file.AccessDeniedException;
@@ -197,6 +200,9 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
// implementation
policyMap.put(NoVersionAttributeException.class, fail);
+    // the requested range is beyond the end of the object; retrying won't help
+ policyMap.put(RangeNotSatisfiableEOFException.class, fail);
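+    // (the policy map is consulted with the exact exception class, so this
+    // entry is needed even though EOFException is also mapped below)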
+
// should really be handled by resubmitting to new location;
// that's beyond the scope of this retry policy
policyMap.put(AWSRedirectException.class, fail);
@@ -204,14 +210,17 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
    // throttled requests can be retried, always
policyMap.put(AWSServiceThrottledException.class, throttlePolicy);
+    // a socket exception subclass we consider unrecoverable,
+    // though it is normally only raised when opening a port for
+    // listening, which is never done in S3A.
+ policyMap.put(BindException.class, fail);
+
// connectivity problems are retried without worrying about idempotency
policyMap.put(ConnectTimeoutException.class, connectivityFailure);
+ policyMap.put(ConnectException.class, connectivityFailure);
// this can be a sign of an HTTP connection breaking early.
- // which can be reacted to by another attempt if the request was idempotent.
- // But: could also be a sign of trying to read past the EOF on a GET,
- // which isn't going to be recovered from
- policyMap.put(EOFException.class, retryIdempotentCalls);
+ policyMap.put(EOFException.class, connectivityFailure);
// policy on a 400/bad request still ambiguous.
// Treated as an immediate failure
@@ -227,7 +236,9 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
policyMap.put(AWSClientIOException.class, retryIdempotentCalls);
policyMap.put(AWSServiceIOException.class, retryIdempotentCalls);
policyMap.put(AWSS3IOException.class, retryIdempotentCalls);
- policyMap.put(SocketTimeoutException.class, retryIdempotentCalls);
+ // general socket exceptions
+ policyMap.put(SocketException.class, connectivityFailure);
+ policyMap.put(SocketTimeoutException.class, connectivityFailure);
// Unsupported requests do not work, however many times you try
policyMap.put(UnsupportedRequestException.class, fail);
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 590b0b55ac45e..1c2212d76ff2b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -89,7 +89,7 @@
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
-import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.*;
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.translateDeleteException;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
@@ -280,10 +280,13 @@ public static IOException translateException(@Nullable String operation,
break;
// out of range. This may happen if an object is overwritten with
- // a shorter one while it is being read.
- case 416:
- ioe = new EOFException(message);
- ioe.initCause(ase);
+      // a shorter one while it is being read, or openFile() was invoked
+      // passing a FileStatus or file length greater than that of the object.
+ // although the HTTP specification says that the response should
+ // include a range header specifying the actual range available,
+ // this isn't picked up here.
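+      // S3ARetryPolicy maps this exception to "fail", so it is never retried.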
+ case SC_416_RANGE_NOT_SATISFIABLE:
+ ioe = new RangeNotSatisfiableEOFException(message, ase);
break;
// this has surfaced as a "no response from server" message.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index 2e74fbd524116..31f3a68a62547 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -108,11 +108,71 @@ private InternalConstants() {
S3A_OPENFILE_KEYS = Collections.unmodifiableSet(keys);
}
+ /** 200 status code: OK. */
+ public static final int SC_200_OK = 200;
+
+ /** 301 status code: Moved Permanently. */
+ public static final int SC_301_MOVED_PERMANENTLY = 301;
+
+ /** 307 status code: Temporary Redirect. */
+ public static final int SC_307_TEMPORARY_REDIRECT = 307;
+
+ /** 400 status code: Bad Request. */
+ public static final int SC_400_BAD_REQUEST = 400;
+
+ /** 401 status code: Unauthorized. */
+ public static final int SC_401_UNAUTHORIZED = 401;
+
+ /** 403 status code: Forbidden. */
+ public static final int SC_403_FORBIDDEN = 403;
+
/** 403 error code. */
- public static final int SC_403 = 403;
+ public static final int SC_403 = SC_403_FORBIDDEN;
+
+ /** 404 status code: Not Found. */
+ public static final int SC_404_NOT_FOUND = 404;
/** 404 error code. */
- public static final int SC_404 = 404;
+ public static final int SC_404 = SC_404_NOT_FOUND;
+
+ /** 405 status code: Method Not Allowed. */
+ public static final int SC_405_METHOD_NOT_ALLOWED = 405;
+
+ /** 409 status code: Conflict. Example: creating a bucket twice. */
+ public static final int SC_409_CONFLICT = 409;
+
+ /** 410 status code: Gone. */
+ public static final int SC_410_GONE = 410;
+
+ /** 412 status code: Precondition Failed. */
+ public static final int SC_412_PRECONDITION_FAILED = 412;
+
+ /** 415 status code: Content type unsupported by this store. */
+ public static final int SC_415_UNSUPPORTED_MEDIA_TYPE = 415;
+
+ /** 416 status code: Range Not Satisfiable. */
+ public static final int SC_416_RANGE_NOT_SATISFIABLE = 416;
+
+  /** 429 status code: Too Many Requests; the Google GCS throttle response. */
+ public static final int SC_429_TOO_MANY_REQUESTS_GCS = 429;
+
+ /** 443 status code: No Response (unofficial). */
+ public static final int SC_443_NO_RESPONSE = 443;
+
+ /** 444 status code: No Response (unofficial). */
+ public static final int SC_444_NO_RESPONSE = 444;
+
+ /** 500 status code: Internal Server Error. */
+ public static final int SC_500_INTERNAL_SERVER_ERROR = 500;
+
+  /** 501 status code: Not Implemented. */
+  public static final int SC_501_NOT_IMPLEMENTED = 501;
+
+  /** 503 status code: Service Unavailable. On AWS S3: the throttle response. */
+  public static final int SC_503_SERVICE_UNAVAILABLE = 503;
+
+  /** 504 status code: Gateway Timeout. The AWS SDK considers this retryable. */
+  public static final int SC_504_GATEWAY_TIMEOUT = 504;
/** Name of the log for throttling events. Value: {@value}. */
public static final String THROTTLE_LOG_NAME =
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
index fd649c436bf59..4e3382f293430 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
@@ -22,6 +22,7 @@
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_416_RANGE_NOT_SATISFIABLE;
import static org.junit.Assert.*;
import java.io.EOFException;
@@ -38,6 +39,7 @@
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
+import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.apache.hadoop.fs.s3a.impl.ErrorTranslation;
@@ -80,10 +82,10 @@ protected void assertContained(String text, String contained) {
text != null && text.contains(contained));
}
-  protected <E extends Throwable> void verifyTranslated(
+  protected <E extends Throwable> E verifyTranslated(
       int status,
       Class<E> expected) throws Exception {
- verifyTranslated(expected, createS3Exception(status));
+ return verifyTranslated(expected, createS3Exception(status));
}
@Test
@@ -128,7 +130,12 @@ public void test410isNotFound() throws Exception {
@Test
public void test416isEOF() throws Exception {
- verifyTranslated(416, EOFException.class);
+
+    // 416 maps to the subclass of EOFException
+ final IOException ex = verifyTranslated(SC_416_RANGE_NOT_SATISFIABLE,
+ RangeNotSatisfiableEOFException.class);
+ Assertions.assertThat(ex)
+ .isInstanceOf(EOFException.class);
}
@Test
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index 4aae84dca8e53..cd30b80b8556e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -21,6 +21,7 @@
import java.io.EOFException;
+import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,9 +31,11 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.statistics.IOStatistics;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
@@ -41,10 +44,12 @@
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST;
+import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -56,11 +61,13 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
private static final Logger LOG =
LoggerFactory.getLogger(ITestS3AOpenCost.class);
+ public static final String TEXT = "0123456789ABCDEF";
+
private Path testFile;
private FileStatus testFileStatus;
- private long fileLength;
+ private int fileLength;
public ITestS3AOpenCost() {
super(true);
@@ -76,9 +83,9 @@ public void setup() throws Exception {
S3AFileSystem fs = getFileSystem();
testFile = methodPath();
- writeTextFile(fs, testFile, "openfile", true);
+ writeTextFile(fs, testFile, TEXT, true);
testFileStatus = fs.getFileStatus(testFile);
- fileLength = testFileStatus.getLen();
+    fileLength = (int) testFileStatus.getLen();
}
/**
@@ -137,15 +144,8 @@ public void testOpenFileShorterLength() throws Throwable {
int offset = 2;
long shortLen = fileLength - offset;
// open the file
- FSDataInputStream in2 = verifyMetrics(() ->
- fs.openFile(testFile)
- .must(FS_OPTION_OPENFILE_READ_POLICY,
- FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
- .mustLong(FS_OPTION_OPENFILE_LENGTH, shortLen)
- .build()
- .get(),
- always(NO_HEAD_OR_LIST),
- with(STREAM_READ_OPENED, 0));
+ FSDataInputStream in2 = openFile(shortLen,
+ FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
// verify that the statistics are in range
IOStatistics ioStatistics = extractStatistics(in2);
@@ -171,39 +171,182 @@ public void testOpenFileShorterLength() throws Throwable {
}
@Test
- public void testOpenFileLongerLength() throws Throwable {
- // do a second read with the length declared as longer
+ public void testOpenFileLongerLengthReadFully() throws Throwable {
+ // do a read with the length declared as longer
// than it is.
    // An EOFException will be raised on readFully(); -1 returned on a read()
+ final int extra = 10;
+    long longLen = fileLength + extra;
+
+ // assert behaviors of seeking/reading past the file length.
+ // there is no attempt at recovery.
+ verifyMetrics(() -> {
+ try (FSDataInputStream in = openFile(longLen,
+ FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
+ byte[] out = new byte[(int) (longLen)];
+ intercept(EOFException.class, () -> {
+ in.readFully(0, out);
+ return in;
+ });
+ in.seek(longLen - 1);
+ assertEquals("read past real EOF on " + in, -1, in.read());
+ return in.toString();
+ }
+ },
+ always(NO_HEAD_OR_LIST),
+ // two GET calls were made, one for readFully,
+ // the second on the read() past the EOF
+ // the operation has got as far as S3
+ probe(true, STREAM_READ_OPENED, 1 + 1));
+
+ // now on a new stream, try a full read from after the EOF
+ verifyMetrics(() -> {
+ try (FSDataInputStream in =
+ openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
+ byte[] out = new byte[extra];
+ intercept(EOFException.class, () -> in.readFully(fileLength, out));
+ return in.toString();
+ }
+ },
+        // one GET call was made for the readFully;
+        // the operation has got as far as S3
+        with(STREAM_READ_OPENED, 1));
+ }
+
+  /**
+   * Open the test file with the given read policy, declaring the
+   * supplied value as the file length, and verify that no HEAD or
+   * LIST request is issued while opening it.
+   * @param longLen file length to declare; may differ from the actual length
+   * @param policy read policy
+   * @return file handle
+   */
+ private FSDataInputStream openFile(final long longLen, String policy)
+ throws Exception {
S3AFileSystem fs = getFileSystem();
// set a length past the actual file length
- long longLen = fileLength + 10;
- FSDataInputStream in3 = verifyMetrics(() ->
+ return verifyMetrics(() ->
fs.openFile(testFile)
- .must(FS_OPTION_OPENFILE_READ_POLICY,
- FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+ .must(FS_OPTION_OPENFILE_READ_POLICY, policy)
.mustLong(FS_OPTION_OPENFILE_LENGTH, longLen)
.build()
.get(),
always(NO_HEAD_OR_LIST));
+ }
+
+ /**
+ * Open a file with a length declared as longer than the actual file length.
+ * Validate input stream.read() semantics.
+ */
+ @Test
+ public void testReadPastEOF() throws Throwable {
+
+ // set a length past the actual file length
+ describe("read() up to the end of the real file");
+
+ final int extra = 10;
+ int longLen = fileLength + extra;
+ try (FSDataInputStream in = openFile(longLen,
+ FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+ for (int i = 0; i < fileLength; i++) {
+ Assertions.assertThat(in.read())
+ .describedAs("read() at %d from stream %s", i, in)
+ .isEqualTo(TEXT.charAt(i));
+ }
+ LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics()));
+ }
+
+ // now open and read after the EOF; this is
+ // expected to return -1 on each read; there's a GET per call.
+ // as the counters are updated on close(), the stream must be closed
+ // within the verification clause.
+    // note how there's no attempt to alter the expected file length...
+ // instead the call always goes to S3.
+ // there's no information in the exception from the SDK
+ describe("reading past the end of the file");
- // assert behaviors of seeking/reading past the file length.
- // there is no attempt at recovery.
verifyMetrics(() -> {
- byte[] out = new byte[(int) longLen];
- intercept(EOFException.class,
- () -> in3.readFully(0, out));
- in3.seek(longLen - 1);
- assertEquals("read past real EOF on " + in3,
- -1, in3.read());
- in3.close();
- return in3.toString();
+ try (FSDataInputStream in =
+ openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+ for (int i = 0; i < extra; i++) {
+ final int p = fileLength + i;
+ in.seek(p);
+ Assertions.assertThat(in.read())
+ .describedAs("read() at %d", p)
+ .isEqualTo(-1);
+ }
+ LOG.info("Statistics after EOF {}", ioStatisticsToPrettyString(in.getIOStatistics()));
+ return in.toString();
+ }
},
- // two GET calls were made, one for readFully,
- // the second on the read() past the EOF
- // the operation has got as far as S3
- with(STREAM_READ_OPENED, 2));
+ always(NO_HEAD_OR_LIST),
+ probe(Statistic.ACTION_HTTP_GET_REQUEST, extra));
+  }
+
+ /**
+ * Test {@code PositionedReadable.readFully()} past EOF in a file.
+ */
+ @Test
+ public void testPositionedReadableReadFullyPastEOF() throws Throwable {
+    // now, the next corner case: a readFully() of more bytes than the file length.
+    // we expect failure.
+    // this codepath does a GET up to the (declared) file length; when that
+    // GET returns fewer bytes than requested, the inner read() returns -1
+    // and the readFully() call fails.
+ describe("PositionedReadable.readFully() past the end of the file");
+ // set a length past the actual file length
+ final int extra = 10;
+ int longLen = fileLength + extra;
+ verifyMetrics(() -> {
+ try (FSDataInputStream in =
+ openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+ byte[] buf = new byte[(int) (longLen + 1)];
+ // readFully will fail
+ intercept(EOFException.class, () -> {
+ in.readFully(0, buf);
+ return in;
+ });
+        return "readFully past EOF with statistics "
+ + ioStatisticsToPrettyString(in.getIOStatistics());
+ }
+ },
+ always(NO_HEAD_OR_LIST),
+ probe(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
+ }
+
+ /**
+ * Test {@code PositionedReadable.read()} past EOF in a file.
+ */
+ @Test
+ public void testPositionedReadableReadPastEOF() throws Throwable {
+
+ // set a length past the actual file length
+ final int extra = 10;
+ int longLen = fileLength + extra;
+
+ describe("PositionedReadable.read() past the end of the file");
+ verifyMetrics(() -> {
+ try (FSDataInputStream in =
+ openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+ byte[] buf = new byte[(int) (longLen + 1)];
+
+      // read() will read up to the end of the file
+ Assertions.assertThat(in.read(0, buf, 0, buf.length))
+ .isEqualTo(fileLength);
+
+ // now attempt to read after EOF
+ Assertions.assertThat(in.read(fileLength, buf, 0, buf.length))
+ .describedAs("PositionedReadable.read() past EOF")
+ .isEqualTo(-1);
+      // the stream is closed as part of the -1 handling
+
+      return "PositionedReadable.read() past EOF with " + in;
+ }
+ },
+ always(NO_HEAD_OR_LIST),
+ probe(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
}
+
}