Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1748,7 +1748,7 @@
<property>
<name>fs.s3a.multiobjectdelete.enable</name>
<value>true</value>
<description>When enabled, multiple single-object delete requests are replaced by
<description>When true, multiple single-object delete requests are replaced by
a single 'delete multiple objects'-request, reducing the number of requests.
Beware: legacy S3-compatible object stores might not support this request.
</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1353,16 +1353,21 @@ private Constants() {
*/
public static final String CUSTOM_HEADERS_POSTFIX = ".custom.headers";

/**
 * Postfix appended to a service's custom-headers key to scope headers to a
 * single AWS request type, e.g. {@code fs.s3a.client.s3.custom.headers.request.<Request>}.
 * Value: {@value}.
 */
public static final String CUSTOM_PER_REQUEST_HEADERS_POSTFIX = ".request";

/**
* List of custom headers to be set on the service client.
* Multiple parameters can be used to specify custom headers.
* <pre>
* Usage:
* fs.s3a.client.s3.custom.headers - Headers to add on all the S3 requests.
* fs.s3a.client.sts.custom.headers - Headers to add on all the STS requests.
* fs.s3a.client.s3.custom.headers - Headers to add to all S3 requests.
* fs.s3a.client.sts.custom.headers - Headers to add to all STS requests.
*
* Examples:
* CustomHeader {@literal ->} 'Header1:Value1'
* CustomHeader {@literal ->} 'Header1=Value1'
* CustomHeaders {@literal ->} 'Header1=Value1;Value2,Header2=Value1'
* </pre>
*/
Expand All @@ -1374,6 +1379,33 @@ private Constants() {
FS_S3A_CLIENT_PREFIX + AWS_SERVICE_IDENTIFIER_S3.toLowerCase(Locale.ROOT)
+ CUSTOM_HEADERS_POSTFIX;

/**
 * Prefix for custom per-request-type headers to be set on the STS client.
 * Multiple parameters can be used to specify custom headers.
 * <pre>
 * Usage:
 * fs.s3a.client.sts.custom.headers.request.REQUEST - Headers to add to all
 * STS REQUEST requests.
 *
 * Note: REQUEST is the AWS SDK request class name, e.g. GetSessionTokenRequest.
 *
 * Example:
 * fs.s3a.client.sts.custom.headers.request.GetSessionTokenRequest
 * CustomHeaders {@literal ->} 'Header1=Value1;Value2,Header2=Value1'
 * </pre>
 */
public static final String CUSTOM_REQUEST_HEADERS_STS_PREFIX =
CUSTOM_HEADERS_STS + CUSTOM_PER_REQUEST_HEADERS_POSTFIX + ".";

/**
 * Prefix for custom per-request-type headers to be set on the S3 client.
 * Multiple parameters can be used to specify custom headers.
 * <pre>
 * Usage:
 * fs.s3a.client.s3.custom.headers.request.REQUEST - Headers to add to all
 * S3 REQUEST requests.
 *
 * Note: REQUEST is the AWS SDK request class name, e.g. DeleteObjectRequest.
 *
 * Examples:
 * fs.s3a.client.s3.custom.headers.request.DeleteObjectRequest
 * CustomHeader {@literal ->} 'Header1=Value1'
 * CustomHeaders {@literal ->} 'Header1=Value1;Value2,Header2=Value1'
 * </pre>
 */
public static final String CUSTOM_REQUEST_HEADERS_S3_PREFIX =
CUSTOM_HEADERS_S3 + CUSTOM_PER_REQUEST_HEADERS_POSTFIX + ".";

/**
* How long to wait for the thread pool to terminate when cleaning up.
* Value: {@value} seconds.
Expand Down Expand Up @@ -1770,6 +1802,20 @@ private Constants() {
*/
public static final boolean DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT = false;

/**
 * When true, recursive deletion of a non-empty directory uses a single delete
 * request for the directory key (prefix) instead of listing and deleting contained
 * objects first. Only enable this for S3-compatible endpoints that support
 * deleting non-empty directories (path prefix) in one request (e.g. VAST).
 * AWS S3 itself does not support this; leave disabled there.
 * Value: {@value}.
 */
public static final String DELETE_NON_EMPTY_DIRECTORY_ENABLED =
"fs.s3a.delete.non-empty-directory.enabled";

/**
 * Default value of {@link #DELETE_NON_EMPTY_DIRECTORY_ENABLED}: {@value}.
 */
public static final boolean DELETE_NON_EMPTY_DIRECTORY_ENABLED_DEFAULT = false;

/**
* Is the higher performance copy from local file to S3 enabled?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private boolean dirOperationsPurgeUploads;

/**
 * When true, recursive deletion of a non-empty directory uses a single delete
 * request for the directory key (for S3-compatible endpoints that support it).
 * Set in initialize() from the "fs.s3a.delete.non-empty-directory.enabled"
 * option; also exposed as a path capability of the same name.
 */
private boolean deleteNonEmptyDirectoryEnabled;

/**
* Page size for deletions.
*/
Expand Down Expand Up @@ -687,6 +693,9 @@ public void initialize(URI name, Configuration originalConf)
dirOperationsPurgeUploads = conf.getBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS,
DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT);

deleteNonEmptyDirectoryEnabled = conf.getBoolean(DELETE_NON_EMPTY_DIRECTORY_ENABLED,
DELETE_NON_EMPTY_DIRECTORY_ENABLED_DEFAULT);

this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);
// multipart copy and upload are the same; this just makes it explicit
Expand Down Expand Up @@ -2616,7 +2625,8 @@ public void deleteObjectAtPath(final Path path,
throws IOException {
auditSpan.activate();
once("delete", path.toString(), () ->
S3AFileSystem.this.deleteObjectAtPath(path, key, isFile));
S3AFileSystem.this.deleteObjectAtPath(
path, key, isFile));
}

@Override
Expand Down Expand Up @@ -3223,7 +3233,10 @@ void deleteObjectAtPath(Path f,
} else {
instrumentation.directoryDeleted();
}
deleteObject(key);
incrementWriteOperations();
getStore().deleteObject(getRequestFactory()
.newDeleteObjectRequestBuilder(key)
.build());
}

/**
Expand Down Expand Up @@ -3576,7 +3589,8 @@ protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOEx
recursive,
new OperationCallbacksImpl(storeContext),
pageSize,
dirOperationsPurgeUploads));
dirOperationsPurgeUploads,
deleteNonEmptyDirectoryEnabled));
if (outcome) {
try {
maybeCreateFakeParentDirectory(path);
Expand Down Expand Up @@ -5412,6 +5426,9 @@ public boolean hasPathCapability(final Path path, final String capability)
case DIRECTORY_OPERATIONS_PURGE_UPLOADS:
return dirOperationsPurgeUploads;

case DELETE_NON_EMPTY_DIRECTORY_ENABLED:
return deleteNonEmptyDirectoryEnabled;

// this is a v2 sdk release.
case STORE_CAPABILITY_AWS_V2:
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import org.slf4j.Logger;
Expand Down Expand Up @@ -82,6 +85,8 @@
import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_S3;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_HEADERS_STS;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_REQUEST_HEADERS_S3_PREFIX;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_REQUEST_HEADERS_STS_PREFIX;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.enforceMinimumDuration;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.getDuration;
import static org.apache.hadoop.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -429,35 +434,74 @@ private static void initSigner(Configuration conf,
/**
 * Configure custom headers for the given AWS service client.
 * Two families of options are read:
 * client-wide headers (applied to every request of the service) and
 * per-request-type headers, which are installed through an
 * {@link AddRequestHeaderInterceptor} execution interceptor.
 * Unknown service identifiers are ignored.
 * @param conf hadoop configuration
 * @param clientConfig AWS client override configuration builder to mutate
 * @param awsServiceIdentifier service identifier ("s3" or "sts")
 */
private static void initRequestHeaders(Configuration conf,
    ClientOverrideConfiguration.Builder clientConfig, String awsServiceIdentifier) {
  final String headersKey;
  final String perRequestPrefix;
  if (AWS_SERVICE_IDENTIFIER_S3.equals(awsServiceIdentifier)) {
    headersKey = CUSTOM_HEADERS_S3;
    perRequestPrefix = CUSTOM_REQUEST_HEADERS_S3_PREFIX;
  } else if (AWS_SERVICE_IDENTIFIER_STS.equals(awsServiceIdentifier)) {
    headersKey = CUSTOM_HEADERS_STS;
    perRequestPrefix = CUSTOM_REQUEST_HEADERS_STS_PREFIX;
  } else {
    // no known service: nothing to configure.
    return;
  }

  // headers applied to every request of this service.
  applyHeaders(conf, headersKey, clientConfig::putHeader,
      (header) -> LOG.warn(
          "Ignoring header '{}' for {} client because no values were provided",
          header, awsServiceIdentifier));
  LOG.debug("headers for {} client = {}", awsServiceIdentifier, clientConfig.headers());

  // per-request-type headers: one config key per request class name,
  // collected into request -> (header -> values).
  Map<String, Map<String, List<String>>> requestHeaders = new HashMap<>();
  for (String request : conf.getPropsWithPrefix(perRequestPrefix).keySet()) {
    applyHeaders(conf, perRequestPrefix + request,
        (header, headerValues) ->
            requestHeaders.computeIfAbsent(request, ignored -> new HashMap<>())
                .put(header, headerValues),
        (header) -> LOG.warn(
            "Ignoring {} request header '{}' for {} client because "
                + "no values were provided",
            request, header, awsServiceIdentifier));
  }
  if (!requestHeaders.isEmpty()) {
    // an interceptor injects the headers into matching requests at execution time.
    clientConfig.addExecutionInterceptor(
        new AddRequestHeaderInterceptor(requestHeaders));
    requestHeaders.forEach((request, headers) ->
        LOG.debug("{} request headers for {} client = {}",
            request, awsServiceIdentifier, headers));
  }
}

/**
* Parse the header configuration at the given key.
* @param conf hadoop configuration
* @param configKey configuration key
* @param apply callback for non-empty header values
* @param ignore callback when no values are supplied
*/
/**
 * Parse the header configuration at the given key and hand each header to
 * the supplied callbacks.
 * The configuration value is split on '=' into header name and value string;
 * the value string is then split on ';' into individual values, which are
 * trimmed and empty entries dropped.
 * @param conf hadoop configuration
 * @param configKey configuration key
 * @param apply callback for non-empty header values
 * @param ignore callback when no values are supplied
 */
private static void applyHeaders(Configuration conf,
    String configKey,
    BiConsumer<String, List<String>> apply,
    Consumer<String> ignore) {
  S3AUtils.getTrimmedStringCollectionSplitByEquals(conf, configKey)
      .forEach((header, rawValues) -> {
        List<String> values = Arrays.stream(rawValues.split(";"))
            .map(String::trim)
            .filter(v -> !v.isEmpty())
            .collect(Collectors.toList());
        if (values.isEmpty()) {
          // nothing usable after trimming: report rather than add an empty header.
          ignore.accept(header);
        } else {
          apply.accept(header, values);
        }
      });
}

/**
* Configures request timeout in the client configuration.
* This is independent of the timeouts set in the sync and async HTTP clients;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a.impl;

import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Consumer;

import software.amazon.awssdk.awscore.AwsRequest;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;

/**
* Add request-specific headers to AWS requests before execution.
*/
/**
 * Execution interceptor which injects configured headers into specific
 * AWS request types before execution.
 * Request types are matched by the request class's simple name,
 * compared case-insensitively.
 */
public class AddRequestHeaderInterceptor implements ExecutionInterceptor {

  /** Lowercased request class name to a mutation adding its headers. */
  private final Map<String, Consumer<AwsRequestOverrideConfiguration.Builder>>
      appliers = new HashMap<>();

  /**
   * Create an interceptor which adds headers for specific request types.
   * @param requestHeaders request-type to header mappings
   */
  public AddRequestHeaderInterceptor(
      Map<String, Map<String, List<String>>> requestHeaders) {
    for (Map.Entry<String, Map<String, List<String>>> entry
        : requestHeaders.entrySet()) {
      final Map<String, List<String>> headers = entry.getValue();
      appliers.put(entry.getKey().toLowerCase(Locale.ROOT),
          builder -> headers.forEach(builder::putHeader));
    }
  }

  @Override
  public SdkRequest modifyRequest(
      Context.ModifyRequest context,
      ExecutionAttributes executionAttributes) {
    final SdkRequest sdkRequest = context.request();
    if (!(sdkRequest instanceof AwsRequest)) {
      // only AWS requests carry an override configuration; leave others untouched.
      return sdkRequest;
    }

    final AwsRequest awsRequest = (AwsRequest) sdkRequest;
    final Consumer<AwsRequestOverrideConfiguration.Builder> applier =
        appliers.get(
            awsRequest.getClass().getSimpleName().toLowerCase(Locale.ROOT));
    if (applier == null) {
      // no headers configured for this request type.
      return awsRequest;
    }

    // merge the headers into any override configuration already on the request.
    final AwsRequestOverrideConfiguration.Builder overrideBuilder =
        awsRequest.overrideConfiguration()
            .map(AwsRequestOverrideConfiguration::toBuilder)
            .orElseGet(AwsRequestOverrideConfiguration::builder);
    applier.accept(overrideBuilder);
    return awsRequest.toBuilder()
        .overrideConfiguration(overrideBuilder.build())
        .build();
  }
}
Loading
Loading