-
Notifications
You must be signed in to change notification settings - Fork 3.9k
xds: add "resource_timer_is_transient_failure" server feature #12063
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d142abb
72aba57
13bc715
9bf979f
99785ae
6bd7a23
aa18a55
90c8a28
3d72bcb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ | |
|
||
import static com.google.common.base.Preconditions.checkArgument; | ||
import static com.google.common.base.Preconditions.checkNotNull; | ||
import static io.grpc.xds.client.BootstrapperImpl.xdsDataErrorHandlingEnabled; | ||
import static io.grpc.xds.client.XdsResourceType.ParsedResource; | ||
import static io.grpc.xds.client.XdsResourceType.ValidatedResourceUpdate; | ||
|
||
|
@@ -67,6 +68,7 @@ public final class XdsClientImpl extends XdsClient implements ResourceStore { | |
// Longest time to wait, since the subscription to some resource, for concluding its absence. | ||
@VisibleForTesting | ||
public static final int INITIAL_RESOURCE_FETCH_TIMEOUT_SEC = 15; | ||
public static final int EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC = 30; | ||
|
||
private final SynchronizationContext syncContext = new SynchronizationContext( | ||
new Thread.UncaughtExceptionHandler() { | ||
|
@@ -738,6 +740,9 @@ void restartTimer() { | |
// When client becomes ready, it triggers a restartTimer for all relevant subscribers. | ||
return; | ||
} | ||
ServerInfo serverInfo = activeCpc.getServerInfo(); | ||
int timeoutSec = xdsDataErrorHandlingEnabled && serverInfo.resourceTimerIsTransientError() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Checking |
||
? EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC : INITIAL_RESOURCE_FETCH_TIMEOUT_SEC; | ||
|
||
class ResourceNotFound implements Runnable { | ||
@Override | ||
|
@@ -761,8 +766,7 @@ public String toString() { | |
respTimer.cancel(); | ||
} | ||
respTimer = syncContext.schedule( | ||
new ResourceNotFound(), INITIAL_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS, | ||
timeService); | ||
new ResourceNotFound(), timeoutSec, TimeUnit.SECONDS, timeService); | ||
} | ||
|
||
void stopTimer() { | ||
|
@@ -840,6 +844,8 @@ void onAbsent(@Nullable ProcessingTracker processingTracker, ServerInfo serverIn | |
// Ignore deletion of State of the World resources when this feature is on, | ||
// and the resource is reusable. | ||
boolean ignoreResourceDeletionEnabled = serverInfo.ignoreResourceDeletion(); | ||
boolean resourceTimerIsTransientError = | ||
xdsDataErrorHandlingEnabled && serverInfo.resourceTimerIsTransientError(); | ||
if (ignoreResourceDeletionEnabled && type.isFullStateOfTheWorld() && data != null) { | ||
if (!resourceDeletionIgnored) { | ||
logger.log(XdsLogLevel.FORCE_WARNING, | ||
|
@@ -854,14 +860,20 @@ void onAbsent(@Nullable ProcessingTracker processingTracker, ServerInfo serverIn | |
if (!absent) { | ||
data = null; | ||
absent = true; | ||
metadata = ResourceMetadata.newResourceMetadataDoesNotExist(); | ||
metadata = resourceTimerIsTransientError ? ResourceMetadata.newResourceMetadataTimeout() : | ||
ResourceMetadata.newResourceMetadataDoesNotExist(); | ||
for (ResourceWatcher<T> watcher : watchers.keySet()) { | ||
if (processingTracker != null) { | ||
processingTracker.startTask(); | ||
} | ||
watchers.get(watcher).execute(() -> { | ||
try { | ||
watcher.onResourceDoesNotExist(resource); | ||
if (resourceTimerIsTransientError) { | ||
watcher.onError(Status.UNAVAILABLE.withDescription( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Unit test for watcher.onError and the timeoutSec value when the resource timer is a transient error. |
||
"Timed out waiting for resource " + resource + " from xDS server")); | ||
} else { | ||
watcher.onResourceDoesNotExist(resource); | ||
} | ||
} finally { | ||
if (processingTracker != null) { | ||
processingTracker.onComplete(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -85,6 +85,7 @@ | |
import io.grpc.xds.client.Bootstrapper.BootstrapInfo; | ||
import io.grpc.xds.client.Bootstrapper.CertificateProviderInfo; | ||
import io.grpc.xds.client.Bootstrapper.ServerInfo; | ||
import io.grpc.xds.client.BootstrapperImpl; | ||
import io.grpc.xds.client.EnvoyProtoData.Node; | ||
import io.grpc.xds.client.LoadStatsManager2.ClusterDropStats; | ||
import io.grpc.xds.client.Locality; | ||
|
@@ -145,7 +146,7 @@ | |
public abstract class GrpcXdsClientImplTestBase { | ||
|
||
private static final String SERVER_URI = "trafficdirector.googleapis.com"; | ||
private static final String SERVER_URI_CUSTOME_AUTHORITY = "trafficdirector2.googleapis.com"; | ||
private static final String SERVER_URI_CUSTOM_AUTHORITY = "trafficdirector2.googleapis.com"; | ||
private static final String SERVER_URI_EMPTY_AUTHORITY = "trafficdirector3.googleapis.com"; | ||
private static final String LDS_RESOURCE = "listener.googleapis.com"; | ||
private static final String RDS_RESOURCE = "route-configuration.googleapis.com"; | ||
|
@@ -304,6 +305,30 @@ public long currentTimeNanos() { | |
private final BindableService adsService = createAdsService(); | ||
private final BindableService lrsService = createLrsService(); | ||
|
||
private XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() { | ||
@Override | ||
public XdsTransport create(ServerInfo serverInfo) { | ||
if (serverInfo.target().equals(SERVER_URI)) { | ||
return new GrpcXdsTransport(channel); | ||
} | ||
if (serverInfo.target().equals(SERVER_URI_CUSTOM_AUTHORITY)) { | ||
if (channelForCustomAuthority == null) { | ||
channelForCustomAuthority = cleanupRule.register( | ||
InProcessChannelBuilder.forName(serverName).directExecutor().build()); | ||
} | ||
return new GrpcXdsTransport(channelForCustomAuthority); | ||
} | ||
if (serverInfo.target().equals(SERVER_URI_EMPTY_AUTHORITY)) { | ||
if (channelForEmptyAuthority == null) { | ||
channelForEmptyAuthority = cleanupRule.register( | ||
InProcessChannelBuilder.forName(serverName).directExecutor().build()); | ||
} | ||
return new GrpcXdsTransport(channelForEmptyAuthority); | ||
} | ||
throw new IllegalArgumentException("Can not create channel for " + serverInfo); | ||
} | ||
}; | ||
|
||
@Before | ||
public void setUp() throws IOException { | ||
when(backoffPolicyProvider.get()).thenReturn(backoffPolicy1, backoffPolicy2); | ||
|
@@ -322,32 +347,9 @@ public void setUp() throws IOException { | |
.start()); | ||
channel = | ||
cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); | ||
XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() { | ||
@Override | ||
public XdsTransport create(ServerInfo serverInfo) { | ||
if (serverInfo.target().equals(SERVER_URI)) { | ||
return new GrpcXdsTransport(channel); | ||
} | ||
if (serverInfo.target().equals(SERVER_URI_CUSTOME_AUTHORITY)) { | ||
if (channelForCustomAuthority == null) { | ||
channelForCustomAuthority = cleanupRule.register( | ||
InProcessChannelBuilder.forName(serverName).directExecutor().build()); | ||
} | ||
return new GrpcXdsTransport(channelForCustomAuthority); | ||
} | ||
if (serverInfo.target().equals(SERVER_URI_EMPTY_AUTHORITY)) { | ||
if (channelForEmptyAuthority == null) { | ||
channelForEmptyAuthority = cleanupRule.register( | ||
InProcessChannelBuilder.forName(serverName).directExecutor().build()); | ||
} | ||
return new GrpcXdsTransport(channelForEmptyAuthority); | ||
} | ||
throw new IllegalArgumentException("Can not create channel for " + serverInfo); | ||
} | ||
}; | ||
|
||
xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, ignoreResourceDeletion(), | ||
true); | ||
true, false); | ||
BootstrapInfo bootstrapInfo = | ||
Bootstrapper.BootstrapInfo.builder() | ||
.servers(Collections.singletonList(xdsServerInfo)) | ||
|
@@ -357,7 +359,7 @@ public XdsTransport create(ServerInfo serverInfo) { | |
AuthorityInfo.create( | ||
"xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s", | ||
ImmutableList.of(Bootstrapper.ServerInfo.create( | ||
SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS))), | ||
SERVER_URI_CUSTOM_AUTHORITY, CHANNEL_CREDENTIALS))), | ||
"", | ||
AuthorityInfo.create( | ||
"xdstp:///envoy.config.listener.v3.Listener/%s", | ||
|
@@ -3155,6 +3157,108 @@ public void flowControlAbsent() throws Exception { | |
verify(anotherWatcher).onError(any()); | ||
} | ||
|
||
@Test | ||
@SuppressWarnings("unchecked") | ||
public void resourceTimerIsTransientError_schedulesExtendedTimeout() { | ||
BootstrapperImpl.xdsDataErrorHandlingEnabled = true; | ||
ServerInfo serverInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, | ||
false, true, true); | ||
BootstrapInfo bootstrapInfo = | ||
Bootstrapper.BootstrapInfo.builder() | ||
.servers(Collections.singletonList(serverInfo)) | ||
.node(NODE) | ||
.authorities(ImmutableMap.of( | ||
"", | ||
AuthorityInfo.create( | ||
"xdstp:///envoy.config.listener.v3.Listener/%s", | ||
ImmutableList.of(Bootstrapper.ServerInfo.create( | ||
SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS))))) | ||
.certProviders(ImmutableMap.of()) | ||
.build(); | ||
xdsClient = new XdsClientImpl( | ||
xdsTransportFactory, | ||
bootstrapInfo, | ||
fakeClock.getScheduledExecutorService(), | ||
backoffPolicyProvider, | ||
fakeClock.getStopwatchSupplier(), | ||
timeProvider, | ||
MessagePrinter.INSTANCE, | ||
new TlsContextManagerImpl(bootstrapInfo), | ||
xdsClientMetricReporter); | ||
ResourceWatcher<CdsUpdate> watcher = mock(ResourceWatcher.class); | ||
String resourceName = "cluster.googleapis.com"; | ||
|
||
xdsClient.watchXdsResource( | ||
XdsClusterResource.getInstance(), | ||
resourceName, | ||
watcher, | ||
fakeClock.getScheduledExecutorService()); | ||
|
||
ScheduledTask task = Iterables.getOnlyElement( | ||
fakeClock.getPendingTasks(CDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)); | ||
assertThat(task.getDelay(TimeUnit.SECONDS)) | ||
.isEqualTo(XdsClientImpl.EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC); | ||
fakeClock.runDueTasks(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We could delete this statement. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was doing that exactly earlier but after every test method there is an
This fails while asserting for empty pending tasks. We need to run all the due tasks at the end. |
||
BootstrapperImpl.xdsDataErrorHandlingEnabled = false; | ||
} | ||
|
||
@Test | ||
@SuppressWarnings("unchecked") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of putting this on the entire method, can you put it on just the |
||
public void resourceTimerIsTransientError_callsOnErrorUnavailable() { | ||
BootstrapperImpl.xdsDataErrorHandlingEnabled = true; | ||
xdsServerInfo = ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, ignoreResourceDeletion(), | ||
true, true); | ||
BootstrapInfo bootstrapInfo = | ||
Bootstrapper.BootstrapInfo.builder() | ||
.servers(Collections.singletonList(xdsServerInfo)) | ||
.node(NODE) | ||
.authorities(ImmutableMap.of( | ||
"authority.xds.com", | ||
AuthorityInfo.create( | ||
"xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s", | ||
ImmutableList.of(Bootstrapper.ServerInfo.create( | ||
SERVER_URI_CUSTOM_AUTHORITY, CHANNEL_CREDENTIALS))), | ||
"", | ||
AuthorityInfo.create( | ||
"xdstp:///envoy.config.listener.v3.Listener/%s", | ||
ImmutableList.of(Bootstrapper.ServerInfo.create( | ||
SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS))))) | ||
.certProviders(ImmutableMap.of("cert-instance-name", | ||
CertificateProviderInfo.create("file-watcher", ImmutableMap.of()))) | ||
.build(); | ||
xdsClient = new XdsClientImpl( | ||
xdsTransportFactory, | ||
bootstrapInfo, | ||
fakeClock.getScheduledExecutorService(), | ||
backoffPolicyProvider, | ||
fakeClock.getStopwatchSupplier(), | ||
timeProvider, | ||
MessagePrinter.INSTANCE, | ||
new TlsContextManagerImpl(bootstrapInfo), | ||
xdsClientMetricReporter); | ||
String timeoutResource = CDS_RESOURCE + "_timeout"; | ||
ResourceWatcher<CdsUpdate> timeoutWatcher = mock(ResourceWatcher.class); | ||
|
||
xdsClient.watchXdsResource( | ||
XdsClusterResource.getInstance(), | ||
timeoutResource, | ||
timeoutWatcher, | ||
fakeClock.getScheduledExecutorService()); | ||
|
||
assertThat(resourceDiscoveryCalls).hasSize(1); | ||
DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); | ||
call.verifyRequest(CDS, ImmutableList.of(timeoutResource), "", "", NODE); | ||
fakeClock.forwardTime(XdsClientImpl.EXTENDED_RESOURCE_FETCH_TIMEOUT_SEC, TimeUnit.SECONDS); | ||
fakeClock.runDueTasks(); | ||
ArgumentCaptor<Status> errorCaptor = ArgumentCaptor.forClass(Status.class); | ||
verify(timeoutWatcher).onError(errorCaptor.capture()); | ||
Status error = errorCaptor.getValue(); | ||
assertThat(error.getCode()).isEqualTo(Status.Code.UNAVAILABLE); | ||
assertThat(error.getDescription()).isEqualTo( | ||
"Timed out waiting for resource " + timeoutResource + " from xDS server"); | ||
BootstrapperImpl.xdsDataErrorHandlingEnabled = false; | ||
} | ||
|
||
private Answer<Void> blockUpdate(CyclicBarrier barrier) { | ||
return new Answer<Void>() { | ||
@Override | ||
|
@@ -4220,7 +4324,7 @@ private XdsClientImpl createXdsClient(String serverUri) { | |
private BootstrapInfo buildBootStrap(String serverUri) { | ||
|
||
ServerInfo xdsServerInfo = ServerInfo.create(serverUri, CHANNEL_CREDENTIALS, | ||
ignoreResourceDeletion(), true); | ||
ignoreResourceDeletion(), true, false); | ||
|
||
return Bootstrapper.BootstrapInfo.builder() | ||
.servers(Collections.singletonList(xdsServerInfo)) | ||
|
@@ -4230,7 +4334,7 @@ private BootstrapInfo buildBootStrap(String serverUri) { | |
AuthorityInfo.create( | ||
"xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s", | ||
ImmutableList.of(Bootstrapper.ServerInfo.create( | ||
SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS))), | ||
SERVER_URI_CUSTOM_AUTHORITY, CHANNEL_CREDENTIALS))), | ||
"", | ||
AuthorityInfo.create( | ||
"xdstp:///envoy.config.listener.v3.Listener/%s", | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is used by CSDS, and looks like it crashes if you don't update it:
grpc-java/xds/src/main/java/io/grpc/xds/CsdsService.java
Lines 240 to 255 in 5a8326f
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So in the case of TIMEOUT, the CSDS service should return either `ClientResourceStatus.REQUESTED` or `ClientResourceStatus.DOES_NOT_EXIST`. What do you think? `DOES_NOT_EXIST` fits better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is clearly not something for us to arbitrarily decide during implementation. That's defined by the gRFC. They map 1:1 with the CSDS enum values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. I see now: envoyproxy/envoy@2aaa544. We need to sync the Envoy protos into grpc-java to have these.