Skip to content

Commit

Permalink
core: Exit idle mode when delayed transport is in use
Browse files Browse the repository at this point in the history
8844cf7 triggered a regression where a new RPC wouldn't cause the
channel to exit idle mode, if an RPC was still progressing on an old
transport. This was already possible previously, but was racy.
8844cf7 made it less racy and more obvious.

The two added `exitIdleMode()` calls in this commit are companions to
those in `enterIdleMode()`, which detect whether the channel should
immediately exit idle mode.

Noticed in cl/635819804.
  • Loading branch information
ejona86 committed May 23, 2024
1 parent 0b5f38d commit fea577c
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 7 deletions.
16 changes: 10 additions & 6 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -889,12 +889,6 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
return newClientCall(method, callOptions);
}
syncContext.execute(new Runnable() {
@Override
public void run() {
exitIdleMode();
}
});
if (configSelector.get() != INITIAL_PENDING_SELECTOR) {
// This is an optimization for the case (typically with InProcessTransport) when name
// resolution result is immediately available at this point. Otherwise, some users'
Expand Down Expand Up @@ -927,6 +921,10 @@ public void run() {
if (pendingCalls == null) {
pendingCalls = new LinkedHashSet<>();
inUseStateAggregator.updateObjectInUse(pendingCallsInUseObject, true);
// It's possible to be in idle mode while inUseStateAggregator is in-use, if one of
// the subchannels is in use. But we should never be in idle mode when pendingCalls is
// in use.
exitIdleMode();
}
pendingCalls.add(pendingCall);
} else {
Expand Down Expand Up @@ -2081,6 +2079,12 @@ public Attributes filterTransport(Attributes attributes) {
@Override
public void transportInUse(final boolean inUse) {
inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
if (inUse) {
// It's possible to be in idle mode while inUseStateAggregator is in-use, if one of the
// subchannels is in use. But we should never be in idle mode when delayed transport is in
// use.
exitIdleMode();
}
}

@Override
Expand Down
3 changes: 2 additions & 1 deletion core/src/testFixtures/java/io/grpc/internal/FakeClock.java
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ private void schedule(ScheduledTask task, long delay, TimeUnit unit) {
}

@Override public boolean isShutdown() {
throw new UnsupportedOperationException();
// If shutdown is not implemented, then it is never shutdown.
return false;
}

@Override public boolean isTerminated() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2024 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.testing.integration;

import static com.google.common.truth.Truth.assertThat;

import io.grpc.ManagedChannel;
import io.grpc.ServerInterceptors;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.FakeClock;
import io.grpc.internal.testing.StreamRecorder;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.integration.EmptyProtos.Empty;
import io.grpc.testing.integration.Messages.ResponseParameters;
import io.grpc.testing.integration.Messages.StreamingOutputCallRequest;
import io.grpc.testing.integration.Messages.StreamingOutputCallResponse;
import java.util.concurrent.TimeUnit;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/** Tests for ManagedChannelImpl that use a real transport. */
@RunWith(JUnit4.class)
public final class ManagedChannelImplIntegrationTest {
private static final String SERVER_NAME = ManagedChannelImplIntegrationTest.class.getName();
@Rule
public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();

@Test
public void idleWhileRpcInTransport_exitsIdleForNewRpc() throws Exception {
FakeClock fakeClock = new FakeClock();
grpcCleanup.register(InProcessServerBuilder.forName(SERVER_NAME)
.directExecutor()
.addService(
ServerInterceptors.intercept(
new TestServiceImpl(fakeClock.getScheduledExecutorService()),
TestServiceImpl.interceptors()))
.build()
.start());
ManagedChannel channel = grpcCleanup.register(InProcessChannelBuilder.forName(SERVER_NAME)
.directExecutor()
.build());

TestServiceGrpc.TestServiceBlockingStub blockingStub = TestServiceGrpc.newBlockingStub(channel);
TestServiceGrpc.TestServiceStub asyncStub = TestServiceGrpc.newStub(channel);
StreamRecorder<StreamingOutputCallResponse> responseObserver = StreamRecorder.create();
StreamObserver<StreamingOutputCallRequest> requestObserver =
asyncStub.fullDuplexCall(responseObserver);
requestObserver.onNext(StreamingOutputCallRequest.newBuilder()
.addResponseParameters(ResponseParameters.newBuilder()
.setIntervalUs(Integer.MAX_VALUE))
.build());
try {
channel.enterIdle();
assertThat(blockingStub
.withDeadlineAfter(10, TimeUnit.SECONDS)
.emptyCall(Empty.getDefaultInstance()))
.isEqualTo(Empty.getDefaultInstance());
} finally {
requestObserver.onError(new RuntimeException("cleanup"));
}
}
}
3 changes: 3 additions & 0 deletions xds/src/test/java/io/grpc/xds/CsdsServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ public void fetchClientConfig_interruptedException() {
grpcServerRule.getServiceRegistry()
.addService(new CsdsService(new FakeXdsClientPoolFactory(throwingXdsClient)));

// Hack to prevent the interrupted exception from propagating through to the client stub.
grpcServerRule.getChannel().getState(true);

try {
ClientStatusResponse response = csdsStub.fetchClientStatus(REQUEST);
fail("Should've failed, got response: " + response);
Expand Down

0 comments on commit fea577c

Please sign in to comment.