Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

import java.io.EOFException;

import org.apache.hadoop.classification.InterfaceAudience;

/**
* Status code 416, range not satisfiable.
* Subclass of {@link EOFException} so that any code which expects that to
* be the outcome of a 416 failure will continue to work.
*/
@InterfaceAudience.Private
public class RangeNotSatisfiableEOFException extends EOFException {

public RangeNotSatisfiableEOFException(
String operation,
Exception cause) {
super(operation);
initCause(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
public static final String OPERATION_OPEN = "open";
public static final String OPERATION_REOPEN = "re-open";

/**
* Switch for behavior on when wrappedStream.read()
* returns -1 or raises an EOF; the original semantics
* are that the stream is kept open.
* Value {@value}.
*/
private static final boolean CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ = true;

/**
* This is the maximum temporary buffer size we use while
* populating the data in direct byte buffers during a vectored IO
Expand Down Expand Up @@ -435,16 +443,23 @@ public boolean seekToNewSource(long targetPos) throws IOException {

/**
* Perform lazy seek and adjust stream to correct position for reading.
*
* If an EOF Exception is raised there are two possibilities
* <ol>
* <li>the stream is at the end of the file</li>
* <li>something went wrong with the network connection</li>
* </ol>
* This method does not attempt to distinguish; it assumes that an EOF
* exception is always "end of file".
* @param targetPos position from where data should be read
* @param len length of the content that needs to be read
* @throws RangeNotSatisfiableEOFException GET is out of range
* @throws IOException anything else.
*/
@Retries.RetryTranslated
private void lazySeek(long targetPos, long len) throws IOException {

Invoker invoker = context.getReadInvoker();
invoker.maybeRetry(streamStatistics.getOpenOperations() == 0,
"lazySeek", pathStr, true,
invoker.retry("lazySeek to " + targetPos, pathStr, true,
() -> {
//For lazy seek
seekInStream(targetPos, len);
Expand Down Expand Up @@ -478,7 +493,9 @@ public synchronized int read() throws IOException {

try {
lazySeek(nextReadPos, 1);
} catch (EOFException e) {
} catch (RangeNotSatisfiableEOFException e) {
// attempt to GET beyond the end of the object
LOG.debug("Downgrading 416 response attempt to read at {} to -1 response", nextReadPos);
return -1;
}

Expand All @@ -494,8 +511,6 @@ public synchronized int read() throws IOException {
}
try {
b = wrappedStream.read();
} catch (EOFException e) {
return -1;
} catch (SocketTimeoutException e) {
onReadFailure(e, true);
throw e;
Expand All @@ -509,10 +524,9 @@ public synchronized int read() throws IOException {
if (byteRead >= 0) {
pos++;
nextReadPos++;
}

if (byteRead >= 0) {
incrementBytesRead(1);
} else {
streamReadResultNegative();
}
return byteRead;
}
Expand All @@ -537,6 +551,18 @@ private void onReadFailure(IOException ioe, boolean forceAbort) {
closeStream("failure recovery", forceAbort, false);
}

/**
* the read() call returned -1.
* this means "the connection has gone past the end of the object" or
* the stream has broken for some reason.
* so close stream (without an abort).
*/
private void streamReadResultNegative() {
if (CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ) {
closeStream("wrappedStream.read() returned -1", false, false);
}
}

/**
* {@inheritDoc}
*
Expand All @@ -562,8 +588,8 @@ public synchronized int read(byte[] buf, int off, int len)

try {
lazySeek(nextReadPos, len);
} catch (EOFException e) {
// the end of the file has moved
} catch (RangeNotSatisfiableEOFException e) {
// attempt to GET beyond the end of the object
return -1;
}

Expand All @@ -581,12 +607,12 @@ public synchronized int read(byte[] buf, int off, int len)
}
try {
bytes = wrappedStream.read(buf, off, len);
} catch (EOFException e) {
// the base implementation swallows EOFs.
return -1;
} catch (SocketTimeoutException e) {
onReadFailure(e, true);
throw e;
} catch (EOFException e) {
// the base implementation swallows EOFs.
return -1;
} catch (IOException e) {
onReadFailure(e, false);
throw e;
Expand All @@ -597,8 +623,10 @@ public synchronized int read(byte[] buf, int off, int len)
if (bytesRead > 0) {
pos += bytesRead;
nextReadPos += bytesRead;
incrementBytesRead(bytesRead);
} else {
streamReadResultNegative();
}
incrementBytesRead(bytesRead);
streamStatistics.readOperationCompleted(len, bytesRead);
return bytesRead;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.BindException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.nio.file.AccessDeniedException;
Expand Down Expand Up @@ -197,21 +200,27 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
// implementation
policyMap.put(NoVersionAttributeException.class, fail);

// range header is out of scope of object; retrying won't help
policyMap.put(RangeNotSatisfiableEOFException.class, fail);

// should really be handled by resubmitting to new location;
// that's beyond the scope of this retry policy
policyMap.put(AWSRedirectException.class, fail);

// throttled requests are can be retried, always
policyMap.put(AWSServiceThrottledException.class, throttlePolicy);

// socket exception subclass we consider unrecoverable
// though this is normally only found when opening a port for listening.
// which is never done in S3A.
policyMap.put(BindException.class, fail);

// connectivity problems are retried without worrying about idempotency
policyMap.put(ConnectTimeoutException.class, connectivityFailure);
policyMap.put(ConnectException.class, connectivityFailure);

// this can be a sign of an HTTP connection breaking early.
// which can be reacted to by another attempt if the request was idempotent.
// But: could also be a sign of trying to read past the EOF on a GET,
// which isn't going to be recovered from
policyMap.put(EOFException.class, retryIdempotentCalls);
policyMap.put(EOFException.class, connectivityFailure);

// policy on a 400/bad request still ambiguous.
// Treated as an immediate failure
Expand All @@ -227,7 +236,9 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
policyMap.put(AWSClientIOException.class, retryIdempotentCalls);
policyMap.put(AWSServiceIOException.class, retryIdempotentCalls);
policyMap.put(AWSS3IOException.class, retryIdempotentCalls);
policyMap.put(SocketTimeoutException.class, retryIdempotentCalls);
// general socket exceptions
policyMap.put(SocketException.class, connectivityFailure);
policyMap.put(SocketTimeoutException.class, connectivityFailure);

// Unsupported requests do not work, however many times you try
policyMap.put(UnsupportedRequestException.class, fail);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.*;
import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.translateDeleteException;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
Expand Down Expand Up @@ -280,10 +280,13 @@ public static IOException translateException(@Nullable String operation,
break;

// out of range. This may happen if an object is overwritten with
// a shorter one while it is being read.
case 416:
ioe = new EOFException(message);
ioe.initCause(ase);
// a shorter one while it is being read or openFile() was invoked
// passing a FileStatus or file length less than that of the object.
// although the HTTP specification says that the response should
// include a range header specifying the actual range available,
// this isn't picked up here.
case SC_416_RANGE_NOT_SATISFIABLE:
ioe = new RangeNotSatisfiableEOFException(message, ase);
break;

// this has surfaced as a "no response from server" message.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,71 @@ private InternalConstants() {
S3A_OPENFILE_KEYS = Collections.unmodifiableSet(keys);
}

/** 200 status code: OK. */
public static final int SC_200_OK = 200;

/** 301 status code: Moved Permanently. */
public static final int SC_301_MOVED_PERMANENTLY = 301;

/** 307 status code: Temporary Redirect. */
public static final int SC_307_TEMPORARY_REDIRECT = 307;

/** 400 status code: Bad Request. */
public static final int SC_400_BAD_REQUEST = 400;

/** 401 status code: Unauthorized. */
public static final int SC_401_UNAUTHORIZED = 401;

/** 403 status code: Forbidden. */
public static final int SC_403_FORBIDDEN = 403;

/** 403 error code. */
public static final int SC_403 = 403;
public static final int SC_403 = SC_403_FORBIDDEN;

/** 404 status code: Not Found. */
public static final int SC_404_NOT_FOUND = 404;

/** 404 error code. */
public static final int SC_404 = 404;
public static final int SC_404 = SC_404_NOT_FOUND;

/** 405 status code: Method Not Allowed. */
public static final int SC_405_METHOD_NOT_ALLOWED = 405;

/** 409 status code: Conflict. Example: creating a bucket twice. */
public static final int SC_409_CONFLICT = 409;

/** 410 status code: Gone. */
public static final int SC_410_GONE = 410;

/** 412 status code: Precondition Failed. */
public static final int SC_412_PRECONDITION_FAILED = 412;

/** 415 status code: Content type unsupported by this store. */
public static final int SC_415_UNSUPPORTED_MEDIA_TYPE = 415;

/** 416 status code: Range Not Satisfiable. */
public static final int SC_416_RANGE_NOT_SATISFIABLE = 416;

/** 429 status code: This is the google GCS throttle message. */
public static final int SC_429_TOO_MANY_REQUESTS_GCS = 429;

/** 443 status code: No Response (unofficial). */
public static final int SC_443_NO_RESPONSE = 443;

/** 444 status code: No Response (unofficial). */
public static final int SC_444_NO_RESPONSE = 444;

/** 500 status code: Internal Server Error. */
public static final int SC_500_INTERNAL_SERVER_ERROR = 500;

/** 501 status code: method not implemented. */
public static final int SC_501_NOT_IMPLEMENTED = 501;

/** 503 status code: Service Unavailable. on AWS S3: throttle response. */
public static final int SC_503_SERVICE_UNAVAILABLE = 503;

/** 504 Gateway Timeout. AWS SDK considers retryable. */
public static final int SC_504_GATEWAY_TIMEOUT = 504;

/** Name of the log for throttling events. Value: {@value}. */
public static final String THROTTLE_LOG_NAME =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_416_RANGE_NOT_SATISFIABLE;
import static org.junit.Assert.*;

import java.io.EOFException;
Expand All @@ -38,6 +39,7 @@
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.model.AmazonS3Exception;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.fs.s3a.impl.ErrorTranslation;
Expand Down Expand Up @@ -80,10 +82,10 @@ protected void assertContained(String text, String contained) {
text != null && text.contains(contained));
}

protected <E extends Throwable> void verifyTranslated(
protected <E extends Throwable> E verifyTranslated(
int status,
Class<E> expected) throws Exception {
verifyTranslated(expected, createS3Exception(status));
return verifyTranslated(expected, createS3Exception(status));
}

@Test
Expand Down Expand Up @@ -128,7 +130,12 @@ public void test410isNotFound() throws Exception {

@Test
public void test416isEOF() throws Exception {
verifyTranslated(416, EOFException.class);

// 416 maps the the subclass of EOFException
final IOException ex = verifyTranslated(SC_416_RANGE_NOT_SATISFIABLE,
RangeNotSatisfiableEOFException.class);
Assertions.assertThat(ex)
.isInstanceOf(EOFException.class);
}

@Test
Expand Down
Loading