2020
2121import java .io .IOException ;
2222import java .net .URI ;
23- import java .net .URISyntaxException ;
24- import java .util .regex .Matcher ;
25- import java .util .regex .Pattern ;
2623
27- import org .apache .hadoop .classification .VisibleForTesting ;
28- import org .apache .hadoop .fs .s3a .impl .AWSClientConfig ;
2924import org .slf4j .Logger ;
3025import org .slf4j .LoggerFactory ;
3126
32- import software .amazon .awssdk .awscore .util .AwsHostNameUtils ;
3327import software .amazon .awssdk .core .checksums .RequestChecksumCalculation ;
3428import software .amazon .awssdk .core .checksums .ResponseChecksumValidation ;
3529import software .amazon .awssdk .core .client .config .ClientOverrideConfiguration ;
5751import org .apache .hadoop .classification .InterfaceStability ;
5852import org .apache .hadoop .conf .Configuration ;
5953import org .apache .hadoop .conf .Configured ;
54+ import org .apache .hadoop .fs .s3a .impl .AWSClientConfig ;
55+ import org .apache .hadoop .fs .s3a .impl .RegionResolution ;
6056import org .apache .hadoop .fs .s3a .statistics .impl .AwsStatisticsCollector ;
6157import org .apache .hadoop .fs .store .LogExactlyOnce ;
6258
63- import static org .apache .hadoop .fs .s3a .Constants .AWS_REGION ;
6459import static org .apache .hadoop .fs .s3a .Constants .AWS_S3_ACCESS_GRANTS_ENABLED ;
6560import static org .apache .hadoop .fs .s3a .Constants .AWS_S3_ACCESS_GRANTS_FALLBACK_TO_IAM_ENABLED ;
66- import static org .apache .hadoop .fs .s3a .Constants .AWS_S3_CROSS_REGION_ACCESS_ENABLED ;
67- import static org .apache .hadoop .fs .s3a .Constants .AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT ;
68- import static org .apache .hadoop .fs .s3a .Constants .AWS_S3_DEFAULT_REGION ;
69- import static org .apache .hadoop .fs .s3a .Constants .CENTRAL_ENDPOINT ;
70- import static org .apache .hadoop .fs .s3a .Constants .FIPS_ENDPOINT ;
7161import static org .apache .hadoop .fs .s3a .Constants .HTTP_SIGNER_CLASS_NAME ;
7262import static org .apache .hadoop .fs .s3a .Constants .HTTP_SIGNER_ENABLED ;
7363import static org .apache .hadoop .fs .s3a .Constants .HTTP_SIGNER_ENABLED_DEFAULT ;
7767import static org .apache .hadoop .fs .s3a .auth .SignerFactory .createHttpSigner ;
7868import static org .apache .hadoop .fs .s3a .impl .AWSHeaders .REQUESTER_PAYS_HEADER ;
7969import static org .apache .hadoop .fs .s3a .impl .InternalConstants .AUTH_SCHEME_AWS_SIGV_4 ;
80- import static org .apache .hadoop .util . Preconditions . checkArgument ;
70+ import static org .apache .hadoop .fs . s3a . impl . RegionResolution . calculateRegion ;
8171
8272
8373/**
@@ -92,41 +82,12 @@ public class DefaultS3ClientFactory extends Configured
9282
9383 private static final String REQUESTER_PAYS_HEADER_VALUE = "requester" ;
9484
95- private static final String S3_SERVICE_NAME = "s3" ;
96-
97- private static final Pattern VPC_ENDPOINT_PATTERN =
98- Pattern .compile ("^(?:.+\\ .)?([a-z0-9-]+)\\ .vpce\\ .amazonaws\\ .(?:com|com\\ .cn)$" );
99-
10085 /**
10186 * Subclasses refer to this.
10287 */
10388 protected static final Logger LOG =
10489 LoggerFactory .getLogger (DefaultS3ClientFactory .class );
10590
106- /**
107- * A one-off warning of default region chains in use.
108- */
109- private static final LogExactlyOnce WARN_OF_DEFAULT_REGION_CHAIN =
110- new LogExactlyOnce (LOG );
111-
112- /**
113- * Warning message printed when the SDK Region chain is in use.
114- */
115- private static final String SDK_REGION_CHAIN_IN_USE =
116- "S3A filesystem client is using"
117- + " the SDK region resolution chain." ;
118-
119-
120- /** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
121- private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce (LOG );
122-
123- /**
124- * Error message when an endpoint is set with FIPS enabled: {@value}.
125- */
126- @ VisibleForTesting
127- public static final String ERROR_ENDPOINT_WITH_FIPS =
128- "Non central endpoint cannot be set when " + FIPS_ENDPOINT + " is true" ;
129-
13091 /**
13192 * A one-off log stating whether S3 Access Grants are enabled.
13293 */
@@ -329,152 +290,39 @@ protected ClientOverrideConfiguration.Builder createClientOverrideConfiguration(
329290 */
330291 private <BuilderT extends S3BaseClientBuilder <BuilderT , ClientT >, ClientT > void configureEndpointAndRegion (
331292 BuilderT builder , S3ClientCreationParameters parameters , Configuration conf ) {
332- final String endpointStr = parameters .getEndpoint ();
333- final URI endpoint = getS3Endpoint (endpointStr , conf );
334-
335- final String configuredRegion = parameters .getRegion ();
336- Region region = null ;
337- String origin = "" ;
338293
339- // If the region was configured, set it.
340- if (configuredRegion != null && !configuredRegion .isEmpty ()) {
341- origin = AWS_REGION ;
342- region = Region .of (configuredRegion );
343- }
294+ final RegionResolution .Resolution resolution =
295+ calculateRegion (parameters , conf );
296+ LOG .debug ("Region Resolution: {}" , resolution );
344297
345- // FIPs? Log it, then reject any attempt to set an endpoint
346- final boolean fipsEnabled = parameters .isFipsEnabled ();
347- if (fipsEnabled ) {
348- LOG .debug ("Enabling FIPS mode" );
349- }
350- // always setting it guarantees the value is non-null,
298+ // always setting to true or false guarantees the value is non-null,
351299 // which tests expect.
352- builder .fipsEnabled (fipsEnabled );
353-
354- if (endpoint != null ) {
355- boolean endpointEndsWithCentral =
356- endpointStr .endsWith (CENTRAL_ENDPOINT );
357- checkArgument (!fipsEnabled || endpointEndsWithCentral , "%s : %s" ,
358- ERROR_ENDPOINT_WITH_FIPS ,
359- endpoint );
360-
361- // No region was configured,
362- // determine the region from the endpoint.
363- if (region == null ) {
364- region = getS3RegionFromEndpoint (endpointStr ,
365- endpointEndsWithCentral );
366- if (region != null ) {
367- origin = "endpoint" ;
368- }
369- }
370-
371- // No need to override endpoint with "s3.amazonaws.com".
372- // Let the client take care of endpoint resolution. Overriding
373- // the endpoint with "s3.amazonaws.com" causes 400 Bad Request
374- // errors for non-existent buckets and objects.
375- // ref: https://github.com/aws/aws-sdk-java-v2/issues/4846
376- if (!endpointEndsWithCentral ) {
377- builder .endpointOverride (endpoint );
378- LOG .debug ("Setting endpoint to {}" , endpoint );
379- } else {
380- origin = "central endpoint with cross region access" ;
381- LOG .debug ("Enabling cross region access for endpoint {}" ,
382- endpointStr );
383- }
384- }
385-
300+ builder .fipsEnabled (resolution .isUseFips ());
301+ final Region region = resolution .getRegion ();
386302 if (region != null ) {
387303 builder .region (region );
388- } else if (configuredRegion == null ) {
389- // no region is configured, and none could be determined from the endpoint.
390- // Use US_EAST_2 as default.
391- region = Region .of (AWS_S3_DEFAULT_REGION );
392- builder .region (region );
393- origin = "cross region access fallback" ;
394- } else if (configuredRegion .isEmpty ()) {
395- // region configuration was set to empty string.
396- // allow this if people really want it; it is OK to rely on this
397- // when deployed in EC2.
398- WARN_OF_DEFAULT_REGION_CHAIN .warn (SDK_REGION_CHAIN_IN_USE );
399- LOG .debug (SDK_REGION_CHAIN_IN_USE );
400- origin = "SDK region chain" ;
401304 }
402- boolean isCrossRegionAccessEnabled = conf .getBoolean (AWS_S3_CROSS_REGION_ACCESS_ENABLED ,
403- AWS_S3_CROSS_REGION_ACCESS_ENABLED_DEFAULT );
404305 // s3 cross region access
405- if (isCrossRegionAccessEnabled ) {
306+ if (resolution . isCrossRegionAccessEnabled () ) {
406307 builder .crossRegionAccessEnabled (true );
407308 }
408- LOG .debug ("Setting region to {} from {} with cross region access {}" ,
409- region , origin , isCrossRegionAccessEnabled );
309+ if (!resolution .isUseCentralEndpoint ()) {
310+ final URI endpointUri = resolution .getEndpointUri ();
311+ builder .endpointOverride (endpointUri );
312+ LOG .debug ("Setting endpoint to {}" , endpointUri );
313+ }
410314 }
411315
412316 /**
413317 * Given a endpoint string, create the endpoint URI.
414- *
318+ * <p>Kept in as subclasses use it.
415319 * @param endpoint possibly null endpoint.
416320 * @param conf config to build the URI from.
417321 * @return an endpoint uri
418322 */
419323 protected static URI getS3Endpoint (String endpoint , final Configuration conf ) {
420-
421324 boolean secureConnections = conf .getBoolean (SECURE_CONNECTIONS , DEFAULT_SECURE_CONNECTIONS );
422-
423- String protocol = secureConnections ? "https" : "http" ;
424-
425- if (endpoint == null || endpoint .isEmpty ()) {
426- // don't set an endpoint if none is configured, instead let the SDK figure it out.
427- return null ;
428- }
429-
430- if (!endpoint .contains ("://" )) {
431- endpoint = String .format ("%s://%s" , protocol , endpoint );
432- }
433-
434- try {
435- return new URI (endpoint );
436- } catch (URISyntaxException e ) {
437- throw new IllegalArgumentException (e );
438- }
439- }
440-
441- /**
442- * Parses the endpoint to get the region.
443- * If endpoint is the central one, use US_EAST_2.
444- *
445- * @param endpoint the configure endpoint.
446- * @param endpointEndsWithCentral true if the endpoint is configured as central.
447- * @return the S3 region, null if unable to resolve from endpoint.
448- */
449- @ VisibleForTesting
450- static Region getS3RegionFromEndpoint (final String endpoint ,
451- final boolean endpointEndsWithCentral ) {
452-
453- if (!endpointEndsWithCentral ) {
454- // S3 VPC endpoint parsing
455- Matcher matcher = VPC_ENDPOINT_PATTERN .matcher (endpoint );
456- if (matcher .find ()) {
457- LOG .debug ("Mapping to VPCE" );
458- LOG .debug ("Endpoint {} is vpc endpoint; parsing region as {}" , endpoint , matcher .group (1 ));
459- return Region .of (matcher .group (1 ));
460- }
461-
462- LOG .debug ("Endpoint {} is not the default; parsing" , endpoint );
463- return AwsHostNameUtils .parseSigningRegion (endpoint , S3_SERVICE_NAME ).orElse (null );
464- }
465-
466- // Select default region here to enable cross-region access.
467- // If both "fs.s3a.endpoint" and "fs.s3a.endpoint.region" are empty,
468- // Spark sets "fs.s3a.endpoint" to "s3.amazonaws.com".
469- // This applies to Spark versions with the changes of SPARK-35878.
470- // ref:
471- // https://github.com/apache/spark/blob/v3.5.0/core/
472- // src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala#L528
473- // If we do not allow cross region access, Spark would not be able to
474- // access any bucket that is not present in the given region.
475- // Hence, we should use default region us-east-2 to allow cross-region
476- // access.
477- return Region .of (AWS_S3_DEFAULT_REGION );
325+ return RegionResolution .buildEndpointUri (endpoint , secureConnections );
478326 }
479327
480328 private static <BuilderT extends S3BaseClientBuilder <BuilderT , ClientT >, ClientT > void
0 commit comments