diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 0a32a0507755f6..d615f8cc4fb4b9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -189,6 +189,11 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_MAX_RETRY_ATTEMPTS) private int maxIoRetries; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_IO_PREFETCH_RETRIES, + MinValue = 0, + DefaultValue = DEFAULT_PREFETCH_MAX_RETRY_ATTEMPTS) + private int maxIoPrefetchRetries; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT, MinValue = 0, DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT) @@ -1142,6 +1147,10 @@ public int getMaxIoRetries() { return this.maxIoRetries; } + public int getPrefetchMaxIoRetries() { + return this.maxIoPrefetchRetries; + } + public int getCustomTokenFetchRetryCount() { return this.customTokenFetchRetryCount; } @@ -1977,6 +1986,11 @@ public void setMaxIoRetries(int maxIoRetries) { this.maxIoRetries = maxIoRetries; } + @VisibleForTesting + public void setMaxIoPrefetchRetries(int maxIoPrefetchRetries) { + this.maxIoPrefetchRetries = maxIoPrefetchRetries; + } + @VisibleForTesting void setMaxBackoffIntervalMilliseconds(int maxBackoffInterval) { this.maxBackoffInterval = maxBackoffInterval; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 6656eabddd49ab..a2afb0bc955a91 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -1923,6 +1923,8 @@ private AbfsClientContext populateAbfsClientContext() { return new AbfsClientContextBuilder() .withExponentialRetryPolicy( new ExponentialRetryPolicy(abfsConfiguration)) + .withPrefetchExponentialRetryPolicy( + ExponentialRetryPolicy.forPrefetch(abfsConfiguration)) .withStaticRetryPolicy( new StaticRetryPolicy(abfsConfiguration)) .withTailLatencyRequestTimeoutRetryPolicy( diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index d2dbe17650820e..820e4946c7eed6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -136,6 +136,7 @@ public final class ConfigurationKeys { public static final String AZURE_STATIC_RETRY_INTERVAL = "fs.azure.static.retry.interval"; public static final String AZURE_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval"; public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries"; + public static final String AZURE_MAX_IO_PREFETCH_RETRIES = "fs.azure.io.prefetch.retry.max.retries"; public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = "fs.azure.custom.token.fetch.retry.count"; /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 328c878401c65a..b26053f8ff4830 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -51,6 +51,7 @@ public final class FileSystemConfigurations { public static final int DEFAULT_STATIC_RETRY_INTERVAL = 1_000; // 1s public static final int DEFAULT_BACKOFF_INTERVAL = 500; // 500ms public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30; + public static final int DEFAULT_PREFETCH_MAX_RETRY_ATTEMPTS = 2; public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3; /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index a781e12c86a423..cbff2ced2e675e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -158,6 +158,7 @@ public abstract class AbfsClient implements Closeable { private final SharedKeyCredentials sharedKeyCredentials; private ApiVersion xMsVersion = ApiVersion.getCurrentVersion(); private final ExponentialRetryPolicy exponentialRetryPolicy; + private final ExponentialRetryPolicy prefetchRetryPolicy; private final StaticRetryPolicy staticRetryPolicy; private final TailLatencyRequestTimeoutRetryPolicy tailLatencyRequestTimeoutRetryPolicy; private final String filesystem; @@ -212,6 +213,7 @@ private AbfsClient(final URL baseUrl, this.filesystem = baseUrlString.substring(indexLastForwardSlash + 1); this.abfsConfiguration = abfsConfiguration; this.exponentialRetryPolicy = abfsClientContext.getExponentialRetryPolicy(); + this.prefetchRetryPolicy = abfsClientContext.getPrefetchExponentialRetryPolicy(); this.staticRetryPolicy = abfsClientContext.getStaticRetryPolicy(); this.tailLatencyRequestTimeoutRetryPolicy = abfsClientContext.getTailLatencyRequestTimeoutRetryPolicy(); this.accountName = 
abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT)); @@ -374,6 +376,10 @@ ExponentialRetryPolicy getExponentialRetryPolicy() { return exponentialRetryPolicy; } + ExponentialRetryPolicy getPrefetchRetryPolicy() { + return prefetchRetryPolicy; + } + StaticRetryPolicy getStaticRetryPolicy() { return staticRetryPolicy; } @@ -397,6 +403,22 @@ && getAbfsConfiguration().getStaticRetryForConnectionTimeoutEnabled()) { return getExponentialRetryPolicy(); } + /** + * Selects the retry policy from the read type carried by the tracing context and the failure reason. + * @param tc the tracing context whose read type is compared with {@code ReadType.PREFETCH_READ}. + * @param failureReason the failure reason handed to the single-argument {@code getRetryPolicy} when the prefetch policy does not apply. + * @return the prefetch retry policy when prefetch request priority is enabled in the configuration and the read type is {@code PREFETCH_READ}; otherwise the result of {@code getRetryPolicy(failureReason)}. + */ + public AbfsRetryPolicy getRetryPolicy(TracingContext tc, final String failureReason){ + if (getAbfsConfiguration().isEnablePrefetchRequestPriority() + && ReadType.PREFETCH_READ.equals(tc.getReadType())){ + return getPrefetchRetryPolicy(); + } + else { + return getRetryPolicy(failureReason); + } + } + SharedKeyCredentials getSharedKeyCredentials() { return sharedKeyCredentials; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java index 27b2d5996e02ea..f89a0317550d89 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java @@ -25,6 +25,7 @@ public class AbfsClientContext { private final ExponentialRetryPolicy exponentialRetryPolicy; + private final ExponentialRetryPolicy prefetchExponentialRetryPolicy; private final StaticRetryPolicy staticRetryPolicy; private final 
TailLatencyRequestTimeoutRetryPolicy tailLatencyRequestTimeoutRetryPolicy; private final AbfsPerfTracker abfsPerfTracker; @@ -33,11 +34,13 @@ public class AbfsClientContext { AbfsClientContext( ExponentialRetryPolicy exponentialRetryPolicy, + ExponentialRetryPolicy prefetchExponentialRetryPolicy, StaticRetryPolicy staticRetryPolicy, TailLatencyRequestTimeoutRetryPolicy tailLatencyRequestTimeoutRetryPolicy, AbfsPerfTracker abfsPerfTracker, AbfsCounters abfsCounters, String fileSystemId) { this.exponentialRetryPolicy = exponentialRetryPolicy; + this.prefetchExponentialRetryPolicy = prefetchExponentialRetryPolicy; this.staticRetryPolicy = staticRetryPolicy; this.tailLatencyRequestTimeoutRetryPolicy = tailLatencyRequestTimeoutRetryPolicy; this.abfsPerfTracker = abfsPerfTracker; @@ -49,6 +52,10 @@ public ExponentialRetryPolicy getExponentialRetryPolicy() { return exponentialRetryPolicy; } + public ExponentialRetryPolicy getPrefetchExponentialRetryPolicy() { + return prefetchExponentialRetryPolicy; + } + public StaticRetryPolicy getStaticRetryPolicy() { return staticRetryPolicy; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java index 5a175d27263017..3c5501607b1367 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java @@ -25,6 +25,7 @@ public class AbfsClientContextBuilder { private ExponentialRetryPolicy exponentialRetryPolicy; + private ExponentialRetryPolicy prefetchExponentialRetryPolicy; private StaticRetryPolicy staticRetryPolicy; private TailLatencyRequestTimeoutRetryPolicy tailLatencyRequestTimeoutRetryPolicy; private AbfsPerfTracker abfsPerfTracker; @@ -37,6 +38,12 @@ public AbfsClientContextBuilder 
withExponentialRetryPolicy( return this; } + public AbfsClientContextBuilder withPrefetchExponentialRetryPolicy( + final ExponentialRetryPolicy prefetchExponentialRetryPolicy) { + this.prefetchExponentialRetryPolicy = prefetchExponentialRetryPolicy; + return this; + } + public AbfsClientContextBuilder withStaticRetryPolicy( final StaticRetryPolicy staticRetryPolicy) { this.staticRetryPolicy = staticRetryPolicy; @@ -74,6 +81,7 @@ public AbfsClientContext build() { //validate the values return new AbfsClientContext( exponentialRetryPolicy, + prefetchExponentialRetryPolicy, staticRetryPolicy, tailLatencyRequestTimeoutRetryPolicy, abfsPerfTracker, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index eba5b09337775e..e9ad01aaae0914 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -551,6 +551,8 @@ protected int readInternal(final long position, final byte[] b, final int offset nextSize = min((long) readAheadBlockSize, contentLength - nextOffset); } + getReadBufferManager().setPrefetchRequestPriorityEnabled(client.getAbfsConfiguration().isEnablePrefetchRequestPriority()); + // try reading from buffers first receivedBytes = getReadBufferManager().getBlock(this, position, length, b); bytesFromReadAhead += receivedBytes; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 1b55084fb45717..82dd8ae35f1ae0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -484,7 +484,7 @@ private boolean executeHttpOperation(final int retryCount, int status = httpOperation.getStatusCode(); failureReason = RetryReason.getAbbreviation(null, status, httpOperation.getStorageErrorMessage()); - retryPolicy = client.getRetryPolicy(failureReason); + retryPolicy = client.getRetryPolicy(tracingContext, failureReason); if (retryPolicy.shouldRetry(retryCount, httpOperation.getStatusCode())) { return false; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java index f1f2bc8be346f0..3e690297dd868f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java @@ -94,6 +94,21 @@ public ExponentialRetryPolicy(AbfsConfiguration conf) { conf.getBackoffIntervalMilliseconds()); } + /** + * Creates an {@link ExponentialRetryPolicy} configured with prefetch retry + * settings read from the given {@link AbfsConfiguration}. + * + * @param conf The {@link AbfsConfiguration} from which to retrieve prefetch retry configuration. + * @return a new {@link ExponentialRetryPolicy} for prefetch operations. + */ + public static ExponentialRetryPolicy forPrefetch(AbfsConfiguration conf) { + return new ExponentialRetryPolicy( + conf.getPrefetchMaxIoRetries(), + conf.getMinBackoffIntervalMilliseconds(), + conf.getMaxBackoffIntervalMilliseconds(), + conf.getBackoffIntervalMilliseconds()); + } + /** * Initializes a new instance of the {@link ExponentialRetryPolicy} class. 
* diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 5b53d641a20df8..e885ac09d56c09 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -47,6 +47,7 @@ public abstract class ReadBufferManager { private Queue readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet private LinkedList inProgressList = new LinkedList<>(); // requests being processed by worker threads private LinkedList completedReadList = new LinkedList<>(); // buffers available for reading + private boolean isPrefetchRequestPriorityEnabled; /** * Initializes the ReadBufferManager singleton instance. Creates the read buffers and threads. @@ -198,6 +199,15 @@ protected static void setReadAheadBlockSize(int readAheadBlockSize) { blockSize = readAheadBlockSize; } + /** + * Sets whether prefetch request priority is enabled for this manager. + * + * @param isEnabled true if prefetch request priority is enabled, false otherwise + */ + protected void setPrefetchRequestPriorityEnabled(boolean isEnabled) { + isPrefetchRequestPriorityEnabled = isEnabled; + } + /** * Gets the queue of read-ahead requests. * @@ -274,6 +284,15 @@ int getCompletedReadListSize() { return completedReadList.size(); } + /** + * Checks if prefetch request priority is enabled. + * + * @return true if prefetch request priority is enabled, false otherwise + */ + boolean isPrefetchRequestPriorityEnabled() { + return isPrefetchRequestPriorityEnabled; + } + /** * Simulates full buffer usage and adds a failed buffer for testing. 
* diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java index c034d856596038..f2fdb6b17b9f47 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java @@ -485,7 +485,9 @@ private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long if (buf.getStatus() == ReadBufferStatus.READ_FAILED) { // To prevent new read requests to fail due to old read-ahead attempts, // return exception only from buffers that failed within last thresholdAgeMilliseconds - if ((currentTimeMillis() - (buf.getTimeStamp()) < getThresholdAgeMilliseconds())) { + if (!isPrefetchRequestPriorityEnabled() && ( + currentTimeMillis() - (buf.getTimeStamp()) + < getThresholdAgeMilliseconds())) { throw buf.getErrException(); } else { return 0; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java index c271a5e354acda..aa2641b49c3c2f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java @@ -709,7 +709,7 @@ private int getBlockFromCompletedQueue(final String eTag, final long position, if (buf.getStatus() == ReadBufferStatus.READ_FAILED) { // To prevent new read requests to fail due to old read-ahead attempts, // return exception only from buffers that failed within last getThresholdAgeMilliseconds() - if ((currentTimeMillis() - (buf.getTimeStamp()) + if (!isPrefetchRequestPriorityEnabled() && (currentTimeMillis() - 
(buf.getTimeStamp()) < getThresholdAgeMilliseconds())) { throw buf.getErrException(); } else { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java index cd1a2af7d6c59b..58b14ce496444d 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java @@ -386,6 +386,11 @@ public static void addGeneralMockBehaviourToAbfsClient(final AbfsClient abfsClie Mockito.doReturn(staticRetryPolicy).when(abfsClient).getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION); Mockito.doReturn(exponentialRetryPolicy).when(abfsClient).getRetryPolicy( AdditionalMatchers.not(eq(CONNECTION_TIMEOUT_ABBREVIATION))); + Mockito.doReturn(staticRetryPolicy).when(abfsClient).getRetryPolicy( + any(TracingContext.class), eq(CONNECTION_TIMEOUT_ABBREVIATION)); + Mockito.doReturn(exponentialRetryPolicy).when(abfsClient).getRetryPolicy( + any(TracingContext.class), + AdditionalMatchers.not(eq(CONNECTION_TIMEOUT_ABBREVIATION))); // Defining behavior of static retry policy Mockito.doReturn(true).when(staticRetryPolicy) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index daa1bda78c2e79..a68ebb5262cf61 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -416,10 +416,12 @@ public static AbfsClient createTestClientFromCurrentContext( AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsPerfTracker(tracker) - .withExponentialRetryPolicy( 
- new ExponentialRetryPolicy(abfsConfig.getMaxIoRetries())) - .withAbfsCounters(abfsCounters) - .build(); + .withExponentialRetryPolicy( + new ExponentialRetryPolicy(abfsConfig.getMaxIoRetries())) + .withPrefetchExponentialRetryPolicy( + new ExponentialRetryPolicy(abfsConfig.getPrefetchMaxIoRetries())) + .withAbfsCounters(abfsCounters) + .build(); // Create test AbfsClient AbfsClient testClient; @@ -524,6 +526,8 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance, new ExponentialRetryPolicy(1)); when(client.getRetryPolicy(any())).thenReturn( new ExponentialRetryPolicy(1)); + when(client.getRetryPolicy(any(TracingContext.class), any())).thenReturn( + new ExponentialRetryPolicy(1)); when(client.createDefaultUriQueryBuilder()).thenCallRealMethod(); when(client.createRequestUrl(any(), any())).thenCallRealMethod(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java index a2560dd6181eaf..06d4f1d12868cf 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java @@ -23,10 +23,12 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_BACKOFF_INTERVAL; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_PREFETCH_RETRIES; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL; import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_AUTOTHROTTLING; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT1_NAME; import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_KEY; @@ -54,6 +56,8 @@ import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; /** * Unit test ITestExponentialRetryPolicy. @@ -340,4 +344,86 @@ private void testMaxIOConfig(AbfsConfiguration abfsConfig) { .describedAs("When all retries are exhausted, the retryCount will be same as max configured.") .isEqualTo(abfsConfig.getMaxIoRetries()); } + + /** + * Test prefetch exponential retry policy max retries configuration. + * Validates that prefetch retry policy correctly uses the configured + * max prefetch IO retries value and respects retry limits. 
+ * + * @throws Exception if test fails + */ + @Test + public void testPrefetchExponentialRetryPolicyMaxRetries() throws Exception { + AbfsConfiguration abfsConfig = getAbfsConfig(); + int prefetchRetries = 5; + abfsConfig.setMaxIoPrefetchRetries(prefetchRetries); + ExponentialRetryPolicy prefetchPolicy = ExponentialRetryPolicy.forPrefetch(abfsConfig); + + Assertions.assertThat(prefetchPolicy.getMaxRetryCount()) + .describedAs("Prefetch policy should use getPrefetchMaxIoRetries value") + .isEqualTo(prefetchRetries); + + // Validate retry logic + for (int i = 0; i < prefetchRetries; i++) { + Assertions.assertThat(prefetchPolicy.shouldRetry(i, -1)) + .describedAs("Should retry for attempts less than maxPrefetchIoRetries") + .isTrue(); + } + Assertions.assertThat(prefetchPolicy.shouldRetry(prefetchRetries, -1)) + .describedAs("Should not retry when attempts equal to maxPrefetchIoRetries") + .isFalse(); + } + + /** + * Test client getRetryPolicy returns correct policy based on configuration. 
+ * Validates that: + * - When prefetch priority is enabled, prefetch retry policy is returned for PREFETCH_READ + * - When prefetch priority is disabled, default retry policy is returned + * - Retry counts are correctly configured for each policy type + * + * @throws Exception if test fails + */ + @Test + public void testGetRetryPolicyBasedOnConfiguration() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext tc = mock(TracingContext.class); + when(tc.getReadType()).thenReturn(ReadType.PREFETCH_READ); + + // Test with prefetch priority enabled + Configuration configWithPrefetch = new Configuration(); + configWithPrefetch.set(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY, "true"); + configWithPrefetch.set(AZURE_MAX_IO_PREFETCH_RETRIES, "4"); + AbfsConfiguration abfsConfigWithPrefetch = new AbfsConfiguration(configWithPrefetch, + DUMMY_ACCOUNT_NAME); + AbfsClient clientWithPrefetch = ITestAbfsClient.createTestClientFromCurrentContext( + fs.getAbfsStore().getClient(), abfsConfigWithPrefetch); + + AbfsRetryPolicy prefetchPolicy = clientWithPrefetch.getRetryPolicy(tc, ""); + Assertions.assertThat(prefetchPolicy) + .describedAs("Should return prefetchExponentialRetryPolicy for PREFETCH_READ when enabled") + .isEqualTo(clientWithPrefetch.getPrefetchRetryPolicy()); + Assertions.assertThat(prefetchPolicy.getMaxRetryCount()) + .describedAs("Prefetch policy should have correct max retry count") + .isEqualTo(4); + Assertions.assertThat(prefetchPolicy.shouldRetry(0, -1)).isTrue(); + Assertions.assertThat(prefetchPolicy.shouldRetry(3, -1)).isTrue(); + Assertions.assertThat(prefetchPolicy.shouldRetry(4, -1)).isFalse(); + + // Test with prefetch priority disabled + Configuration configWithoutPrefetch = new Configuration(); + configWithoutPrefetch.set(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY, "false"); + configWithoutPrefetch.set(AZURE_MAX_IO_RETRIES, "7"); + AbfsConfiguration abfsConfigWithoutPrefetch = new AbfsConfiguration(configWithoutPrefetch, + 
DUMMY_ACCOUNT_NAME); + AbfsClient clientWithoutPrefetch = ITestAbfsClient.createTestClientFromCurrentContext( + fs.getAbfsStore().getClient(), abfsConfigWithoutPrefetch); + + AbfsRetryPolicy defaultPolicy = clientWithoutPrefetch.getRetryPolicy(tc, "anyFailureReason"); + Assertions.assertThat(defaultPolicy) + .describedAs("Should return default exponentialRetryPolicy when prefetch priority is disabled") + .isEqualTo(clientWithoutPrefetch.getExponentialRetryPolicy()); + Assertions.assertThat(defaultPolicy.getMaxRetryCount()) + .describedAs("Default policy should have correct max retry count") + .isEqualTo(7); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 40c3440ef25c59..ccf1a39187828f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -25,6 +25,8 @@ import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.net.URL; +import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; import java.util.List; @@ -74,6 +76,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_PREFETCH_RETRIES; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY; import static 
org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_PRIORITY; import static org.apache.hadoop.fs.azurebfs.constants.ReadType.DIRECT_READ; @@ -87,10 +91,13 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -170,6 +177,7 @@ AbfsRestOperation getMockRestOpWithMetadata() { AbfsClient getMockAbfsClient() throws URISyntaxException { // Mock failure for client.read() AbfsClient client = mock(AbfsClient.class); + AbfsConfiguration abfsConfig = mock(AbfsConfiguration.class); AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd"))); Mockito.doReturn(abfsCounters).when(client).getAbfsCounters(); AbfsPerfTracker tracker = new AbfsPerfTracker( @@ -177,7 +185,10 @@ AbfsClient getMockAbfsClient() throws URISyntaxException { this.getAccountName(), this.getConfiguration()); when(client.getAbfsPerfTracker()).thenReturn(tracker); + when(client.getAbfsConfiguration()).thenReturn(abfsConfig); + when(abfsConfig.isEnablePrefetchRequestPriority()) + .thenReturn(getConfiguration().isEnablePrefetchRequestPriority()); return client; } @@ -329,6 +340,73 @@ private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, Mockito.reset(mockClient); //clears invocation count for next test case } + /** + * Verifies that when prefetch read fails and + * FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY is true (default), the + * exception is NOT 
propagated to the caller. The stream should silently + * fall back or exhaust its (lower) retry budget without surfacing an error. + * The failed reads should be of readtype PR and the successful one of MR. + */ + @Test + public void testPrefetchFailureNotPropagatedToUserWhenPriorityEnabled() throws Exception { + AbfsClient client = getMockAbfsClient(); + + AbfsRestOperation successOp = getMockRestOp(); + when(client.getAbfsConfiguration().isEnablePrefetchRequestPriority()) + .thenReturn(true); + + doAnswer(invocation -> { + TracingContext tc = invocation.getArgument(8); + if (ReadType.PREFETCH_READ.equals(tc.getReadType())) { + throw new TimeoutException("Internal Server error for prefetch"); + } + return successOp; + }).when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class), + any(String.class), any(), any(TracingContext.class)); + + AbfsInputStream inputStream = getAbfsInputStream(client, "testFailedReadAheadReadTypes.txt"); + + inputStream.read(new byte[1 * ONE_KB]); // triggers all 4 read calls + + // Verify count remains same as earlier test + verifyReadCallCount(client, 4); + + // Capture TracingContext arguments (9th parameter) + ArgumentCaptor traceCaptor = ArgumentCaptor.forClass(TracingContext.class); + verify(client, times(4)) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class), + any(String.class), any(), traceCaptor.capture()); + + List contextList = traceCaptor.getAllValues(); + + // Exactly 1 missed-cache read and 3 prefetch reads. 
+ List prefetchContexts = new ArrayList<>(); + TracingContext missedCacheContext = null; + for (TracingContext ctx : contextList) { + if (MISSEDCACHE_READ.equals(ctx.getReadType())) { + missedCacheContext = ctx; + } else { + prefetchContexts.add(ctx); + } + } + + assertThat(missedCacheContext) + .describedAs("Exactly one missed-cache read should have occurred") + .isNotNull(); + assertThat(prefetchContexts.size()) + .describedAs("Exactly 3 prefetch reads should have occurred") + .isEqualTo(3); + + for (TracingContext ctx : prefetchContexts) { + verifyHeaderForReadTypeInTracingContextHeader(ctx, ReadType.PREFETCH_READ, -1); + } + verifyHeaderForReadTypeInTracingContextHeader(missedCacheContext, MISSEDCACHE_READ, 0); + } + + @Test public void testOpenFileWithOptions() throws Exception { AzureBlobFileSystem fs = getFileSystem(); @@ -1052,6 +1130,9 @@ public void testFailedReadAhead() throws Exception { AbfsClient client = getMockAbfsClient(); AbfsRestOperation successOp = getMockRestOp(); + when(client.getAbfsConfiguration().isEnablePrefetchRequestPriority()) + .thenReturn(false); + // Stub : // Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() // Actual read request fails with the failure in readahead thread @@ -1129,6 +1210,9 @@ public void testOlderReadAheadFailure() throws Exception { AbfsClient client = getMockAbfsClient(); AbfsRestOperation successOp = getMockRestOp(); + when(client.getAbfsConfiguration().isEnablePrefetchRequestPriority()) + .thenReturn(false); + // Stub : // First Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() // A second read request will see that readahead had failed for data in @@ -1737,63 +1821,63 @@ public void testAdaptiveInputStream() throws Exception { } } - /* - * Helper method to execute read and verify if priority header is added or not as expected - */ - private void executePrefetchReadTest(TracingContext tracingContext, - Configuration rawConfig, - boolean shouldHaveHeader) 
throws Exception { - try (AzureBlobFileSystem azureFs = (AzureBlobFileSystem) FileSystem.newInstance( - rawConfig)) { - AzureBlobFileSystemStore store = Mockito.spy(azureFs.getAbfsStore()); - - AbfsClient abfsClient = Mockito.spy(store.getClient()); - Mockito.doReturn(abfsClient).when(store).getClient(); - - List<AbfsHttpHeader> headersList = new ArrayList<>(); - - doAnswer(invocation -> { - AbfsRestOperation realOp - = (AbfsRestOperation) invocation.callRealMethod(); - AbfsRestOperation spiedOp = spy(realOp); - - headersList.addAll(spiedOp.getRequestHeaders()); - - doNothing().when(spiedOp).execute(any(TracingContext.class)); - return spiedOp; - }) - .when(abfsClient) - .getAbfsRestOperation( - any(AbfsRestOperationType.class), - anyString(), - any(URL.class), - anyList(), - any(byte[].class), - anyInt(), - anyInt(), - nullable(String.class) - ); - - abfsClient.read( - "dummy-path", 0L, new byte[1], 0, 1, - "etag", "leaseId", null, tracingContext); - - AbfsConfiguration abfsConfig = store.getAbfsConfiguration(); - if (shouldHaveHeader) { - assertThat(headersList) - .anySatisfy(header -> { - assertThat(header.getName()).isEqualTo( - X_MS_REQUEST_PRIORITY); - assertThat(header.getValue()).isEqualTo( - abfsConfig.getPrefetchRequestPriorityValue()); - }); - } else { - assertThat(headersList) - .noneSatisfy(header -> assertThat(header.getName()).isEqualTo( - X_MS_REQUEST_PRIORITY)); - } + /* + * Helper method to execute read and verify if priority header is added or not as expected + */ + private void executePrefetchReadTest(TracingContext tracingContext, + Configuration rawConfig, + boolean shouldHaveHeader) throws Exception { + try (AzureBlobFileSystem azureFs = (AzureBlobFileSystem) FileSystem.newInstance( + rawConfig)) { + AzureBlobFileSystemStore store = Mockito.spy(azureFs.getAbfsStore()); + + AbfsClient abfsClient = Mockito.spy(store.getClient()); + Mockito.doReturn(abfsClient).when(store).getClient(); + + List<AbfsHttpHeader> headersList = new ArrayList<>(); + + doAnswer(invocation -> { + 
AbfsRestOperation realOp + = (AbfsRestOperation) invocation.callRealMethod(); + AbfsRestOperation spiedOp = spy(realOp); + + headersList.addAll(spiedOp.getRequestHeaders()); + + doNothing().when(spiedOp).execute(any(TracingContext.class)); + return spiedOp; + }) + .when(abfsClient) + .getAbfsRestOperation( + any(AbfsRestOperationType.class), + anyString(), + any(URL.class), + anyList(), + any(byte[].class), + anyInt(), + anyInt(), + nullable(String.class) + ); + + abfsClient.read( + "dummy-path", 0L, new byte[1], 0, 1, + "etag", "leaseId", null, tracingContext); + + AbfsConfiguration abfsConfig = store.getAbfsConfiguration(); + if (shouldHaveHeader) { + assertThat(headersList) + .anySatisfy(header -> { + assertThat(header.getName()).isEqualTo( + X_MS_REQUEST_PRIORITY); + assertThat(header.getValue()).isEqualTo( + abfsConfig.getPrefetchRequestPriorityValue()); + }); + } else { + assertThat(headersList) + .noneSatisfy(header -> assertThat(header.getName()).isEqualTo( + X_MS_REQUEST_PRIORITY)); + } + } } - } private Path createTestFile(AzureBlobFileSystem fs, int fileSize) throws Exception { Path testPath = new Path("testFile"); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java index 55d3b5f065716d..75f889b746956f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java @@ -60,6 +60,7 @@ import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.UNKNOWN_HOST_EXCEPTION_ABBREVIATION; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static 
org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -280,7 +281,7 @@ public void testRetryPolicyWithDifferentFailureReasons() throws Exception { // Before iteration 3 sleep will be computed using exponential retry policy and retry count 2 // Should retry with retry count 2 will return false and no further requests will be made. verify(abfsClient, times(2)) - .getRetryPolicy(EGRESS_LIMIT_BREACH_ABBREVIATION); + .getRetryPolicy(any(TracingContext.class), eq(EGRESS_LIMIT_BREACH_ABBREVIATION)); verify(exponentialRetryPolicy, times(1)) .shouldRetry(1, HTTP_UNAVAILABLE); verify(exponentialRetryPolicy, times(1))