From 23357890428f977d171455386d0be0d87ada7f01 Mon Sep 17 00:00:00 2001 From: Manika Joshi Date: Thu, 23 Oct 2025 21:54:40 -0700 Subject: [PATCH 1/9] prefetch request priority --- .../org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java | 8 ++++++++ .../hadoop/fs/azurebfs/constants/ConfigurationKeys.java | 6 ++++++ .../fs/azurebfs/constants/FileSystemConfigurations.java | 2 ++ .../hadoop/fs/azurebfs/services/AbfsBlobClient.java | 6 ++++++ .../apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java | 7 +++++++ .../hadoop/fs/azurebfs/services/AbfsInputStream.java | 2 +- .../hadoop/fs/azurebfs/services/ReadBufferManagerV1.java | 2 +- .../apache/hadoop/fs/azurebfs/utils/TracingContext.java | 7 +++++++ 8 files changed, 38 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 7c355671cf8b26..360c7533791b15 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -519,6 +519,10 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY) private boolean enableCreateIdempotency; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_REQUEST_PRIORITY_FOR_PREFETCH, + DefaultValue = DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY) + private boolean enablePrefetchRequestPriority; + private String clientProvidedEncryptionKey; private String clientProvidedEncryptionKeySHA; @@ -1236,6 +1240,10 @@ public boolean getIsCreateIdempotencyEnabled() { return enableCreateIdempotency; } + public boolean isEnablePrefetchRequestPriority() { + return enablePrefetchRequestPriority; + } + /** * Enum config to allow user to pick format of x-ms-client-request-id header * @return tracingContextFormat config if valid, else default ALL_ID_FORMAT diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 9afb37e35c7703..28c9010c2bc506 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -276,6 +276,12 @@ public final class ConfigurationKeys { */ public static final String FS_AZURE_ENABLE_READAHEAD_V2 = "fs.azure.enable.readahead.v2"; + /** + * Enable or disable readahead V2 in AbfsInputStream. This will work independent of V1. + * Value: {@value}. + */ + public static final String FS_AZURE_ENABLE_REQUEST_PRIORITY_FOR_PREFETCH = "fs.azure.enable.prefetch.request.priority"; + /** * Minimum number of prefetch threads in the thread pool for readahead V2. * {@value } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 83f636bf1d131b..ce6526634d3393 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -266,5 +266,7 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = true; + public static final boolean DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = false; + private FileSystemConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index d6ae0427b23b98..0388b0213928a3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -63,6 +63,7 @@ import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; @@ -1335,6 +1336,11 @@ public AbfsRestOperation read(final String path, requestHeaders.add(rangeHeader); requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); + if (getAbfsConfiguration().isEnablePrefetchRequestPriority() + && ReadType.PREFETCH_READ.equals(tracingContext.getReadType())) { + requestHeaders.add(new AbfsHttpHeader("x-ms-request-priority", "7")); + } + // Add request header to fetch MD5 Hash of data returned by server. if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) { requestHeaders.add(new AbfsHttpHeader(X_MS_RANGE_GET_CONTENT_MD5, TRUE)); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java index f574f4704ab5c8..3f5d69ba0dca0d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java @@ -56,6 +56,7 @@ import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; @@ -1050,6 +1051,12 @@ public AbfsRestOperation read(final String path, } final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + + if (getAbfsConfiguration().isEnablePrefetchRequestPriority() + && ReadType.PREFETCH_READ.equals(tracingContext.getReadType())) { + requestHeaders.add(new AbfsHttpHeader("x-ms-request-priority", "7")); + } + // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index ba2bb61802a086..a3face76c3fbca 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -590,7 +590,7 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t tracingContext.setPosition(String.valueOf(position)); op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get(), - contextEncryptionAdapter, tracingContext); + contextEncryptionAdapter, new TracingContext(tracingContext)); cachedSasToken.update(op.getSasToken()); LOG.debug("issuing HTTP GET request params position = {} b.length = {} " + "offset = {} length = {}", position, b.length, offset, length); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java index fe1ac3fa1f2353..8507ed253c68b9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java @@ -127,7 +127,7 @@ public void queueReadAhead(final AbfsInputStream stream, final long requestedOff buffer.setRequestedLength(requestedLength); buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); buffer.setLatch(new CountDownLatch(1)); - buffer.setTracingContext(tracingContext); + buffer.setTracingContext(new TracingContext(tracingContext)); Integer bufferIndex = getFreeList().pop(); // will return a value, since we have checked size > 0 already diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java index 0179718a06e8cb..f4a2ba9d82c2d4 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java @@ -389,4 +389,11 @@ public void setReadType(ReadType readType) { listener.updateReadType(readType); } } + + /** + * Returns the read type for the current operation. + */ + public ReadType getReadType() { + return readType; + } } From 75b91b1f9f240dbfd275fe76671fba7baba3b3f3 Mon Sep 17 00:00:00 2001 From: Manika Joshi Date: Thu, 23 Oct 2025 21:58:51 -0700 Subject: [PATCH 2/9] comment correction --- .../apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 28c9010c2bc506..07cc7eb34b705d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -277,7 +277,7 @@ public final class ConfigurationKeys { public static final String FS_AZURE_ENABLE_READAHEAD_V2 = "fs.azure.enable.readahead.v2"; /** - * Enable or disable readahead V2 in AbfsInputStream. This will work independent of V1. + * Enable or disable request priority for prefetch requests * Value: {@value}. */ public static final String FS_AZURE_ENABLE_REQUEST_PRIORITY_FOR_PREFETCH = "fs.azure.enable.prefetch.request.priority"; From ebab278b5eb297775e3e821dfcbf3c7c1b227f7d Mon Sep 17 00:00:00 2001 From: Manika Joshi Date: Wed, 29 Oct 2025 01:11:15 -0700 Subject: [PATCH 3/9] test addition --- .../services/TestAbfsInputStream.java | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 9b388b57c3e559..ea271ee8bc288e 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -19,8 +19,11 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.lang.reflect.Field; import java.net.URI; import java.net.URISyntaxException; +import java.net.URL; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Optional; @@ -58,6 +61,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_REQUEST_PRIORITY_FOR_PREFETCH; import static org.apache.hadoop.fs.azurebfs.constants.ReadType.DIRECT_READ; import static org.apache.hadoop.fs.azurebfs.constants.ReadType.FOOTER_READ; import static org.apache.hadoop.fs.azurebfs.constants.ReadType.MISSEDCACHE_READ; @@ -66,8 +70,12 @@ import static org.apache.hadoop.fs.azurebfs.constants.ReadType.SMALLFILE_READ; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -104,6 +112,9 @@ public class TestAbfsInputStream extends private static final int POSITION_INDEX = 9; private static final int OPERATION_INDEX = 6; private static final int READTYPE_INDEX = 11; + private static final String PREFETCH_TRAFFIC_PRIORITY_HEADER = "x-ms-request-priority"; + private static final String PREFETCH_TRAFFIC_PRIORITY_HEADER_VALUE = "7"; + @AfterEach @Override @@ -896,6 +907,90 @@ private void testReadTypeInTracingContextHeaderInternal(AzureBlobFileSystem fs, assertReadTypeInClientRequestId(fs, numOfReadCalls, totalReadCalls, readType); } + /* + * Test to verify that both conditions of prefetch read and respective config + * enabled needs to be true for the priority header to be added + */ + @Test + public void testPrefetchReadAddsPriorityHeaderWithDifferentConfigs() + throws Exception { + Configuration configuration1 = new Configuration(getRawConfiguration()); + configuration1.set(FS_AZURE_ENABLE_REQUEST_PRIORITY_FOR_PREFETCH, "true"); + + Configuration configuration2 = new Configuration(getRawConfiguration()); + //use the default value for the config: false + configuration2.unset(FS_AZURE_ENABLE_REQUEST_PRIORITY_FOR_PREFETCH); + + TracingContext tracingContext1 = mock(TracingContext.class); + when(tracingContext1.getReadType()).thenReturn(PREFETCH_READ); + + //Prefetch Read with config enabled + executePrefetchReadTest(tracingContext1, configuration1, true); + //Prefetch Read with config disabled + executePrefetchReadTest(tracingContext1, configuration2, false); + + when(tracingContext1.getReadType()).thenReturn(DIRECT_READ); + + //Non-prefetch read with config disabled + executePrefetchReadTest(tracingContext1, configuration2, false); + //Non-prefetch read with config enabled + executePrefetchReadTest(tracingContext1, configuration1, false); + } + + private void executePrefetchReadTest(TracingContext tracingContext, + Configuration rawConfig, + boolean shouldHaveHeader) throws Exception { + AzureBlobFileSystem azureFs = (AzureBlobFileSystem) FileSystem.newInstance( + rawConfig); + AzureBlobFileSystemStore store = Mockito.spy(azureFs.getAbfsStore()); + + AbfsClient abfsClient = Mockito.spy(store.getClient()); + Mockito.doReturn(abfsClient).when(store).getClient(); + + List headersList = new ArrayList<>(); + + doAnswer(invocation -> { + AbfsRestOperation realOp + = (AbfsRestOperation) invocation.callRealMethod(); + AbfsRestOperation spiedOp = spy(realOp); + + headersList.addAll(spiedOp.getRequestHeaders()); + + doNothing().when(spiedOp).execute(any(TracingContext.class)); + return spiedOp; + }) + .when(abfsClient) + .getAbfsRestOperation( + any(AbfsRestOperationType.class), + anyString(), + any(URL.class), + anyList(), + any(byte[].class), + anyInt(), + anyInt(), + nullable(String.class) + ); + + abfsClient.read( + "dummy-path", 0L, new byte[1], 0, 1, + "etag", "leaseId", null, tracingContext); + + + if (shouldHaveHeader) { + assertThat(headersList) + .anySatisfy(header -> { + assertThat(header.getName()).isEqualTo( + PREFETCH_TRAFFIC_PRIORITY_HEADER); + assertThat(header.getValue()).isEqualTo( + PREFETCH_TRAFFIC_PRIORITY_HEADER_VALUE); + }); + } else { + assertThat(headersList) + .noneSatisfy(header -> assertThat(header.getName()).isEqualTo( + PREFETCH_TRAFFIC_PRIORITY_HEADER)); + } + } + private Path createTestFile(AzureBlobFileSystem fs, int fileSize) throws Exception { Path testPath = new Path("testFile"); byte[] fileContent = getRandomBytesArray(fileSize); From b02b911762415843278785c152c22c0e39f9fb50 Mon Sep 17 00:00:00 2001 From: Manika Joshi Date: Sun, 2 Nov 2025 02:11:18 -0800 Subject: [PATCH 4/9] comments changes --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 11 ++++++++++- .../fs/azurebfs/constants/ConfigurationKeys.java | 8 +++++++- .../constants/FileSystemConfigurations.java | 6 ++++++ .../constants/HttpHeaderConfigurations.java | 1 + .../fs/azurebfs/services/AbfsBlobClient.java | 5 +---- .../hadoop/fs/azurebfs/services/AbfsClient.java | 12 ++++++++++++ .../fs/azurebfs/services/AbfsDfsClient.java | 5 +---- .../azurebfs/services/TestAbfsInputStream.java | 16 +++++++--------- 8 files changed, 45 insertions(+), 19 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 360c7533791b15..2a6b58994c06a2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -519,10 +519,15 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY) private boolean enableCreateIdempotency; - @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_REQUEST_PRIORITY_FOR_PREFETCH, + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY, DefaultValue = DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY) private boolean enablePrefetchRequestPriority; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_PREFETCH_REQUEST_PRIORITY_VALUE, + MinValue = DEFAULT_FS_AZURE_MIN_PREFETCH_REQUEST_PRIORITY_VALUE, + DefaultValue = DEFAULT_FS_AZURE_PREFETCH_REQUEST_PRIORITY_VALUE) + private int prefetchRequestPriorityValue; + private String clientProvidedEncryptionKey; private String clientProvidedEncryptionKeySHA; @@ -1244,6 +1249,10 @@ public boolean isEnablePrefetchRequestPriority() { return enablePrefetchRequestPriority; } + public String getPrefetchRequestPriorityValue() { + return Integer.toString(prefetchRequestPriorityValue); + } + /** * Enum config to allow user to pick format of x-ms-client-request-id header * @return tracingContextFormat config if valid, else default ALL_ID_FORMAT diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 07cc7eb34b705d..01282581aa4416 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -280,7 +280,13 @@ public final class ConfigurationKeys { * Enable or disable request priority for prefetch requests * Value: {@value}. */ - public static final String FS_AZURE_ENABLE_REQUEST_PRIORITY_FOR_PREFETCH = "fs.azure.enable.prefetch.request.priority"; + public static final String FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = "fs.azure.enable.prefetch.request.priority"; + + /** + * Enable or disable request priority for prefetch requests + * Value: {@value}. + */ + public static final String FS_AZURE_PREFETCH_REQUEST_PRIORITY_VALUE = "fs.azure.prefetch.request.priority.value"; /** * Minimum number of prefetch threads in the thread pool for readahead V2. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index ce6526634d3393..5b28385742a5f8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -268,5 +268,11 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = false; + // The default traffic request priority is 3 (from service) + // The lowest priority a request can get is 7 + public static final int DEFAULT_FS_AZURE_PREFETCH_REQUEST_PRIORITY_VALUE = 7; + + public static final int DEFAULT_FS_AZURE_MIN_PREFETCH_REQUEST_PRIORITY_VALUE = 3; + private FileSystemConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java index b442b1f8533471..a51161263d882c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java @@ -69,6 +69,7 @@ public final class HttpHeaderConfigurations { public static final String X_MS_LEASE_ACTION = "x-ms-lease-action"; public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration"; public static final String X_MS_LEASE_ID = "x-ms-lease-id"; + public static final String PREFETCH_TRAFFIC_PRIORITY_HEADER = "x-ms-request-priority"; /** * Http Request Header for denoting the lease id of source in copy operation. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index 0388b0213928a3..856aebdd36bfe3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -1336,10 +1336,7 @@ public AbfsRestOperation read(final String path, requestHeaders.add(rangeHeader); requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); - if (getAbfsConfiguration().isEnablePrefetchRequestPriority() - && ReadType.PREFETCH_READ.equals(tracingContext.getReadType())) { - requestHeaders.add(new AbfsHttpHeader("x-ms-request-priority", "7")); - } + addRequestPriorityForPrefetch(requestHeaders, tracingContext); // Add request header to fetch MD5 Hash of data returned by server. if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 71da8f9bda96e0..d94103835ec26e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -62,6 +62,7 @@ import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpOperationType; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; @@ -139,6 +140,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT_CHARSET; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_MD5; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.PREFETCH_TRAFFIC_PRIORITY_HEADER; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT; @@ -157,6 +159,7 @@ public abstract class AbfsClient implements Closeable { public static final String HUNDRED_CONTINUE_USER_AGENT = SINGLE_WHITE_SPACE + HUNDRED_CONTINUE + SEMICOLON; public static final String ABFS_CLIENT_TIMER_THREAD_NAME = "abfs-timer-client"; public static final String FNS_BLOB_USER_AGENT_IDENTIFIER = "FNS"; + private static final String PREFETCH_TRAFFIC_PRIORITY_HEADER_VALUE = "7"; private final URL baseUrl; private final SharedKeyCredentials sharedKeyCredentials; @@ -1379,6 +1382,15 @@ protected void addCheckSumHeaderForWrite(List requestHeaders, } } + protected void addRequestPriorityForPrefetch(List requestHeaders, + TracingContext tracingContext) { + if (getAbfsConfiguration().isEnablePrefetchRequestPriority() + && ReadType.PREFETCH_READ.equals(tracingContext.getReadType())) { + requestHeaders.add(new AbfsHttpHeader(PREFETCH_TRAFFIC_PRIORITY_HEADER, + getAbfsConfiguration().getPrefetchRequestPriorityValue())); + } + } + /** * To verify the checksum information received from server for the data read. * @param buffer stores the data received from server. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java index 3f5d69ba0dca0d..6ea6563b85fcaf 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java @@ -1052,10 +1052,7 @@ public AbfsRestOperation read(final String path, final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); - if (getAbfsConfiguration().isEnablePrefetchRequestPriority() - && ReadType.PREFETCH_READ.equals(tracingContext.getReadType())) { - requestHeaders.add(new AbfsHttpHeader("x-ms-request-priority", "7")); - } + addRequestPriorityForPrefetch(requestHeaders, tracingContext); // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance String sasTokenForReuse = appendSASTokenToQuery(path, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index ea271ee8bc288e..ac7867c8240d42 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; -import java.lang.reflect.Field; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; @@ -30,6 +29,7 @@ import java.util.Random; import java.util.concurrent.ExecutionException; +import org.apache.hadoop.fs.azurebfs.Abfs; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl; import org.junit.jupiter.api.AfterEach; @@ -61,7 +61,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT; -import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_REQUEST_PRIORITY_FOR_PREFETCH; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.PREFETCH_TRAFFIC_PRIORITY_HEADER; import static org.apache.hadoop.fs.azurebfs.constants.ReadType.DIRECT_READ; import static org.apache.hadoop.fs.azurebfs.constants.ReadType.FOOTER_READ; import static org.apache.hadoop.fs.azurebfs.constants.ReadType.MISSEDCACHE_READ; @@ -112,8 +113,6 @@ public class TestAbfsInputStream extends private static final int POSITION_INDEX = 9; private static final int OPERATION_INDEX = 6; private static final int READTYPE_INDEX = 11; - private static final String PREFETCH_TRAFFIC_PRIORITY_HEADER = "x-ms-request-priority"; - private static final String PREFETCH_TRAFFIC_PRIORITY_HEADER_VALUE = "7"; @AfterEach @@ -915,11 +914,11 @@ private void testReadTypeInTracingContextHeaderInternal(AzureBlobFileSystem fs, public void testPrefetchReadAddsPriorityHeaderWithDifferentConfigs() throws Exception { Configuration configuration1 = new Configuration(getRawConfiguration()); - configuration1.set(FS_AZURE_ENABLE_REQUEST_PRIORITY_FOR_PREFETCH, "true"); + configuration1.set(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY, "true"); Configuration configuration2 = new Configuration(getRawConfiguration()); //use the default value for the config: false - configuration2.unset(FS_AZURE_ENABLE_REQUEST_PRIORITY_FOR_PREFETCH); + configuration2.unset(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY); TracingContext tracingContext1 = mock(TracingContext.class); when(tracingContext1.getReadType()).thenReturn(PREFETCH_READ); @@ -975,14 +974,13 @@ private void executePrefetchReadTest(TracingContext tracingContext, "dummy-path", 0L, new byte[1], 0, 1, "etag", "leaseId", null, tracingContext); - + AbfsConfiguration abfsConfig = store.getAbfsConfiguration(); if (shouldHaveHeader) { assertThat(headersList) .anySatisfy(header -> { assertThat(header.getName()).isEqualTo( PREFETCH_TRAFFIC_PRIORITY_HEADER); - assertThat(header.getValue()).isEqualTo( - PREFETCH_TRAFFIC_PRIORITY_HEADER_VALUE); + assertThat(header.getValue()).isEqualTo(abfsConfig.getPrefetchRequestPriorityValue()); }); } else { assertThat(headersList) From a96f941b86c5e50403c98c4452b88fbba763be33 Mon Sep 17 00:00:00 2001 From: Manika Joshi Date: Sun, 2 Nov 2025 19:53:22 -0800 Subject: [PATCH 5/9] comments --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 4 +- .../constants/FileSystemConfigurations.java | 6 +- .../constants/HttpHeaderConfigurations.java | 2 +- .../fs/azurebfs/services/AbfsClient.java | 4 +- .../services/TestAbfsInputStream.java | 99 ++++++++++--------- 5 files changed, 58 insertions(+), 57 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 2a6b58994c06a2..4f1fa5bfe9c074 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -524,8 +524,8 @@ public class AbfsConfiguration{ private boolean enablePrefetchRequestPriority; @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_PREFETCH_REQUEST_PRIORITY_VALUE, - MinValue = DEFAULT_FS_AZURE_MIN_PREFETCH_REQUEST_PRIORITY_VALUE, - DefaultValue = DEFAULT_FS_AZURE_PREFETCH_REQUEST_PRIORITY_VALUE) + MinValue = DEFAULT_FS_AZURE_STANDARD_REQUEST_PRIORITY_VALUE, + DefaultValue = DEFAULT_FS_AZURE_REQUEST_PRIORITY_VALUE_7) private int prefetchRequestPriorityValue; private String clientProvidedEncryptionKey; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 5b28385742a5f8..b5115aeeb1c84d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -266,13 +266,13 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY = true; - public static final boolean DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = false; + public static final boolean DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = true; // The default traffic request priority is 3 (from service) // The lowest priority a request can get is 7 - public static final int DEFAULT_FS_AZURE_PREFETCH_REQUEST_PRIORITY_VALUE = 7; + public static final int DEFAULT_FS_AZURE_REQUEST_PRIORITY_VALUE_7 = 7; - public static final int DEFAULT_FS_AZURE_MIN_PREFETCH_REQUEST_PRIORITY_VALUE = 3; + public static final int DEFAULT_FS_AZURE_STANDARD_REQUEST_PRIORITY_VALUE = 3; private FileSystemConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java index a51161263d882c..9521518fa1f17d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java @@ -69,7 +69,7 @@ public final class HttpHeaderConfigurations { public static final String X_MS_LEASE_ACTION = "x-ms-lease-action"; public static final String X_MS_LEASE_DURATION = "x-ms-lease-duration"; public static final String X_MS_LEASE_ID = "x-ms-lease-id"; - public static final String PREFETCH_TRAFFIC_PRIORITY_HEADER = "x-ms-request-priority"; + public static final String X_MS_REQUEST_PRIORITY = "x-ms-request-priority"; /** * Http Request Header for denoting the lease id of source in copy operation. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index d94103835ec26e..b00536a2bb3bd2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -140,7 +140,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.ACCEPT_CHARSET; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_MD5; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.CONTENT_TYPE; -import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.PREFETCH_TRAFFIC_PRIORITY_HEADER; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_PRIORITY; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.USER_AGENT; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT; @@ -1386,7 +1386,7 @@ protected void addRequestPriorityForPrefetch(List requestHeaders TracingContext tracingContext) { if (getAbfsConfiguration().isEnablePrefetchRequestPriority() && ReadType.PREFETCH_READ.equals(tracingContext.getReadType())) { - requestHeaders.add(new AbfsHttpHeader(PREFETCH_TRAFFIC_PRIORITY_HEADER, + requestHeaders.add(new AbfsHttpHeader(X_MS_REQUEST_PRIORITY, getAbfsConfiguration().getPrefetchRequestPriorityValue())); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index ac7867c8240d42..4ccab588b3b818 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -29,7 +29,6 @@ import java.util.Random; import java.util.concurrent.ExecutionException; -import org.apache.hadoop.fs.azurebfs.Abfs; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl; import org.junit.jupiter.api.AfterEach; @@ -62,7 +61,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY; -import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.PREFETCH_TRAFFIC_PRIORITY_HEADER; +import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_PRIORITY; import static org.apache.hadoop.fs.azurebfs.constants.ReadType.DIRECT_READ; import static org.apache.hadoop.fs.azurebfs.constants.ReadType.FOOTER_READ; import static org.apache.hadoop.fs.azurebfs.constants.ReadType.MISSEDCACHE_READ; @@ -939,53 +938,55 @@ public void testPrefetchReadAddsPriorityHeaderWithDifferentConfigs() private void executePrefetchReadTest(TracingContext tracingContext, Configuration rawConfig, boolean shouldHaveHeader) throws Exception { - AzureBlobFileSystem azureFs = (AzureBlobFileSystem) FileSystem.newInstance( - rawConfig); - AzureBlobFileSystemStore store = Mockito.spy(azureFs.getAbfsStore()); - - AbfsClient abfsClient = Mockito.spy(store.getClient()); - Mockito.doReturn(abfsClient).when(store).getClient(); - - List headersList = new ArrayList<>(); - - doAnswer(invocation -> { - AbfsRestOperation realOp - = (AbfsRestOperation) invocation.callRealMethod(); - AbfsRestOperation spiedOp = spy(realOp); - - headersList.addAll(spiedOp.getRequestHeaders()); - - doNothing().when(spiedOp).execute(any(TracingContext.class)); - return spiedOp; - }) - .when(abfsClient) - .getAbfsRestOperation( - any(AbfsRestOperationType.class), - anyString(), - any(URL.class), - anyList(), - any(byte[].class), - anyInt(), - anyInt(), - nullable(String.class) - ); - - abfsClient.read( - "dummy-path", 0L, new byte[1], 0, 1, - "etag", "leaseId", null, tracingContext); - - AbfsConfiguration abfsConfig = store.getAbfsConfiguration(); - if (shouldHaveHeader) { - assertThat(headersList) - .anySatisfy(header -> { - assertThat(header.getName()).isEqualTo( - PREFETCH_TRAFFIC_PRIORITY_HEADER); - assertThat(header.getValue()).isEqualTo(abfsConfig.getPrefetchRequestPriorityValue()); - }); - } else { - assertThat(headersList) - .noneSatisfy(header -> assertThat(header.getName()).isEqualTo( - PREFETCH_TRAFFIC_PRIORITY_HEADER)); + try (AzureBlobFileSystem azureFs = (AzureBlobFileSystem) FileSystem.newInstance( + rawConfig)) { + AzureBlobFileSystemStore store = Mockito.spy(azureFs.getAbfsStore()); + + AbfsClient abfsClient = Mockito.spy(store.getClient()); + Mockito.doReturn(abfsClient).when(store).getClient(); + + List headersList = new ArrayList<>(); + + doAnswer(invocation -> { + AbfsRestOperation realOp + = (AbfsRestOperation) invocation.callRealMethod(); + AbfsRestOperation spiedOp = spy(realOp); + + headersList.addAll(spiedOp.getRequestHeaders()); + + doNothing().when(spiedOp).execute(any(TracingContext.class)); + return spiedOp; + }) + .when(abfsClient) + .getAbfsRestOperation( + any(AbfsRestOperationType.class), + anyString(), + any(URL.class), + anyList(), + any(byte[].class), + anyInt(), + anyInt(), + nullable(String.class) + ); + + abfsClient.read( + "dummy-path", 0L, new byte[1], 0, 1, + "etag", "leaseId", null, tracingContext); + + AbfsConfiguration abfsConfig = store.getAbfsConfiguration(); + if (shouldHaveHeader) { + assertThat(headersList) + .anySatisfy(header -> { + assertThat(header.getName()).isEqualTo( + X_MS_REQUEST_PRIORITY); + assertThat(header.getValue()).isEqualTo( + abfsConfig.getPrefetchRequestPriorityValue()); + }); + } else { + assertThat(headersList) + .noneSatisfy(header -> assertThat(header.getName()).isEqualTo( + X_MS_REQUEST_PRIORITY)); + } } } From 3c27816ef0ebc1c631d6d93189fe1de6ef728dbe Mon Sep 17 00:00:00 2001 From: Manika Joshi Date: Mon, 3 Nov 2025 00:47:44 -0800 Subject: [PATCH 6/9] changes --- .../org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java | 4 ++-- .../hadoop/fs/azurebfs/constants/ConfigurationKeys.java | 2 +- .../fs/azurebfs/constants/FileSystemConfigurations.java | 4 ++-- .../apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java | 2 +- .../org/apache/hadoop/fs/azurebfs/services/AbfsClient.java | 7 ++++++- .../apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java | 2 +- .../hadoop/fs/azurebfs/services/AbfsInputStream.java | 2 +- .../hadoop/fs/azurebfs/services/ReadBufferManagerV1.java | 2 +- .../hadoop/fs/azurebfs/services/ReadBufferManagerV2.java | 2 +- .../apache/hadoop/fs/azurebfs/utils/TracingContext.java | 2 ++ 10 files changed, 18 insertions(+), 11 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 0b75c0ebb7fe4d..a4a9b0717467a0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -643,8 +643,8 @@ public class AbfsConfiguration{ private boolean enablePrefetchRequestPriority; @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_PREFETCH_REQUEST_PRIORITY_VALUE, - MinValue = DEFAULT_FS_AZURE_STANDARD_REQUEST_PRIORITY_VALUE, - DefaultValue = DEFAULT_FS_AZURE_REQUEST_PRIORITY_VALUE_7) + MinValue = DEFAULT_FS_AZURE_STANDARD_MIN_REQUEST_PRIORITY_VALUE, + DefaultValue = DEFAULT_FS_AZURE_REQUEST_PRIORITY_VALUE) private int prefetchRequestPriorityValue; private String clientProvidedEncryptionKey; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index f12e24cd1079ff..91fc97e1b7cfd6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -289,7 +289,7 @@ public final class ConfigurationKeys { public static final String FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = "fs.azure.enable.prefetch.request.priority"; /** - * Enable or disable request priority for prefetch requests + * Request priority value for prefetch requests * Value: {@value}. */ public static final String FS_AZURE_PREFETCH_REQUEST_PRIORITY_VALUE = "fs.azure.prefetch.request.priority.value"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 75ccc38a9fcdfc..2f554160109e41 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -397,9 +397,9 @@ public final class FileSystemConfigurations { // The default traffic request priority is 3 (from service) // The lowest priority a request can get is 7 - public static final int DEFAULT_FS_AZURE_REQUEST_PRIORITY_VALUE_7 = 7; + public static final int DEFAULT_FS_AZURE_REQUEST_PRIORITY_VALUE = 7; - public static final int DEFAULT_FS_AZURE_STANDARD_REQUEST_PRIORITY_VALUE = 3; + public static final int DEFAULT_FS_AZURE_STANDARD_MIN_REQUEST_PRIORITY_VALUE = 3; public static final boolean DEFAULT_FS_AZURE_ENABLE_TAIL_LATENCY_TRACKER = false; public static final boolean DEFAULT_FS_AZURE_ENABLE_TAIL_LATENCY_REQUEST_TIMEOUT = false; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java index 856aebdd36bfe3..52ec53e4f13b55 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java @@ -63,7 +63,6 @@ import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; -import org.apache.hadoop.fs.azurebfs.constants.ReadType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; @@ -1336,6 +1335,7 @@ public AbfsRestOperation read(final String path, requestHeaders.add(rangeHeader); requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag)); + // Add request priority header for prefetch reads addRequestPriorityForPrefetch(requestHeaders, tracingContext); // Add request header to fetch MD5 Hash of data returned by server. diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 973681c5856e12..4d9ceee9e3f0a7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -160,7 +160,6 @@ public abstract class AbfsClient implements Closeable { public static final String HUNDRED_CONTINUE_USER_AGENT = SINGLE_WHITE_SPACE + HUNDRED_CONTINUE + SEMICOLON; public static final String ABFS_CLIENT_TIMER_THREAD_NAME = "abfs-timer-client"; public static final String FNS_BLOB_USER_AGENT_IDENTIFIER = "FNS"; - private static final String PREFETCH_TRAFFIC_PRIORITY_HEADER_VALUE = "7"; private final URL baseUrl; private final SharedKeyCredentials sharedKeyCredentials; @@ -1402,6 +1401,12 @@ protected void addCheckSumHeaderForWrite(List requestHeaders, } } + /** + * Add request priority header for prefetch read requests if enabled. + * + * @param requestHeaders to be updated with request priority header + * @param tracingContext tracing context to check read type + */ protected void addRequestPriorityForPrefetch(List requestHeaders, TracingContext tracingContext) { if (getAbfsConfiguration().isEnablePrefetchRequestPriority() diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java index 750aa6e4be8315..08ef93d1008401 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java @@ -56,7 +56,6 @@ import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams; -import org.apache.hadoop.fs.azurebfs.constants.ReadType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; @@ -1074,6 +1073,7 @@ public AbfsRestOperation read(final String path, final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder(); + // Add request priority header for prefetch reads addRequestPriorityForPrefetch(requestHeaders, tracingContext); // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index a758577c780f1d..9d29614a044162 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -593,7 +593,7 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t tracingContext.setPosition(String.valueOf(position)); op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get(), - contextEncryptionAdapter, new TracingContext(tracingContext)); + contextEncryptionAdapter, tracingContext); cachedSasToken.update(op.getSasToken()); LOG.debug("issuing HTTP GET request params position = {} b.length = {} " + "offset = {} length = {}", position, b.length, offset, length); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java index 28cc2d5b0180bb..e476f6d744614d 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java @@ -127,7 +127,7 @@ public void queueReadAhead(final AbfsInputStream stream, final long requestedOff buffer.setRequestedLength(requestedLength); buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); buffer.setLatch(new CountDownLatch(1)); - buffer.setTracingContext(new TracingContext(tracingContext)); + buffer.setTracingContext(tracingContext); Integer bufferIndex = getFreeList().pop(); // will return a value, since we have checked size > 0 already diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java index 5e053a7b165c03..c7e6e4c3d28d42 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java @@ -280,7 +280,7 @@ public void queueReadAhead(final AbfsInputStream stream, buffer.setRequestedLength(requestedLength); buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); buffer.setLatch(new CountDownLatch(1)); - buffer.setTracingContext(new TracingContext(tracingContext)); + buffer.setTracingContext(tracingContext); if (isFreeListEmpty()) { /* diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java index f4a2ba9d82c2d4..62ac76fee99c2f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/TracingContext.java @@ -392,6 +392,8 @@ public void setReadType(ReadType readType) { /** * Returns the read type for the current operation. + * + * @return the read type for the request. */ public ReadType getReadType() { return readType; From e77c870ec6a02d2ae564e1efa698cb3f59cd754f Mon Sep 17 00:00:00 2001 From: Manika Joshi Date: Tue, 4 Nov 2025 01:08:37 -0800 Subject: [PATCH 7/9] terms changes --- .../org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java | 4 ++-- .../fs/azurebfs/constants/FileSystemConfigurations.java | 7 +++---- .../hadoop/fs/azurebfs/services/TestAbfsInputStream.java | 3 +++ 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index a4a9b0717467a0..a54ee2bd444d40 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -643,8 +643,8 @@ public class AbfsConfiguration{ private boolean enablePrefetchRequestPriority; @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_PREFETCH_REQUEST_PRIORITY_VALUE, - MinValue = DEFAULT_FS_AZURE_STANDARD_MIN_REQUEST_PRIORITY_VALUE, - DefaultValue = DEFAULT_FS_AZURE_REQUEST_PRIORITY_VALUE) + MinValue = DEFAULT_FS_AZURE_STANDARD_REQUEST_PRIORITY_VALUE, + DefaultValue = DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE) private int prefetchRequestPriorityValue; private String clientProvidedEncryptionKey; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 2f554160109e41..8c5bc2229cfa03 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -395,11 +395,10 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY = true; - // The default traffic request priority is 3 (from service) + // The default traffic request priority is 3 (from service side) // The lowest priority a request can get is 7 - public static final int DEFAULT_FS_AZURE_REQUEST_PRIORITY_VALUE = 7; - - public static final int DEFAULT_FS_AZURE_STANDARD_MIN_REQUEST_PRIORITY_VALUE = 3; + public static final int DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE = 7; + public static final int DEFAULT_FS_AZURE_STANDARD_REQUEST_PRIORITY_VALUE = 3; public static final boolean DEFAULT_FS_AZURE_ENABLE_TAIL_LATENCY_TRACKER = false; public static final boolean DEFAULT_FS_AZURE_ENABLE_TAIL_LATENCY_REQUEST_TIMEOUT = false; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index dc80ef566329dd..aa6ce16373a221 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -938,6 +938,9 @@ public void testPrefetchReadAddsPriorityHeaderWithDifferentConfigs() executePrefetchReadTest(tracingContext1, configuration1, false); } + /* + * Helper method to execute read and verify if priority header is added or not as expected + */ private void executePrefetchReadTest(TracingContext tracingContext, Configuration rawConfig, boolean shouldHaveHeader) throws Exception { From af084723121961541df66ec8f91523ab254cbf6c Mon Sep 17 00:00:00 2001 From: Manika Joshi Date: Fri, 24 Apr 2026 23:44:50 -0700 Subject: [PATCH 8/9] prod changes --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 14 +++ .../fs/azurebfs/AzureBlobFileSystemStore.java | 4 + .../azurebfs/constants/ConfigurationKeys.java | 1 + .../constants/FileSystemConfigurations.java | 1 + .../fs/azurebfs/services/AbfsClient.java | 16 +++ .../azurebfs/services/AbfsClientContext.java | 7 ++ .../services/AbfsClientContextBuilder.java | 8 ++ .../fs/azurebfs/services/AbfsInputStream.java | 2 + .../azurebfs/services/AbfsRestOperation.java | 2 +- .../services/ExponentialRetryPolicy.java | 12 ++ .../azurebfs/services/ReadBufferManager.java | 19 ++++ .../services/ReadBufferManagerV1.java | 4 +- .../services/ReadBufferManagerV2.java | 2 +- .../fs/azurebfs/services/ITestAbfsClient.java | 10 +- .../services/ITestExponentialRetryPolicy.java | 86 +++++++++++++++ .../services/TestAbfsInputStream.java | 104 ++++++++++++------ 16 files changed, 254 insertions(+), 38 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 0a32a0507755f6..d615f8cc4fb4b9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -189,6 +189,11 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_MAX_RETRY_ATTEMPTS) private int maxIoRetries; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_IO_PREFETCH_RETRIES, + MinValue = 0, + DefaultValue = DEFAULT_PREFETCH_MAX_RETRY_ATTEMPTS) + private int maxIoPrefetchRetries; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT, MinValue = 0, DefaultValue = DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT) @@ -1142,6 +1147,10 @@ public int getMaxIoRetries() { return this.maxIoRetries; } + public int getPrefetchMaxIoRetries() { + return this.maxIoPrefetchRetries; + } + public int getCustomTokenFetchRetryCount() { return this.customTokenFetchRetryCount; } @@ -1977,6 +1986,11 @@ public void setMaxIoRetries(int maxIoRetries) { this.maxIoRetries = maxIoRetries; } + @VisibleForTesting + public void setMaxIoPrefetchRetries(int maxIoPrefetchRetries) { + this.maxIoPrefetchRetries = maxIoPrefetchRetries; + } + @VisibleForTesting void setMaxBackoffIntervalMilliseconds(int maxBackoffInterval) { this.maxBackoffInterval = maxBackoffInterval; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index e92e3397c915f7..91ca69e8e817f6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -225,6 +225,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { public AzureBlobFileSystemStore( AzureBlobFileSystemStoreBuilder abfsStoreBuilder) throws IOException { this.uri = abfsStoreBuilder.uri; + //todo: this String[] authorityParts = authorityParts(uri); final String fileSystemName = authorityParts[0]; final String accountName = authorityParts[1]; @@ -1886,6 +1887,7 @@ private void initializeClient(URI uri, String fileSystemName, LOG.trace("AbfsClient init complete"); } + //todo: here private AbfsServiceType getAbfsServiceTypeFromUrl() { if (uri.toString().contains(ABFS_BLOB_DOMAIN_NAME)) { return AbfsServiceType.BLOB; @@ -1905,6 +1907,8 @@ private AbfsClientContext populateAbfsClientContext() { return new AbfsClientContextBuilder() .withExponentialRetryPolicy( new ExponentialRetryPolicy(abfsConfiguration)) + .withPrefetchExponentialRetryPolicy( + new ExponentialRetryPolicy(abfsConfiguration, true)) .withStaticRetryPolicy( new StaticRetryPolicy(abfsConfiguration)) .withTailLatencyRequestTimeoutRetryPolicy( diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index d2dbe17650820e..820e4946c7eed6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -136,6 +136,7 @@ public final class ConfigurationKeys { public static final String AZURE_STATIC_RETRY_INTERVAL = "fs.azure.static.retry.interval"; public static final String AZURE_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval"; public static final String AZURE_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries"; + public static final String AZURE_MAX_IO_PREFETCH_RETRIES = "fs.azure.io.prefetch.retry.max.retries"; public static final String AZURE_CUSTOM_TOKEN_FETCH_RETRY_COUNT = "fs.azure.custom.token.fetch.retry.count"; /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 328c878401c65a..b26053f8ff4830 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -51,6 +51,7 @@ public final class FileSystemConfigurations { public static final int DEFAULT_STATIC_RETRY_INTERVAL = 1_000; // 1s public static final int DEFAULT_BACKOFF_INTERVAL = 500; // 500ms public static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30; + public static final int DEFAULT_PREFETCH_MAX_RETRY_ATTEMPTS = 2; public static final int DEFAULT_CUSTOM_TOKEN_FETCH_RETRY_COUNT = 3; /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index a781e12c86a423..83b6154e6e29e1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -158,6 +158,7 @@ public abstract class AbfsClient implements Closeable { private final SharedKeyCredentials sharedKeyCredentials; private ApiVersion xMsVersion = ApiVersion.getCurrentVersion(); private final ExponentialRetryPolicy exponentialRetryPolicy; + private final ExponentialRetryPolicy prefetchRetryPolicy; private final StaticRetryPolicy staticRetryPolicy; private final TailLatencyRequestTimeoutRetryPolicy tailLatencyRequestTimeoutRetryPolicy; private final String filesystem; @@ -212,6 +213,7 @@ private AbfsClient(final URL baseUrl, this.filesystem = baseUrlString.substring(indexLastForwardSlash + 1); this.abfsConfiguration = abfsConfiguration; this.exponentialRetryPolicy = abfsClientContext.getExponentialRetryPolicy(); + this.prefetchRetryPolicy = abfsClientContext.getPrefetchExponentialRetryPolicy(); this.staticRetryPolicy = abfsClientContext.getStaticRetryPolicy(); this.tailLatencyRequestTimeoutRetryPolicy = abfsClientContext.getTailLatencyRequestTimeoutRetryPolicy(); this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT)); @@ -374,6 +376,10 @@ ExponentialRetryPolicy getExponentialRetryPolicy() { return exponentialRetryPolicy; } + ExponentialRetryPolicy getPrefetchRetryPolicy() { + return prefetchRetryPolicy; + } + StaticRetryPolicy getStaticRetryPolicy() { return staticRetryPolicy; } @@ -397,6 +403,16 @@ && getAbfsConfiguration().getStaticRetryForConnectionTimeoutEnabled()) { return getExponentialRetryPolicy(); } + public AbfsRetryPolicy getRetryPolicy(TracingContext tc, final String failureReason){ + if (getAbfsConfiguration().isEnablePrefetchRequestPriority() + && ReadType.PREFETCH_READ.equals(tc.getReadType())){ + return getPrefetchRetryPolicy(); + } + else { + return getRetryPolicy(failureReason); + } + } + SharedKeyCredentials getSharedKeyCredentials() { return sharedKeyCredentials; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java index 27b2d5996e02ea..f89a0317550d89 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContext.java @@ -25,6 +25,7 @@ public class AbfsClientContext { private final ExponentialRetryPolicy exponentialRetryPolicy; + private final ExponentialRetryPolicy prefetchExponentialRetryPolicy; private final StaticRetryPolicy staticRetryPolicy; private final TailLatencyRequestTimeoutRetryPolicy tailLatencyRequestTimeoutRetryPolicy; private final AbfsPerfTracker abfsPerfTracker; @@ -33,11 +34,13 @@ public class AbfsClientContext { AbfsClientContext( ExponentialRetryPolicy exponentialRetryPolicy, + ExponentialRetryPolicy prefetchExponentialRetryPolicy, StaticRetryPolicy staticRetryPolicy, TailLatencyRequestTimeoutRetryPolicy tailLatencyRequestTimeoutRetryPolicy, AbfsPerfTracker abfsPerfTracker, AbfsCounters abfsCounters, String fileSystemId) { this.exponentialRetryPolicy = exponentialRetryPolicy; + this.prefetchExponentialRetryPolicy = prefetchExponentialRetryPolicy; this.staticRetryPolicy = staticRetryPolicy; this.tailLatencyRequestTimeoutRetryPolicy = tailLatencyRequestTimeoutRetryPolicy; this.abfsPerfTracker = abfsPerfTracker; @@ -49,6 +52,10 @@ public ExponentialRetryPolicy getExponentialRetryPolicy() { return exponentialRetryPolicy; } + public ExponentialRetryPolicy getPrefetchExponentialRetryPolicy() { + return prefetchExponentialRetryPolicy; + } + public StaticRetryPolicy getStaticRetryPolicy() { return staticRetryPolicy; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java index 5a175d27263017..3c5501607b1367 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientContextBuilder.java @@ -25,6 +25,7 @@ public class AbfsClientContextBuilder { private ExponentialRetryPolicy exponentialRetryPolicy; + private ExponentialRetryPolicy prefetchExponentialRetryPolicy; private StaticRetryPolicy staticRetryPolicy; private TailLatencyRequestTimeoutRetryPolicy tailLatencyRequestTimeoutRetryPolicy; private AbfsPerfTracker abfsPerfTracker; @@ -37,6 +38,12 @@ public AbfsClientContextBuilder withExponentialRetryPolicy( return this; } + public AbfsClientContextBuilder withPrefetchExponentialRetryPolicy( + final ExponentialRetryPolicy prefetchExponentialRetryPolicy) { + this.prefetchExponentialRetryPolicy = prefetchExponentialRetryPolicy; + return this; + } + public AbfsClientContextBuilder withStaticRetryPolicy( final StaticRetryPolicy staticRetryPolicy) { this.staticRetryPolicy = staticRetryPolicy; @@ -74,6 +81,7 @@ public AbfsClientContext build() { //validate the values return new AbfsClientContext( exponentialRetryPolicy, + prefetchExponentialRetryPolicy, staticRetryPolicy, tailLatencyRequestTimeoutRetryPolicy, abfsPerfTracker, diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index eba5b09337775e..e9ad01aaae0914 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -551,6 +551,8 @@ protected int readInternal(final long position, final byte[] b, final int offset nextSize = min((long) readAheadBlockSize, contentLength - nextOffset); } + getReadBufferManager().setPrefetchRequestPriorityEnabled(client.getAbfsConfiguration().isEnablePrefetchRequestPriority()); + // try reading from buffers first receivedBytes = getReadBufferManager().getBlock(this, position, length, b); bytesFromReadAhead += receivedBytes; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 1b55084fb45717..82dd8ae35f1ae0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -484,7 +484,7 @@ private boolean executeHttpOperation(final int retryCount, int status = httpOperation.getStatusCode(); failureReason = RetryReason.getAbbreviation(null, status, httpOperation.getStorageErrorMessage()); - retryPolicy = client.getRetryPolicy(failureReason); + retryPolicy = client.getRetryPolicy(tracingContext, failureReason); if (retryPolicy.shouldRetry(retryCount, httpOperation.getStatusCode())) { return false; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java index f1f2bc8be346f0..690c844b1d6c45 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java @@ -94,6 +94,18 @@ public ExponentialRetryPolicy(AbfsConfiguration conf) { conf.getBackoffIntervalMilliseconds()); } + /** + * Initializes a new instance of the {@link ExponentialRetryPolicy} class. + * + * @param conf The {@link AbfsConfiguration} from which to retrieve retry configuration. + */ + public ExponentialRetryPolicy(AbfsConfiguration conf, Boolean isPrefetch) { + this(conf.getPrefetchMaxIoRetries(), + conf.getMinBackoffIntervalMilliseconds(), + conf.getMaxBackoffIntervalMilliseconds(), + conf.getBackoffIntervalMilliseconds()); + } + /** * Initializes a new instance of the {@link ExponentialRetryPolicy} class. * diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 5b53d641a20df8..e885ac09d56c09 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -47,6 +47,7 @@ public abstract class ReadBufferManager { private Queue readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet private LinkedList inProgressList = new LinkedList<>(); // requests being processed by worker threads private LinkedList completedReadList = new LinkedList<>(); // buffers available for reading + private boolean isPrefetchRequestPriorityEnabled; /** * Initializes the ReadBufferManager singleton instance. Creates the read buffers and threads. @@ -198,6 +199,15 @@ protected static void setReadAheadBlockSize(int readAheadBlockSize) { blockSize = readAheadBlockSize; } + /** + * Sets prefetch request priority according to the config set + * + * @param isEnabled true if its enabled, false otherwise + */ + protected void setPrefetchRequestPriorityEnabled(boolean isEnabled) { + isPrefetchRequestPriorityEnabled = isEnabled; + } + /** * Gets the queue of read-ahead requests. * @@ -274,6 +284,15 @@ int getCompletedReadListSize() { return completedReadList.size(); } + /** + * Checks if prefetch request priority is enabled. + * + * @return true if prefetch request priority is enabled, false otherwise + */ + boolean isPrefetchRequestPriorityEnabled() { + return isPrefetchRequestPriorityEnabled; + } + /** * Simulates full buffer usage and adds a failed buffer for testing. * diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java index c034d856596038..f2fdb6b17b9f47 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java @@ -485,7 +485,9 @@ private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long if (buf.getStatus() == ReadBufferStatus.READ_FAILED) { // To prevent new read requests to fail due to old read-ahead attempts, // return exception only from buffers that failed within last thresholdAgeMilliseconds - if ((currentTimeMillis() - (buf.getTimeStamp()) < getThresholdAgeMilliseconds())) { + if (!isPrefetchRequestPriorityEnabled() && ( + currentTimeMillis() - (buf.getTimeStamp()) + < getThresholdAgeMilliseconds())) { throw buf.getErrException(); } else { return 0; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java index c271a5e354acda..aa2641b49c3c2f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java @@ -709,7 +709,7 @@ private int getBlockFromCompletedQueue(final String eTag, final long position, if (buf.getStatus() == ReadBufferStatus.READ_FAILED) { // To prevent new read requests to fail due to old read-ahead attempts, // return exception only from buffers that failed within last getThresholdAgeMilliseconds() - if ((currentTimeMillis() - (buf.getTimeStamp()) + if (!isPrefetchRequestPriorityEnabled() && (currentTimeMillis() - (buf.getTimeStamp()) < getThresholdAgeMilliseconds())) { throw buf.getErrException(); } else { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index daa1bda78c2e79..9339fe61230623 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -416,10 +416,12 @@ public static AbfsClient createTestClientFromCurrentContext( AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsPerfTracker(tracker) - .withExponentialRetryPolicy( - new ExponentialRetryPolicy(abfsConfig.getMaxIoRetries())) - .withAbfsCounters(abfsCounters) - .build(); + .withExponentialRetryPolicy( + new ExponentialRetryPolicy(abfsConfig.getMaxIoRetries())) + .withPrefetchExponentialRetryPolicy( + new ExponentialRetryPolicy(abfsConfig.getPrefetchMaxIoRetries())) + .withAbfsCounters(abfsCounters) + .build(); // Create test AbfsClient AbfsClient testClient; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java index a2560dd6181eaf..9e9476cc5a4cb3 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java @@ -23,10 +23,12 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DOT; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_BACKOFF_INTERVAL; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_PREFETCH_RETRIES; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_AUTOTHROTTLING; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT1_NAME; import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_KEY; @@ -54,6 +56,8 @@ import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; /** * Unit test ITestExponentialRetryPolicy. @@ -340,4 +344,86 @@ private void testMaxIOConfig(AbfsConfiguration abfsConfig) { .describedAs("When all retries are exhausted, the retryCount will be same as max configured.") .isEqualTo(abfsConfig.getMaxIoRetries()); } + + /** + * Test prefetch exponential retry policy max retries configuration. + * Validates that prefetch retry policy correctly uses the configured + * max prefetch IO retries value and respects retry limits. + * + * @throws Exception if test fails + */ + @Test + public void testPrefetchExponentialRetryPolicyMaxRetries() throws Exception { + AbfsConfiguration abfsConfig = getAbfsConfig(); + int prefetchRetries = 5; + abfsConfig.setMaxIoPrefetchRetries(prefetchRetries); + ExponentialRetryPolicy prefetchPolicy = new ExponentialRetryPolicy(abfsConfig, true); + + Assertions.assertThat(prefetchPolicy.getMaxRetryCount()) + .describedAs("Prefetch policy should use getPrefetchMaxIoRetries value") + .isEqualTo(prefetchRetries); + + // Validate retry logic + for (int i = 0; i < prefetchRetries; i++) { + Assertions.assertThat(prefetchPolicy.shouldRetry(i, -1)) + .describedAs("Should retry for attempts less than maxPrefetchIoRetries") + .isTrue(); + } + Assertions.assertThat(prefetchPolicy.shouldRetry(prefetchRetries, -1)) + .describedAs("Should not retry when attempts equal to maxPrefetchIoRetries") + .isFalse(); + } + + /** + * Test client getRetryPolicy returns correct policy based on configuration. + * Validates that: + * - When prefetch priority is enabled, prefetch retry policy is returned for PREFETCH_READ + * - When prefetch priority is disabled, default retry policy is returned + * - Retry counts are correctly configured for each policy type + * + * @throws Exception if test fails + */ + @Test + public void testGetRetryPolicyBasedOnConfiguration() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + TracingContext tc = mock(TracingContext.class); + when(tc.getReadType()).thenReturn(ReadType.PREFETCH_READ); + + // Test with prefetch priority enabled + Configuration configWithPrefetch = new Configuration(); + configWithPrefetch.set(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY, "true"); + configWithPrefetch.set(AZURE_MAX_IO_PREFETCH_RETRIES, "4"); + AbfsConfiguration abfsConfigWithPrefetch = new AbfsConfiguration(configWithPrefetch, + DUMMY_ACCOUNT_NAME); + AbfsClient clientWithPrefetch = ITestAbfsClient.createTestClientFromCurrentContext( + fs.getAbfsStore().getClient(), abfsConfigWithPrefetch); + + AbfsRetryPolicy prefetchPolicy = clientWithPrefetch.getRetryPolicy(tc, ""); + Assertions.assertThat(prefetchPolicy) + .describedAs("Should return prefetchExponentialRetryPolicy for PREFETCH_READ when enabled") + .isEqualTo(clientWithPrefetch.getPrefetchRetryPolicy()); + Assertions.assertThat(prefetchPolicy.getMaxRetryCount()) + .describedAs("Prefetch policy should have correct max retry count") + .isEqualTo(4); + Assertions.assertThat(prefetchPolicy.shouldRetry(0, -1)).isTrue(); + Assertions.assertThat(prefetchPolicy.shouldRetry(3, -1)).isTrue(); + Assertions.assertThat(prefetchPolicy.shouldRetry(4, -1)).isFalse(); + + // Test with prefetch priority disabled + Configuration configWithoutPrefetch = new Configuration(); + configWithoutPrefetch.set(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY, "false"); + configWithoutPrefetch.set(AZURE_MAX_IO_RETRIES, "7"); + AbfsConfiguration abfsConfigWithoutPrefetch = new AbfsConfiguration(configWithoutPrefetch, + DUMMY_ACCOUNT_NAME); + AbfsClient clientWithoutPrefetch = ITestAbfsClient.createTestClientFromCurrentContext( + fs.getAbfsStore().getClient(), abfsConfigWithoutPrefetch); + + AbfsRetryPolicy defaultPolicy = clientWithoutPrefetch.getRetryPolicy(tc, "anyFailureReason"); + Assertions.assertThat(defaultPolicy) + .describedAs("Should return default exponentialRetryPolicy when prefetch priority is disabled") + .isEqualTo(clientWithoutPrefetch.getExponentialRetryPolicy()); + Assertions.assertThat(defaultPolicy.getMaxRetryCount()) + .describedAs("Default policy should have correct max retry count") + .isEqualTo(7); + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 93ab2d85c7ac99..8ac34dd123dbba 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -76,6 +76,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COLON; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SPLIT_NO_LIMIT; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_PREFETCH_RETRIES; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_REQUEST_PRIORITY; import static org.apache.hadoop.fs.azurebfs.constants.ReadType.DIRECT_READ; @@ -95,6 +97,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -174,6 +177,7 @@ AbfsRestOperation getMockRestOpWithMetadata() { AbfsClient getMockAbfsClient() throws URISyntaxException { // Mock failure for client.read() AbfsClient client = mock(AbfsClient.class); + AbfsConfiguration abfsConfig = mock(AbfsConfiguration.class); AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd"))); Mockito.doReturn(abfsCounters).when(client).getAbfsCounters(); AbfsPerfTracker tracker = new AbfsPerfTracker( @@ -181,7 +185,10 @@ AbfsClient getMockAbfsClient() throws URISyntaxException { this.getAccountName(), this.getConfiguration()); when(client.getAbfsPerfTracker()).thenReturn(tracker); - + when(client.getAbfsConfiguration()).thenReturn(abfsConfig); + // Delegate to real config instead of hardcoding false + when(abfsConfig.isEnablePrefetchRequestPriority()) + .thenReturn(getConfiguration().isEnablePrefetchRequestPriority()); return client; } @@ -333,6 +340,65 @@ private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, Mockito.reset(mockClient); //clears invocation count for next test case } + /** + * Verifies that when prefetch read fails and + * FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY is true (default), the + * exception is NOT propagated to the caller. The stream should silently + * fall back or exhaust its (lower) retry budget without surfacing an error. + * The failed reads should be of readtype PR and the successful one of MR. + * This tests the core contract: prefetch failures are silent when the + * feature flag is on. + */ + @Test + public void testPrefetchFailureNotPropagatedToUserWhenPriorityEnabled() throws Exception { + AbfsClient client = getMockAbfsClient(); + + AbfsRestOperation successOp = getMockRestOp(); + when(client.getAbfsConfiguration().isEnablePrefetchRequestPriority()) + .thenReturn(true); + + // 3 Prefetch reads fail → final direct (missed cache) read succeeds + doThrow(new TimeoutException("Internal Server error for RAH-Thread-X")) + .doThrow(new TimeoutException("Internal Server error for RAH-Thread-Y")) + .doThrow(new TimeoutException("Internal Server error for RAH-Thread-Z")) + .doReturn(successOp) + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class), + any(String.class), any(), any(TracingContext.class)); + + AbfsInputStream inputStream = getAbfsInputStream(client, "testFailedReadAheadReadTypes.txt"); + + inputStream.read(new byte[1 * ONE_KB]); // triggers all 4 read calls + + // Verify count remains same as earlier test + verifyReadCallCount(client, 4); + + // Capture TracingContext arguments (9th parameter) + ArgumentCaptor traceCaptor = ArgumentCaptor.forClass(TracingContext.class); + verify(client, times(4)) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class), + any(String.class), any(), traceCaptor.capture()); + + List contextList = traceCaptor.getAllValues(); + + // First 3 calls = PR (Prefetch Reads) + for (int i = 0; i < 3; i++) { + verifyHeaderForReadTypeInTracingContextHeader(contextList.get(i), + ReadType.PREFETCH_READ, + -1 + ); + } + + // 4th call = MR (Missed Cache Read) + verifyHeaderForReadTypeInTracingContextHeader(contextList.get(3), + MISSEDCACHE_READ, + 0 + ); + } + + @Test public void testOpenFileWithOptions() throws Exception { AzureBlobFileSystem fs = getFileSystem(); @@ -1056,6 +1122,9 @@ public void testFailedReadAhead() throws Exception { AbfsClient client = getMockAbfsClient(); AbfsRestOperation successOp = getMockRestOp(); + when(client.getAbfsConfiguration().isEnablePrefetchRequestPriority()) + .thenReturn(false); + // Stub : // Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() // Actual read request fails with the failure in readahead thread @@ -1133,6 +1202,9 @@ public void testOlderReadAheadFailure() throws Exception { AbfsClient client = getMockAbfsClient(); AbfsRestOperation successOp = getMockRestOp(); + when(client.getAbfsConfiguration().isEnablePrefetchRequestPriority()) + .thenReturn(false); + // Stub : // First Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() // A second read request will see that readahead had failed for data in @@ -1799,36 +1871,6 @@ private void executePrefetchReadTest(TracingContext tracingContext, } } - /* - * Test to verify that both conditions of prefetch read and respective config - * enabled needs to be true for the priority header to be added - */ - @Test - public void testPrefetchReadAddsPriorityHeaderWithDifferentConfigs() - throws Exception { - Configuration configuration1 = new Configuration(getRawConfiguration()); - configuration1.set(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY, "true"); - - Configuration configuration2 = new Configuration(getRawConfiguration()); - //use the default value for the config: false - configuration2.unset(FS_AZURE_ENABLE_PREFETCH_REQUEST_PRIORITY); - - TracingContext tracingContext1 = mock(TracingContext.class); - when(tracingContext1.getReadType()).thenReturn(PREFETCH_READ); - - //Prefetch Read with config enabled - executePrefetchReadTest(tracingContext1, configuration1, true); - //Prefetch Read with config disabled - executePrefetchReadTest(tracingContext1, configuration2, false); - - when(tracingContext1.getReadType()).thenReturn(DIRECT_READ); - - //Non-prefetch read with config disabled - executePrefetchReadTest(tracingContext1, configuration2, false); - //Non-prefetch read with config enabled - executePrefetchReadTest(tracingContext1, configuration1, false); - } - private Path createTestFile(AzureBlobFileSystem fs, int fileSize) throws Exception { Path testPath = new Path("testFile"); byte[] fileContent = getRandomBytesArray(fileSize); From 86615ca0405a15a3ecdcd116b827b56b59257c4c Mon Sep 17 00:00:00 2001 From: Manika Joshi Date: Tue, 28 Apr 2026 20:34:50 -0700 Subject: [PATCH 9/9] cleanup --- .../fs/azurebfs/AzureBlobFileSystemStore.java | 4 +- .../fs/azurebfs/services/AbfsClient.java | 6 +++ .../services/ExponentialRetryPolicy.java | 11 +++-- .../azurebfs/services/AbfsClientTestUtil.java | 5 ++ .../fs/azurebfs/services/ITestAbfsClient.java | 2 + .../services/ITestExponentialRetryPolicy.java | 2 +- .../services/TestAbfsInputStream.java | 48 +++++++++++-------- .../TestAbfsRestOperationMockFailures.java | 3 +- 8 files changed, 52 insertions(+), 29 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 91ca69e8e817f6..ada6466bdca26a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -225,7 +225,6 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport { public AzureBlobFileSystemStore( AzureBlobFileSystemStoreBuilder abfsStoreBuilder) throws IOException { this.uri = abfsStoreBuilder.uri; - //todo: this String[] authorityParts = authorityParts(uri); final String fileSystemName = authorityParts[0]; final String accountName = authorityParts[1]; @@ -1887,7 +1886,6 @@ private void initializeClient(URI uri, String fileSystemName, LOG.trace("AbfsClient init complete"); } - //todo: here private AbfsServiceType getAbfsServiceTypeFromUrl() { if (uri.toString().contains(ABFS_BLOB_DOMAIN_NAME)) { return AbfsServiceType.BLOB; @@ -1908,7 +1906,7 @@ private AbfsClientContext populateAbfsClientContext() { .withExponentialRetryPolicy( new ExponentialRetryPolicy(abfsConfiguration)) .withPrefetchExponentialRetryPolicy( - new ExponentialRetryPolicy(abfsConfiguration, true)) + ExponentialRetryPolicy.forPrefetch(abfsConfiguration)) .withStaticRetryPolicy( new StaticRetryPolicy(abfsConfiguration)) .withTailLatencyRequestTimeoutRetryPolicy( diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 83b6154e6e29e1..cbff2ced2e675e 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -403,6 +403,12 @@ && getAbfsConfiguration().getStaticRetryForConnectionTimeoutEnabled()) { return getExponentialRetryPolicy(); } + /** + * Returns the retry policy based on tracing context and failure reason. + * @param tc the tracing context containing read type information. + * @param failureReason the reason for the failure to determine retry policy. + * @return retry policy to be used, prefetch policy if enabled and prefetch read, otherwise standard policy. + */ public AbfsRetryPolicy getRetryPolicy(TracingContext tc, final String failureReason){ if (getAbfsConfiguration().isEnablePrefetchRequestPriority() && ReadType.PREFETCH_READ.equals(tc.getReadType())){ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java index 690c844b1d6c45..3e690297dd868f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ExponentialRetryPolicy.java @@ -95,12 +95,15 @@ public ExponentialRetryPolicy(AbfsConfiguration conf) { } /** - * Initializes a new instance of the {@link ExponentialRetryPolicy} class. + * Creates an {@link ExponentialRetryPolicy} configured with prefetch retry + * settings read from the given {@link AbfsConfiguration}. * - * @param conf The {@link AbfsConfiguration} from which to retrieve retry configuration. + * @param conf The {@link AbfsConfiguration} from which to retrieve prefetch retry configuration. + * @return a new {@link ExponentialRetryPolicy} for prefetch operations. */ - public ExponentialRetryPolicy(AbfsConfiguration conf, Boolean isPrefetch) { - this(conf.getPrefetchMaxIoRetries(), + public static ExponentialRetryPolicy forPrefetch(AbfsConfiguration conf) { + return new ExponentialRetryPolicy( + conf.getPrefetchMaxIoRetries(), conf.getMinBackoffIntervalMilliseconds(), conf.getMaxBackoffIntervalMilliseconds(), conf.getBackoffIntervalMilliseconds()); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java index cd1a2af7d6c59b..58b14ce496444d 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientTestUtil.java @@ -386,6 +386,11 @@ public static void addGeneralMockBehaviourToAbfsClient(final AbfsClient abfsClie Mockito.doReturn(staticRetryPolicy).when(abfsClient).getRetryPolicy(CONNECTION_TIMEOUT_ABBREVIATION); Mockito.doReturn(exponentialRetryPolicy).when(abfsClient).getRetryPolicy( AdditionalMatchers.not(eq(CONNECTION_TIMEOUT_ABBREVIATION))); + Mockito.doReturn(staticRetryPolicy).when(abfsClient).getRetryPolicy( + any(TracingContext.class), eq(CONNECTION_TIMEOUT_ABBREVIATION)); + Mockito.doReturn(exponentialRetryPolicy).when(abfsClient).getRetryPolicy( + any(TracingContext.class), + AdditionalMatchers.not(eq(CONNECTION_TIMEOUT_ABBREVIATION))); // Defining behavior of static retry policy Mockito.doReturn(true).when(staticRetryPolicy) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java index 9339fe61230623..a68ebb5262cf61 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java @@ -526,6 +526,8 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance, new ExponentialRetryPolicy(1)); when(client.getRetryPolicy(any())).thenReturn( new ExponentialRetryPolicy(1)); + when(client.getRetryPolicy(any(TracingContext.class), any())).thenReturn( + new ExponentialRetryPolicy(1)); when(client.createDefaultUriQueryBuilder()).thenCallRealMethod(); when(client.createRequestUrl(any(), any())).thenCallRealMethod(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java index 9e9476cc5a4cb3..06d4f1d12868cf 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestExponentialRetryPolicy.java @@ -357,7 +357,7 @@ public void testPrefetchExponentialRetryPolicyMaxRetries() throws Exception { AbfsConfiguration abfsConfig = getAbfsConfig(); int prefetchRetries = 5; abfsConfig.setMaxIoPrefetchRetries(prefetchRetries); - ExponentialRetryPolicy prefetchPolicy = new ExponentialRetryPolicy(abfsConfig, true); + ExponentialRetryPolicy prefetchPolicy = ExponentialRetryPolicy.forPrefetch(abfsConfig); Assertions.assertThat(prefetchPolicy.getMaxRetryCount()) .describedAs("Prefetch policy should use getPrefetchMaxIoRetries value") diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 8ac34dd123dbba..ccf1a39187828f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -186,7 +186,7 @@ AbfsClient getMockAbfsClient() throws URISyntaxException { this.getConfiguration()); when(client.getAbfsPerfTracker()).thenReturn(tracker); when(client.getAbfsConfiguration()).thenReturn(abfsConfig); - // Delegate to real config instead of hardcoding false + when(abfsConfig.isEnablePrefetchRequestPriority()) .thenReturn(getConfiguration().isEnablePrefetchRequestPriority()); return client; @@ -346,8 +346,6 @@ private void checkGetPathStatusCalls(Path testFile, FileStatus fileStatus, * exception is NOT propagated to the caller. The stream should silently * fall back or exhaust its (lower) retry budget without surfacing an error. * The failed reads should be of readtype PR and the successful one of MR. - * This tests the core contract: prefetch failures are silent when the - * feature flag is on. */ @Test public void testPrefetchFailureNotPropagatedToUserWhenPriorityEnabled() throws Exception { @@ -357,12 +355,13 @@ public void testPrefetchFailureNotPropagatedToUserWhenPriorityEnabled() throws E when(client.getAbfsConfiguration().isEnablePrefetchRequestPriority()) .thenReturn(true); - // 3 Prefetch reads fail → final direct (missed cache) read succeeds - doThrow(new TimeoutException("Internal Server error for RAH-Thread-X")) - .doThrow(new TimeoutException("Internal Server error for RAH-Thread-Y")) - .doThrow(new TimeoutException("Internal Server error for RAH-Thread-Z")) - .doReturn(successOp) - .when(client) + doAnswer(invocation -> { + TracingContext tc = invocation.getArgument(8); + if (ReadType.PREFETCH_READ.equals(tc.getReadType())) { + throw new TimeoutException("Internal Server error for prefetch"); + } + return successOp; + }).when(client) .read(any(String.class), any(Long.class), any(byte[].class), any(Integer.class), any(Integer.class), any(String.class), any(String.class), any(), any(TracingContext.class)); @@ -383,19 +382,28 @@ public void testPrefetchFailureNotPropagatedToUserWhenPriorityEnabled() throws E List contextList = traceCaptor.getAllValues(); - // First 3 calls = PR (Prefetch Reads) - for (int i = 0; i < 3; i++) { - verifyHeaderForReadTypeInTracingContextHeader(contextList.get(i), - ReadType.PREFETCH_READ, - -1 - ); + // Exactly 1 missed-cache read and 3 prefetch reads. + List prefetchContexts = new ArrayList<>(); + TracingContext missedCacheContext = null; + for (TracingContext ctx : contextList) { + if (MISSEDCACHE_READ.equals(ctx.getReadType())) { + missedCacheContext = ctx; + } else { + prefetchContexts.add(ctx); + } } - // 4th call = MR (Missed Cache Read) - verifyHeaderForReadTypeInTracingContextHeader(contextList.get(3), - MISSEDCACHE_READ, - 0 - ); + assertThat(missedCacheContext) + .describedAs("Exactly one missed-cache read should have occurred") + .isNotNull(); + assertThat(prefetchContexts.size()) + .describedAs("Exactly 3 prefetch reads should have occurred") + .isEqualTo(3); + + for (TracingContext ctx : prefetchContexts) { + verifyHeaderForReadTypeInTracingContextHeader(ctx, ReadType.PREFETCH_READ, -1); + } + verifyHeaderForReadTypeInTracingContextHeader(missedCacheContext, MISSEDCACHE_READ, 0); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java index 55d3b5f065716d..75f889b746956f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsRestOperationMockFailures.java @@ -60,6 +60,7 @@ import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.UNKNOWN_HOST_EXCEPTION_ABBREVIATION; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -280,7 +281,7 @@ public void testRetryPolicyWithDifferentFailureReasons() throws Exception { // Before iteration 3 sleep will be computed using exponential retry policy and retry count 2 // Should retry with retry count 2 will return false and no further requests will be made. verify(abfsClient, times(2)) - .getRetryPolicy(EGRESS_LIMIT_BREACH_ABBREVIATION); + .getRetryPolicy(any(TracingContext.class), eq(EGRESS_LIMIT_BREACH_ABBREVIATION)); verify(exponentialRetryPolicy, times(1)) .shouldRetry(1, HTTP_UNAVAILABLE); verify(exponentialRetryPolicy, times(1))