Skip to content

Commit 401003a

Browse files
HADOOP-19672 Network switchover when Apache client throws error (#7967)
Contributed by Manish Bhatt.
1 parent baba94e commit 401003a

File tree

13 files changed

+243
-57
lines changed

13 files changed

+243
-57
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,10 @@ public class AbfsConfiguration{
203203
DefaultValue = DEFAULT_HTTP_READ_TIMEOUT)
204204
private int httpReadTimeout;
205205

206+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_EXPECT_100CONTINUE_WAIT_TIMEOUT,
207+
DefaultValue = DEFAULT_EXPECT_100CONTINUE_WAIT_TIMEOUT)
208+
private int expect100ContinueWaitTimeout;
209+
206210
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT,
207211
MinValue = 0,
208212
DefaultValue = DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS)
@@ -1033,6 +1037,10 @@ public int getHttpReadTimeout() {
10331037
return this.httpReadTimeout;
10341038
}
10351039

1040+
public int getExpect100ContinueWaitTimeout() {
1041+
return this.expect100ContinueWaitTimeout;
1042+
}
1043+
10361044
public long getAzureBlockSize() {
10371045
return this.azureBlockSize;
10381046
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ public final class ConfigurationKeys {
9797
*/
9898
public static final String AZURE_HTTP_READ_TIMEOUT = "fs.azure.http.read.timeout";
9999

100+
/**
101+
* Config to set HTTP Expect100-Continue Wait Timeout Value for Rest Operations.
102+
* Value: {@value}.
103+
*/
104+
public static final String AZURE_EXPECT_100CONTINUE_WAIT_TIMEOUT
105+
= "fs.azure.http.expect.100continue.wait.timeout";
106+
100107
// Retry strategy for getToken calls
101108
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_COUNT = "fs.azure.oauth.token.fetch.retry.max.retries";
102109
public static final String AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF = "fs.azure.oauth.token.fetch.retry.min.backoff.interval";

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ public final class FileSystemConfigurations {
5858
*/
5959
public static final int DEFAULT_HTTP_READ_TIMEOUT = 30_000; // 30 secs
6060

61+
/**
62+
* Default value of the timeout for waiting for the 100-continue response, used when the Expect: 100-continue header is enabled.
63+
* Value: {@value}.
64+
*/
65+
public static final int DEFAULT_EXPECT_100CONTINUE_WAIT_TIMEOUT = 3_000; // 3s
66+
6167
// Retry parameter defaults.
6268
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MAX_ATTEMPTS = 5;
6369
public static final int DEFAULT_AZURE_OAUTH_TOKEN_FETCH_RETRY_MIN_BACKOFF_INTERVAL = 0;
@@ -214,7 +220,7 @@ public final class FileSystemConfigurations {
214220
public static final long THOUSAND = 1000L;
215221

216222
public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
217-
= HttpOperationType.JDK_HTTP_URL_CONNECTION;
223+
= HttpOperationType.APACHE_HTTP_CLIENT;
218224

219225
public static final int DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES = 3;
220226

@@ -228,7 +234,7 @@ public final class FileSystemConfigurations {
228234

229235
public static final int MAX_APACHE_HTTP_CLIENT_CACHE_WARMUP_COUNT = 5;
230236

231-
public static final int DEFAULT_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 5;
237+
public static final int DEFAULT_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 3;
232238

233239
public static final int MAX_APACHE_HTTP_CLIENT_CACHE_REFRESH_COUNT = 5;
234240

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsApacheHttpClient.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,13 @@ static void registerFallback() {
6565
usable = false;
6666
}
6767

68+
/**
69+
* Sets the usable flag to true when a successful response is received from the Apache client.
70+
*/
71+
static void setUsable() {
72+
usable = true;
73+
}
74+
6875
/**
6976
* @return if ApacheHttpClient is usable.
7077
*/
@@ -73,18 +80,27 @@ static boolean usable() {
7380
}
7481

7582
AbfsApacheHttpClient(DelegatingSSLSocketFactory delegatingSSLSocketFactory,
76-
final AbfsConfiguration abfsConfiguration, final KeepAliveCache keepAliveCache,
77-
URL baseUrl) {
83+
final AbfsConfiguration abfsConfiguration,
84+
final KeepAliveCache keepAliveCache,
85+
URL baseUrl,
86+
final boolean isCacheWarmupNeeded) {
7887
final AbfsConnectionManager connMgr = new AbfsConnectionManager(
7988
createSocketFactoryRegistry(
8089
new SSLConnectionSocketFactory(delegatingSSLSocketFactory,
8190
getDefaultHostnameVerifier())),
8291
new AbfsHttpClientConnectionFactory(), keepAliveCache,
83-
abfsConfiguration, baseUrl);
92+
abfsConfiguration, baseUrl, isCacheWarmupNeeded);
8493
final HttpClientBuilder builder = HttpClients.custom();
8594
builder.setConnectionManager(connMgr)
8695
.setRequestExecutor(
87-
new AbfsManagedHttpRequestExecutor(abfsConfiguration.getHttpReadTimeout()))
96+
// In case of Expect:100-continue, the timeout for waiting for
97+
// the 100-continue response from the server is set using
98+
// ExpectWaitContinueTimeout. For other requests, the read timeout
99+
// is set using SocketTimeout.
100+
new AbfsManagedHttpRequestExecutor(
101+
abfsConfiguration.isExpectHeaderEnabled()
102+
? abfsConfiguration.getExpect100ContinueWaitTimeout()
103+
: abfsConfiguration.getHttpReadTimeout()))
88104
.disableContentCompression()
89105
.disableRedirectHandling()
90106
.disableAutomaticRetries()

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
6060
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
6161
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
62+
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
6263
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
6364
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
6465
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
@@ -188,7 +189,7 @@ public AbfsBlobClient(final URL baseUrl,
188189
final EncryptionContextProvider encryptionContextProvider,
189190
final AbfsClientContext abfsClientContext) throws IOException {
190191
super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
191-
encryptionContextProvider, abfsClientContext);
192+
encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
192193
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
193194
abfsConfiguration.getAzureAtomicRenameDirs()
194195
.split(AbfsHttpConstants.COMMA)));
@@ -201,7 +202,7 @@ public AbfsBlobClient(final URL baseUrl,
201202
final EncryptionContextProvider encryptionContextProvider,
202203
final AbfsClientContext abfsClientContext) throws IOException {
203204
super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
204-
encryptionContextProvider, abfsClientContext);
205+
encryptionContextProvider, abfsClientContext, AbfsServiceType.BLOB);
205206
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
206207
abfsConfiguration.getAzureAtomicRenameDirs()
207208
.split(AbfsHttpConstants.COMMA)));

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions;
5959
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
6060
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
61+
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
6162
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
6263
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
6364
import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType;
@@ -199,6 +200,8 @@ public abstract class AbfsClient implements Closeable {
199200

200201
private AbfsApacheHttpClient abfsApacheHttpClient;
201202

203+
private AbfsServiceType abfsServiceType;
204+
202205
/**
203206
* logging the rename failure if metadata is in an incomplete state.
204207
*/
@@ -208,7 +211,8 @@ private AbfsClient(final URL baseUrl,
208211
final SharedKeyCredentials sharedKeyCredentials,
209212
final AbfsConfiguration abfsConfiguration,
210213
final EncryptionContextProvider encryptionContextProvider,
211-
final AbfsClientContext abfsClientContext) throws IOException {
214+
final AbfsClientContext abfsClientContext,
215+
final AbfsServiceType abfsServiceType) throws IOException {
212216
this.baseUrl = baseUrl;
213217
this.sharedKeyCredentials = sharedKeyCredentials;
214218
String baseUrlString = baseUrl.toString();
@@ -220,6 +224,7 @@ private AbfsClient(final URL baseUrl,
220224
this.authType = abfsConfiguration.getAuthType(accountName);
221225
this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
222226
this.renameResilience = abfsConfiguration.getRenameResilience();
227+
this.abfsServiceType = abfsServiceType;
223228

224229
if (encryptionContextProvider != null) {
225230
this.encryptionContextProvider = encryptionContextProvider;
@@ -252,9 +257,13 @@ private AbfsClient(final URL baseUrl,
252257
== HttpOperationType.APACHE_HTTP_CLIENT) {
253258
keepAliveCache = new KeepAliveCache(abfsConfiguration);
254259

260+
// Warm up the connection pool during client initialization to avoid latency during first request.
261+
// Since both a DFS and a Blob client instance are created for every filesystem instance,
262+
// warmup is done only for the default client.
255263
abfsApacheHttpClient = new AbfsApacheHttpClient(
256264
DelegatingSSLSocketFactory.getDefaultFactory(),
257-
abfsConfiguration, keepAliveCache, baseUrl);
265+
abfsConfiguration, keepAliveCache, baseUrl,
266+
abfsConfiguration.getFsConfiguredServiceType() == abfsServiceType);
258267
}
259268

260269
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
@@ -328,25 +337,29 @@ private AbfsClient(final URL baseUrl,
328337
LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);
329338
}
330339

331-
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
332-
final AbfsConfiguration abfsConfiguration,
333-
final AccessTokenProvider tokenProvider,
334-
final EncryptionContextProvider encryptionContextProvider,
335-
final AbfsClientContext abfsClientContext)
340+
public AbfsClient(final URL baseUrl,
341+
final SharedKeyCredentials sharedKeyCredentials,
342+
final AbfsConfiguration abfsConfiguration,
343+
final AccessTokenProvider tokenProvider,
344+
final EncryptionContextProvider encryptionContextProvider,
345+
final AbfsClientContext abfsClientContext,
346+
final AbfsServiceType abfsServiceType)
336347
throws IOException {
337348
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
338-
encryptionContextProvider, abfsClientContext);
349+
encryptionContextProvider, abfsClientContext, abfsServiceType);
339350
this.tokenProvider = tokenProvider;
340351
}
341352

342-
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
343-
final AbfsConfiguration abfsConfiguration,
344-
final SASTokenProvider sasTokenProvider,
345-
final EncryptionContextProvider encryptionContextProvider,
346-
final AbfsClientContext abfsClientContext)
353+
public AbfsClient(final URL baseUrl,
354+
final SharedKeyCredentials sharedKeyCredentials,
355+
final AbfsConfiguration abfsConfiguration,
356+
final SASTokenProvider sasTokenProvider,
357+
final EncryptionContextProvider encryptionContextProvider,
358+
final AbfsClientContext abfsClientContext,
359+
final AbfsServiceType abfsServiceType)
347360
throws IOException {
348361
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
349-
encryptionContextProvider, abfsClientContext);
362+
encryptionContextProvider, abfsClientContext, abfsServiceType);
350363
this.sasTokenProvider = sasTokenProvider;
351364
}
352365

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,16 @@ public AbfsClientHandler(final URL baseUrl,
6868
final SASTokenProvider sasTokenProvider,
6969
final EncryptionContextProvider encryptionContextProvider,
7070
final AbfsClientContext abfsClientContext) throws IOException {
71+
// This will initialize the default and ingress service types.
72+
// This is needed before creating the clients so that we can do cache warmup
73+
// only for default client.
74+
initServiceType(abfsConfiguration);
7175
this.dfsAbfsClient = createDfsClient(baseUrl, sharedKeyCredentials,
7276
abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
7377
abfsClientContext);
7478
this.blobAbfsClient = createBlobClient(baseUrl, sharedKeyCredentials,
7579
abfsConfiguration, null, sasTokenProvider, encryptionContextProvider,
7680
abfsClientContext);
77-
initServiceType(abfsConfiguration);
7881
}
7982

8083
/**

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsConnectionManager.java

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.UUID;
2424
import java.util.concurrent.RejectedExecutionException;
2525
import java.util.concurrent.atomic.AtomicBoolean;
26+
import java.util.concurrent.atomic.AtomicInteger;
2627
import java.util.concurrent.CountDownLatch;
2728
import java.util.concurrent.ExecutionException;
2829
import java.util.concurrent.TimeUnit;
@@ -91,26 +92,34 @@ class AbfsConnectionManager implements HttpClientConnectionManager {
9192
/**
9293
* The base host for which connections are managed.
9394
*/
94-
private HttpHost baseHost;
95+
private final HttpHost baseHost;
9596

9697
AbfsConnectionManager(Registry<ConnectionSocketFactory> socketFactoryRegistry,
97-
AbfsHttpClientConnectionFactory connectionFactory, KeepAliveCache kac,
98-
final AbfsConfiguration abfsConfiguration, final URL baseUrl) {
98+
AbfsHttpClientConnectionFactory connectionFactory,
99+
KeepAliveCache kac,
100+
final AbfsConfiguration abfsConfiguration,
101+
final URL baseUrl,
102+
final boolean isCacheWarmupNeeded) {
99103
this.httpConnectionFactory = connectionFactory;
100104
this.kac = kac;
101105
this.connectionOperator = new DefaultHttpClientConnectionOperator(
102106
socketFactoryRegistry, null, null);
103107
this.abfsConfiguration = abfsConfiguration;
104-
if (abfsConfiguration.getApacheCacheWarmupCount() > 0
108+
this.baseHost = new HttpHost(baseUrl.getHost(),
109+
baseUrl.getDefaultPort(), baseUrl.getProtocol());
110+
if (isCacheWarmupNeeded && abfsConfiguration.getApacheCacheWarmupCount() > 0
105111
&& kac.getFixedThreadPool() != null) {
106112
// Warm up the cache with connections.
107113
LOG.debug("Warming up the KeepAliveCache with {} connections",
108114
abfsConfiguration.getApacheCacheWarmupCount());
109-
this.baseHost = new HttpHost(baseUrl.getHost(),
110-
baseUrl.getDefaultPort(), baseUrl.getProtocol());
111115
HttpRoute route = new HttpRoute(baseHost, null, true);
112-
cacheExtraConnection(route,
116+
int totalConnectionsCreated = cacheExtraConnection(route,
113117
abfsConfiguration.getApacheCacheWarmupCount());
118+
if (totalConnectionsCreated == 0) {
119+
AbfsApacheHttpClient.registerFallback();
120+
} else {
121+
AbfsApacheHttpClient.setUsable();
122+
}
114123
}
115124
}
116125

@@ -271,7 +280,7 @@ public void connect(final HttpClientConnection conn,
271280
LOG.debug("Connection established: {}", conn);
272281
if (context instanceof AbfsManagedHttpClientContext) {
273282
((AbfsManagedHttpClientContext) context).setConnectTime(
274-
TimeUnit.MILLISECONDS.toMillis(System.nanoTime() - start));
283+
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
275284
}
276285
}
277286

@@ -318,8 +327,9 @@ public void shutdown() {
318327
* @param route the HTTP route for which connections are created
319328
* @param numberOfConnections the number of connections to create
320329
*/
321-
private void cacheExtraConnection(final HttpRoute route,
330+
private int cacheExtraConnection(final HttpRoute route,
322331
final int numberOfConnections) {
332+
AtomicInteger totalConnectionCreated = new AtomicInteger(0);
323333
if (!isCacheRefreshInProgress.getAndSet(true)) {
324334
long start = System.nanoTime();
325335
CountDownLatch latch = new CountDownLatch(numberOfConnections);
@@ -333,6 +343,7 @@ private void cacheExtraConnection(final HttpRoute route,
333343
connect(conn, route, abfsConfiguration.getHttpConnectionTimeout(),
334344
new AbfsManagedHttpClientContext());
335345
addConnectionToCache(conn);
346+
totalConnectionCreated.incrementAndGet();
336347
} catch (Exception e) {
337348
LOG.debug("Error creating connection: {}", e.getMessage());
338349
if (conn != null) {
@@ -350,7 +361,7 @@ private void cacheExtraConnection(final HttpRoute route,
350361
} catch (RejectedExecutionException e) {
351362
LOG.debug("Task rejected for connection creation: {}",
352363
e.getMessage());
353-
return;
364+
return -1;
354365
}
355366
}
356367

@@ -370,6 +381,7 @@ private void cacheExtraConnection(final HttpRoute route,
370381
elapsedTimeMillis(start));
371382
}
372383
}
384+
return totalConnectionCreated.get();
373385
}
374386

375387
/**
@@ -383,10 +395,10 @@ private void addConnectionToCache(HttpClientConnection conn) {
383395
if (((AbfsManagedApacheHttpConnection) conn).getTargetHost()
384396
.equals(baseHost)) {
385397
boolean connAddedInKac = kac.add(conn);
386-
synchronized (connectionLock) {
387-
connectionLock.notify(); // wake up one thread only
388-
}
389398
if (connAddedInKac) {
399+
synchronized (connectionLock) {
400+
connectionLock.notify(); // wake up one thread only
401+
}
390402
LOG.debug("Connection cached: {}", conn);
391403
} else {
392404
LOG.debug("Connection not cached, and is released: {}", conn);

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
5454
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
5555
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ApiVersion;
56+
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
5657
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
5758
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
5859
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
@@ -160,7 +161,7 @@ public AbfsDfsClient(final URL baseUrl,
160161
final EncryptionContextProvider encryptionContextProvider,
161162
final AbfsClientContext abfsClientContext) throws IOException {
162163
super(baseUrl, sharedKeyCredentials, abfsConfiguration, tokenProvider,
163-
encryptionContextProvider, abfsClientContext);
164+
encryptionContextProvider, abfsClientContext, AbfsServiceType.DFS);
164165
}
165166

166167
public AbfsDfsClient(final URL baseUrl,
@@ -170,7 +171,7 @@ public AbfsDfsClient(final URL baseUrl,
170171
final EncryptionContextProvider encryptionContextProvider,
171172
final AbfsClientContext abfsClientContext) throws IOException {
172173
super(baseUrl, sharedKeyCredentials, abfsConfiguration, sasTokenProvider,
173-
encryptionContextProvider, abfsClientContext);
174+
encryptionContextProvider, abfsClientContext, AbfsServiceType.DFS);
174175
}
175176

176177
/**

hadoop-tools/hadoop-azure/src/site/markdown/index.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -890,8 +890,8 @@ ABFS Driver can use the following networking libraries:
890890
The networking library can be configured using the configuration `fs.azure.networking.library`
891891
while initializing the filesystem.
892892
Following are the supported values:
893-
- `JDK_HTTP_URL_CONNECTION` : Use JDK networking library [Default]
894-
- `APACHE_HTTP_CLIENT` : Use Apache HttpClient
893+
- `JDK_HTTP_URL_CONNECTION` : Use JDK networking library
894+
- `APACHE_HTTP_CLIENT` : Use Apache HttpClient [Default]
895895

896896
#### <a href="ahc_networking_conf"></a>ApacheHttpClient networking layer configuration Options:
897897

0 commit comments

Comments
 (0)