Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INTERNAL]: Add RegionalRoutingContext to encapsulate URI. #44398

Merged
merged 18 commits into from
Mar 3, 2025
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.azure.cosmos.implementation.directconnectivity.AddressSelector;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdUtils;
import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import com.azure.cosmos.test.faultinjection.FaultInjectionCondition;
import com.azure.cosmos.test.faultinjection.FaultInjectionConnectionErrorResult;
Expand Down Expand Up @@ -447,6 +448,7 @@ private Mono<List<URI>> resolvePhysicalAddresses(
null);

faultInjectionAddressRequest.requestContext.locationEndpointToRoute = regionEndpoint;
faultInjectionAddressRequest.requestContext.regionalEndpointsToRoute = new LocationCache.RegionalEndpoints(regionEndpoint);
faultInjectionAddressRequest.setPartitionKeyRangeIdentity(new PartitionKeyRangeIdentity(pkRangeId));

if (isWriteOnly) {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyImpl;
import com.azure.cosmos.implementation.guava25.base.Function;
import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.models.CosmosBatch;
import com.azure.cosmos.models.CosmosBatchResponse;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
Expand Down Expand Up @@ -60,7 +61,6 @@
import reactor.core.publisher.Mono;

import java.lang.reflect.Field;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -5144,8 +5144,8 @@ private static double getEstimatedFailureCountSeenPerRegionPerPartitionKeyRange(
return 0d;
}

ConcurrentHashMap<URI, LocationSpecificHealthContext> locationEndpointToLocationSpecificContextForPartition
= (ConcurrentHashMap<URI, LocationSpecificHealthContext>) locationEndpointToLocationSpecificContextForPartitionField.get(partitionAndLocationSpecificUnavailabilityInfo);
ConcurrentHashMap<LocationCache.RegionalEndpoints, LocationSpecificHealthContext> locationEndpointToLocationSpecificContextForPartition
= (ConcurrentHashMap<LocationCache.RegionalEndpoints, LocationSpecificHealthContext>) locationEndpointToLocationSpecificContextForPartitionField.get(partitionAndLocationSpecificUnavailabilityInfo);

int count = 0;
boolean failuresExist = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient;
import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdEndpoint;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import com.azure.cosmos.models.CosmosContainerIdentity;
Expand Down Expand Up @@ -180,11 +181,13 @@ public void openConnectionsAndInitCachesWithContainer(ProactiveConnectionManagem

cosmosAsyncContainer.openConnectionsAndInitCaches(proactiveConnectionRegionCount).block();

UnmodifiableList<URI> readEndpoints =
UnmodifiableList<LocationCache.RegionalEndpoints> readEndpoints =
globalEndpointManager.getReadEndpoints();

List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
0,
Math.min(readEndpoints.size(),proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
Math.min(readEndpoints.size(),proactiveContainerInitConfig.getProactiveConnectionRegionsCount()))
.stream().map(LocationCache.RegionalEndpoints::getGatewayLocationEndpoint).collect(Collectors.toList());

Mono<CosmosAsyncContainer> asyncContainerMono = Mono.just(cosmosAsyncContainer);

Expand Down Expand Up @@ -342,10 +345,14 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
ConcurrentHashMap<String, ?> routingMap = getRoutingMap(rxDocumentClient);
ConcurrentHashMap<String, ?> collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient);
Set<String> endpoints = ConcurrentHashMap.newKeySet();
UnmodifiableList<URI> readEndpoints = globalEndpointManager.getReadEndpoints();
UnmodifiableList<LocationCache.RegionalEndpoints> readEndpoints = globalEndpointManager.getReadEndpoints();

List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
0,
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()))
.stream()
.map(LocationCache.RegionalEndpoints::getGatewayLocationEndpoint)
.collect(Collectors.toList());

Flux<CosmosAsyncContainer> asyncContainerFlux = Flux.fromIterable(asyncContainers);
Flux<Utils.ValueHolder<List<PartitionKeyRange>>> partitionKeyRangeFlux =
Expand Down Expand Up @@ -488,10 +495,13 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
ConcurrentHashMap<String, ?> routingMap = getRoutingMap(rxDocumentClient);
ConcurrentHashMap<String, ?> collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient);
Set<String> endpoints = ConcurrentHashMap.newKeySet();
UnmodifiableList<URI> readEndpoints = globalEndpointManager.getReadEndpoints();
UnmodifiableList<LocationCache.RegionalEndpoints> readEndpoints = globalEndpointManager.getReadEndpoints();
List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
0,
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()))
.stream()
.map(LocationCache.RegionalEndpoints::getGatewayLocationEndpoint)
.collect(Collectors.toList());;

Flux<CosmosAsyncContainer> asyncContainerFlux = Flux.fromIterable(asyncContainers);
Flux<Utils.ValueHolder<List<PartitionKeyRange>>> partitionKeyRangeFlux =
Expand Down Expand Up @@ -656,10 +666,13 @@ public void openConnectionsAndInitCachesWithCosmosClient_And_PerContainerConnect
ConcurrentHashMap<String, ?> routingMap = getRoutingMap(rxDocumentClient);
ConcurrentHashMap<String, ?> collectionInfoByNameMap = getCollectionInfoByNameMap(rxDocumentClient);
Set<String> endpoints = ConcurrentHashMap.newKeySet();
UnmodifiableList<URI> readEndpoints = globalEndpointManager.getReadEndpoints();
UnmodifiableList<LocationCache.RegionalEndpoints> readEndpoints = globalEndpointManager.getReadEndpoints();
List<URI> proactiveConnectionEndpoints = readEndpoints.subList(
0,
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()));
Math.min(readEndpoints.size(), proactiveContainerInitConfig.getProactiveConnectionRegionsCount()))
.stream()
.map(LocationCache.RegionalEndpoints::getGatewayLocationEndpoint)
.collect(Collectors.toList());;

Flux<CosmosAsyncContainer> asyncContainerFlux = Flux.fromIterable(asyncContainers);
Flux<Utils.ValueHolder<List<PartitionKeyRange>>> partitionKeyRangeFlux =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.cosmos.ThrottlingRetryOptions;
import com.azure.cosmos.implementation.circuitBreaker.GlobalPartitionEndpointManagerForCircuitBreaker;
import com.azure.cosmos.implementation.directconnectivity.ChannelAcquisitionException;
import com.azure.cosmos.implementation.routing.LocationCache;
import io.netty.handler.timeout.ReadTimeoutException;
import io.reactivex.subscribers.TestSubscriber;
import org.mockito.Mockito;
Expand Down Expand Up @@ -65,7 +66,7 @@ public void networkFailureOnRead() throws Exception {
ThrottlingRetryOptions throttlingRetryOptions = new ThrottlingRetryOptions();
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);
Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.RegionalEndpoints(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, throttlingRetryOptions, null, globalPartitionEndpointManager);

Expand Down Expand Up @@ -106,7 +107,7 @@ public void shouldRetryOnGatewayTimeout(
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);

Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.RegionalEndpoints(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(true));
ClientRetryPolicy clientRetryPolicy =
new ClientRetryPolicy(
Expand Down Expand Up @@ -149,7 +150,7 @@ public void tcpNetworkFailureOnRead() throws Exception {
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);

Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.RegionalEndpoints(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount();
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, retryOptions, null, globalPartitionEndpointManager);
Expand Down Expand Up @@ -197,7 +198,7 @@ public void networkFailureOnWrite() throws Exception {
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);

Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.RegionalEndpoints(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, throttlingRetryOptions, null, globalPartitionEndpointManager);

Expand Down Expand Up @@ -232,7 +233,7 @@ public void tcpNetworkFailureOnWrite(
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);

Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.RegionalEndpoints(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount();
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, retryOptions, null, globalPartitionEndpointManager);
Expand Down Expand Up @@ -292,7 +293,7 @@ public void networkFailureOnUpsert() throws Exception {
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);

Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.RegionalEndpoints(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, throttlingRetryOptions, null, globalPartitionEndpointManager);

Expand Down Expand Up @@ -325,7 +326,7 @@ public void tcpNetworkFailureOnUpsert() throws Exception {
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);

Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.RegionalEndpoints(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount();
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, retryOptions, null, globalPartitionEndpointManager);
Expand Down Expand Up @@ -361,7 +362,7 @@ public void networkFailureOnDelete() throws Exception {
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);

Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.RegionalEndpoints(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, throttlingRetryOptions, null, globalPartitionEndpointManager);

Expand Down Expand Up @@ -395,7 +396,7 @@ public void tcpNetworkFailureOnDelete() throws Exception {
GlobalEndpointManager endpointManager = Mockito.mock(GlobalEndpointManager.class);
GlobalPartitionEndpointManagerForCircuitBreaker globalPartitionEndpointManager = Mockito.mock(GlobalPartitionEndpointManagerForCircuitBreaker.class);

Mockito.doReturn(new URI("http://localhost")).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(new LocationCache.RegionalEndpoints(new URI("http://localhost"))).when(endpointManager).resolveServiceEndpoint(Mockito.any(RxDocumentServiceRequest.class));
Mockito.doReturn(Mono.empty()).when(endpointManager).refreshLocationAsync(Mockito.eq(null), Mockito.eq(false));
Mockito.doReturn(2).when(endpointManager).getPreferredLocationCount();
ClientRetryPolicy clientRetryPolicy = new ClientRetryPolicy(mockDiagnosticsClientContext(), endpointManager, true, retryOptions, null, globalPartitionEndpointManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@
import com.azure.cosmos.implementation.clienttelemetry.MetricCategory;
import com.azure.cosmos.implementation.clienttelemetry.TagName;
import com.azure.cosmos.implementation.directconnectivity.Protocol;
import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils;
import io.netty.handler.ssl.SslContext;
import org.testng.annotations.Test;

import java.util.EnumSet;
Expand Down
Loading