Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
public static final int MULTIPART_MIN_SIZE = 5 * 1024 * 1024;

// s3 access key
public static final String ACCESS_KEY = "fs.s3a.access.key";

Check failure on line 68 in hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

View check run for this annotation

ASF Cloudbees Jenkins ci-hadoop / Apache Yetus

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java#L68

javadoc: warning: no comment

// s3 secret key
public static final String SECRET_KEY = "fs.s3a.secret.key";
Expand Down Expand Up @@ -1353,16 +1353,21 @@
*/
public static final String CUSTOM_HEADERS_POSTFIX = ".custom.headers";

/** Custom per-request headers postfix.
* value: {@value}
*/
public static final String CUSTOM_PER_REQUEST_HEADERS_POSTFIX = ".request";

/**
* List of custom headers to be set on the service client.
* Multiple parameters can be used to specify custom headers.
* <pre>
* Usage:
* fs.s3a.client.s3.custom.headers - Headers to add on all the S3 requests.
* fs.s3a.client.sts.custom.headers - Headers to add on all the STS requests.
* fs.s3a.client.s3.custom.headers - Headers to add to all S3 requests.
* fs.s3a.client.sts.custom.headers - Headers to add to all STS requests.
*
* Examples:
* CustomHeader {@literal ->} 'Header1:Value1'
* CustomHeader {@literal ->} 'Header1=Value1'
* CustomHeaders {@literal ->} 'Header1=Value1;Value2,Header2=Value1'
* </pre>
*/
Expand All @@ -1374,6 +1379,31 @@
FS_S3A_CLIENT_PREFIX + AWS_SERVICE_IDENTIFIER_S3.toLowerCase(Locale.ROOT)
+ CUSTOM_HEADERS_POSTFIX;

/**
* List of custom per-request-type headers to be set on the service client.
* Multiple parameters can be used to specify custom headers.
* <pre>
* Usage:
* fs.s3a.client.s3.custom.headers.request.REQUEST - Headers to add to all S3 REQUEST requests.
* fs.s3a.client.sts.custom.headers.request.REQUEST - Headers to add to all STS REQUEST requests.
*
* Note: REQUEST refers to the AWS S3 request name.
* See <a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/model/S3Request.html">subclasses of S3Request</a>
* and <a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/sts/model/StsRequest.html">subclasses of StsRequest</a>
* for all existing requests.
*
* Examples:
* fs.s3a.client.s3.custom.headers.request.DeleteObjectRequest
* CustomHeader {@literal ->} 'Header1=Value1'
* CustomHeaders {@literal ->} 'Header1=Value1;Value2,Header2=Value1'
* </pre>
*/
public static final String CUSTOM_REQUEST_HEADERS_STS_PREFIX =
CUSTOM_HEADERS_STS + CUSTOM_PER_REQUEST_HEADERS_POSTFIX + ".";

public static final String CUSTOM_REQUEST_HEADERS_S3_PREFIX =
CUSTOM_HEADERS_S3 + CUSTOM_PER_REQUEST_HEADERS_POSTFIX + ".";

/**
* How long to wait for the thread pool to terminate when cleaning up.
* Value: {@value} seconds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.slf4j.Logger;
Expand Down Expand Up @@ -82,6 +85,8 @@
import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_S3;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_STS;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_REQUEST_HEADERS_S3_PREFIX;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_REQUEST_HEADERS_STS_PREFIX;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.getDuration;
import static org.apache.hadoop.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -420,6 +425,32 @@ private static void initSigner(Configuration conf,
}
}

/**
* Parses header configuration at the given key. Calls apply callback for headers with
* non-empty header values list. Calls ignore callback for headers with empty header values list.
*
* @param conf hadoop configuration
* @param configKey configuration key
* @param apply apply callback
* @param ignore ignore callback
*/
private static void applyHeaders(Configuration conf, String configKey,
BiConsumer<String, List<String>> apply, Consumer<String> ignore) {
Map<String, String> awsClientCustomHeadersMap =
S3AUtils.getTrimmedStringCollectionSplitByEquals(conf, configKey);
awsClientCustomHeadersMap.forEach((header, valueString) -> {
List<String> headerValues = Arrays.stream(valueString.split(";"))
.map(String::trim)
.filter(v -> !v.isEmpty())
.collect(Collectors.toList());
if (!headerValues.isEmpty()) {
apply.accept(header, headerValues);
} else {
ignore.accept(header);
}
});
}

/**
* Initialize custom request headers for AWS clients.
* @param conf hadoop configuration
Expand All @@ -429,32 +460,44 @@ private static void initSigner(Configuration conf,
private static void initRequestHeaders(Configuration conf,
ClientOverrideConfiguration.Builder clientConfig, String awsServiceIdentifier) {
String configKey = null;
String configKeyPrefix = null;
switch (awsServiceIdentifier) {
case AWS_SERVICE_IDENTIFIER_S3:
configKey = CUSTOM_HEADERS_S3;
configKeyPrefix = CUSTOM_REQUEST_HEADERS_S3_PREFIX;
break;
case AWS_SERVICE_IDENTIFIER_STS:
configKey = CUSTOM_HEADERS_STS;
configKeyPrefix = CUSTOM_REQUEST_HEADERS_STS_PREFIX;
break;
default:
// No known service.
}
if (configKey != null) {
Map<String, String> awsClientCustomHeadersMap =
S3AUtils.getTrimmedStringCollectionSplitByEquals(conf, configKey);
awsClientCustomHeadersMap.forEach((header, valueString) -> {
List<String> headerValues = Arrays.stream(valueString.split(";"))
.map(String::trim)
.filter(v -> !v.isEmpty())
.collect(Collectors.toList());
if (!headerValues.isEmpty()) {
clientConfig.putHeader(header, headerValues);
} else {
LOG.warn("Ignoring header '{}' for {} client because no values were provided",
header, awsServiceIdentifier);
}
});
// headers for all requests are provided via clientConfig.putHeader
applyHeaders(conf, configKey, clientConfig::putHeader,
(header) -> LOG.warn("Ignoring header '{}' for {} client because no values were provided",
header, awsServiceIdentifier));
LOG.debug("headers for {} client = {}", awsServiceIdentifier, clientConfig.headers());

// per-request headers are provided via AddRequestHeaderInterceptor
String keyPrefix = configKeyPrefix;
Map<String, Map<String, List<String>>> requestHeaders = new HashMap<>();
Map<String, String> requestHeaderConfs = conf.getPropsWithPrefix(keyPrefix);
requestHeaderConfs.keySet().forEach((request) ->
applyHeaders(conf, keyPrefix + request,
(header, headerValues) ->
requestHeaders.computeIfAbsent(request, c -> new HashMap<>()).put(header, headerValues),
(header) -> LOG.warn("Ignoring {} request header '{}' for {} client because " +
"no values were provided", request, header, awsServiceIdentifier)
)
);
if (!requestHeaders.isEmpty()) {
clientConfig.addExecutionInterceptor(new AddRequestHeaderInterceptor(requestHeaders));
requestHeaders.forEach((request, headers) ->
LOG.debug("{} request headers for {} client = {}", request, awsServiceIdentifier, headers)
);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.impl;

import software.amazon.awssdk.awscore.AwsRequest;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;

import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;

public class AddRequestHeaderInterceptor implements ExecutionInterceptor {
private final Map<String, Consumer<AwsRequestOverrideConfiguration.Builder>> appliers = new HashMap<>();

public AddRequestHeaderInterceptor(Map<String, Map<String, List<String>>> requestHeaders) {
requestHeaders.forEach((request, headers) -> appliers
.put(request.toLowerCase(Locale.ROOT), (b) -> headers.forEach(b::putHeader))
);
}

public SdkRequest modifyRequest(Context.ModifyRequest context, ExecutionAttributes executionAttributes) {
assert context.request() instanceof AwsRequest;

AwsRequest request = (AwsRequest) context.request();
String requestName = request.getClass().getSimpleName().toLowerCase(Locale.ROOT);
Consumer<AwsRequestOverrideConfiguration.Builder> applier = appliers.get(requestName);

if (applier != null) {
AwsRequestOverrideConfiguration overrideConfiguration =
request.overrideConfiguration()
.map(AwsRequestOverrideConfiguration::toBuilder)
.orElseGet(AwsRequestOverrideConfiguration::builder)
.applyMutation(applier)
.build();
return request.toBuilder().overrideConfiguration(overrideConfiguration).build();
} else {
return request;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -951,10 +951,14 @@ The switch to turn S3A auditing on or off.

### Configuring Custom Headers for AWS Service Clients

You can set custom headers for S3 and STS requests. These headers are set on client level, and will be sent for all requests made to these services.
You can set custom headers for S3 and STS requests. Headers can be set on client level and request type level.
Client level headers are sent for all requests made through the client. Request type level headers are sent for
requests of the respective type only.

#### Client Level Headers

**Configuration Properties:**
- `fs.s3a.client.s3.custom.headers`: Custom headers for S3 service requests.
- `fs.s3a.client.s3.custom.headers`: Sets custom headers for S3 service requests.
- `fs.s3a.client.sts.custom.headers`: Sets custom headers for all requests to AWS STS.

**Header Format:**
Expand All @@ -973,6 +977,33 @@ Custom headers should be specified as key-value pairs, separated by `=`. Multipl
</property>
```

#### Request-type Level Headers

**Configuration Properties:**
- `fs.s3a.client.s3.custom.headers.request.REQUEST`: Sets custom headers for S3 service requests of type `REQUEST`.
- `fs.s3a.client.sts.custom.headers.request.REQUEST`: Sets custom headers for all requests to AWS STS of type `REQUEST`.

Note: `REQUEST` refers to the AWS S3 and STS request name. These request type names are case-insensitive.
See <a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/model/S3Request.html">subclasses of S3Request</a>
and <a href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/sts/model/StsRequest.html">subclasses of StsRequest</a>
for all existing requests.

**Header Format:**
Custom headers should be specified as key-value pairs, separated by `=`. Multiple values for a single header can be separated by `;`. Multiple headers can be separated by `,`.


```xml
<property>
<name>fs.s3a.client.s3.custom.headers.request.ListObjectsV2Request</name>
<value>Header1=Value1</value>
</property>

<property>
<name>fs.s3a.client.sts.custom.headers.request.deleteobjectrequest</name>
<value>Header1=Value1;Value2,Header2=Value1</value>
</property>
```

## <a name="retry_and_recovery"></a>Retry and Recovery

The S3A client makes a best-effort attempt at recovering from network failures;
Expand Down
Loading
Loading