Skip to content

Commit e75a044

Browse files
authored
s2a,netty: S2AHandshakerServiceChannel doesn't use custom event loop. (#11539)
* S2AHandshakerServiceChannel doesn't use custom event loop. * use executorPool. * log when channel not shutdown. * use a cached threadpool. * update non-executor version.
1 parent 782a44a commit e75a044

File tree

4 files changed

+56
-65
lines changed

4 files changed

+56
-65
lines changed

netty/src/main/java/io/grpc/netty/InternalProtocolNegotiators.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717
package io.grpc.netty;
1818

1919
import io.grpc.ChannelLogger;
20+
import io.grpc.internal.ObjectPool;
2021
import io.grpc.netty.ProtocolNegotiators.ClientTlsHandler;
2122
import io.grpc.netty.ProtocolNegotiators.GrpcNegotiationHandler;
2223
import io.grpc.netty.ProtocolNegotiators.WaitUntilActiveHandler;
2324
import io.netty.channel.ChannelHandler;
2425
import io.netty.handler.ssl.SslContext;
2526
import io.netty.util.AsciiString;
27+
import java.util.concurrent.Executor;
2628

2729
/**
2830
* Internal accessor for {@link ProtocolNegotiators}.
@@ -35,9 +37,12 @@ private InternalProtocolNegotiators() {}
3537
* Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will
3638
* be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel}
3739
* may happen immediately, even before the TLS Handshake is complete.
40+
* @param executorPool a dedicated {@link Executor} pool for time-consuming TLS tasks
3841
*/
39-
public static InternalProtocolNegotiator.ProtocolNegotiator tls(SslContext sslContext) {
40-
final io.grpc.netty.ProtocolNegotiator negotiator = ProtocolNegotiators.tls(sslContext);
42+
public static InternalProtocolNegotiator.ProtocolNegotiator tls(SslContext sslContext,
43+
ObjectPool<? extends Executor> executorPool) {
44+
final io.grpc.netty.ProtocolNegotiator negotiator = ProtocolNegotiators.tls(sslContext,
45+
executorPool);
4146
final class TlsNegotiator implements InternalProtocolNegotiator.ProtocolNegotiator {
4247

4348
@Override
@@ -58,6 +63,15 @@ public void close() {
5863

5964
return new TlsNegotiator();
6065
}
66+
67+
/**
68+
* Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will
69+
* be negotiated, the {@code handler} is added and writes to the {@link io.netty.channel.Channel}
70+
* may happen immediately, even before the TLS Handshake is complete.
71+
*/
72+
public static InternalProtocolNegotiator.ProtocolNegotiator tls(SslContext sslContext) {
73+
return tls(sslContext, null);
74+
}
6175

6276
/**
6377
* Returns a {@link ProtocolNegotiator} that ensures the pipeline is set up so that TLS will be

s2a/src/main/java/io/grpc/s2a/channel/S2AHandshakerServiceChannel.java

Lines changed: 17 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,11 @@
2929
import io.grpc.MethodDescriptor;
3030
import io.grpc.internal.SharedResourceHolder.Resource;
3131
import io.grpc.netty.NettyChannelBuilder;
32-
import io.netty.channel.EventLoopGroup;
33-
import io.netty.channel.nio.NioEventLoopGroup;
34-
import io.netty.channel.socket.nio.NioSocketChannel;
35-
import io.netty.util.concurrent.DefaultThreadFactory;
3632
import java.time.Duration;
3733
import java.util.Optional;
3834
import java.util.concurrent.ConcurrentMap;
35+
import java.util.logging.Level;
36+
import java.util.logging.Logger;
3937
import javax.annotation.concurrent.ThreadSafe;
4038

4139
/**
@@ -61,7 +59,6 @@
6159
public final class S2AHandshakerServiceChannel {
6260
private static final ConcurrentMap<String, Resource<Channel>> SHARED_RESOURCE_CHANNELS =
6361
Maps.newConcurrentMap();
64-
private static final Duration DELEGATE_TERMINATION_TIMEOUT = Duration.ofSeconds(2);
6562
private static final Duration CHANNEL_SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);
6663

6764
/**
@@ -95,41 +92,34 @@ public ChannelResource(String targetAddress, Optional<ChannelCredentials> channe
9592
}
9693

9794
/**
98-
* Creates a {@code EventLoopHoldingChannel} instance to the service running at {@code
99-
* targetAddress}. This channel uses a dedicated thread pool for its {@code EventLoopGroup}
100-
* instance to avoid blocking.
95+
* Creates a {@code HandshakerServiceChannel} instance to the service running at {@code
96+
* targetAddress}.
10197
*/
10298
@Override
10399
public Channel create() {
104-
EventLoopGroup eventLoopGroup =
105-
new NioEventLoopGroup(1, new DefaultThreadFactory("S2A channel pool", true));
106100
ManagedChannel channel = null;
107101
if (channelCredentials.isPresent()) {
108102
// Create a secure channel.
109103
channel =
110104
NettyChannelBuilder.forTarget(targetAddress, channelCredentials.get())
111-
.channelType(NioSocketChannel.class)
112105
.directExecutor()
113-
.eventLoopGroup(eventLoopGroup)
114106
.build();
115107
} else {
116108
// Create a plaintext channel.
117109
channel =
118110
NettyChannelBuilder.forTarget(targetAddress)
119-
.channelType(NioSocketChannel.class)
120111
.directExecutor()
121-
.eventLoopGroup(eventLoopGroup)
122112
.usePlaintext()
123113
.build();
124114
}
125-
return EventLoopHoldingChannel.create(channel, eventLoopGroup);
115+
return HandshakerServiceChannel.create(channel);
126116
}
127117

128-
/** Destroys a {@code EventLoopHoldingChannel} instance. */
118+
/** Destroys a {@code HandshakerServiceChannel} instance. */
129119
@Override
130120
public void close(Channel instanceChannel) {
131121
checkNotNull(instanceChannel);
132-
EventLoopHoldingChannel channel = (EventLoopHoldingChannel) instanceChannel;
122+
HandshakerServiceChannel channel = (HandshakerServiceChannel) instanceChannel;
133123
channel.close();
134124
}
135125

@@ -140,23 +130,21 @@ public String toString() {
140130
}
141131

142132
/**
143-
* Manages a channel using a {@link ManagedChannel} instance that belong to the {@code
144-
* EventLoopGroup} thread pool.
133+
* Manages a channel using a {@link ManagedChannel} instance.
145134
*/
146135
@VisibleForTesting
147-
static class EventLoopHoldingChannel extends Channel {
136+
static class HandshakerServiceChannel extends Channel {
137+
private static final Logger logger =
138+
Logger.getLogger(S2AHandshakerServiceChannel.class.getName());
148139
private final ManagedChannel delegate;
149-
private final EventLoopGroup eventLoopGroup;
150140

151-
static EventLoopHoldingChannel create(ManagedChannel delegate, EventLoopGroup eventLoopGroup) {
141+
static HandshakerServiceChannel create(ManagedChannel delegate) {
152142
checkNotNull(delegate);
153-
checkNotNull(eventLoopGroup);
154-
return new EventLoopHoldingChannel(delegate, eventLoopGroup);
143+
return new HandshakerServiceChannel(delegate);
155144
}
156145

157-
private EventLoopHoldingChannel(ManagedChannel delegate, EventLoopGroup eventLoopGroup) {
146+
private HandshakerServiceChannel(ManagedChannel delegate) {
158147
this.delegate = delegate;
159-
this.eventLoopGroup = eventLoopGroup;
160148
}
161149

162150
/**
@@ -178,16 +166,12 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
178166
@SuppressWarnings("FutureReturnValueIgnored")
179167
public void close() {
180168
delegate.shutdownNow();
181-
boolean isDelegateTerminated;
182169
try {
183-
isDelegateTerminated =
184-
delegate.awaitTermination(DELEGATE_TERMINATION_TIMEOUT.getSeconds(), SECONDS);
170+
delegate.awaitTermination(CHANNEL_SHUTDOWN_TIMEOUT.getSeconds(), SECONDS);
185171
} catch (InterruptedException e) {
186-
isDelegateTerminated = false;
172+
Thread.currentThread().interrupt();
173+
logger.log(Level.WARNING, "Channel to S2A was not shutdown.");
187174
}
188-
long quietPeriodSeconds = isDelegateTerminated ? 0 : 1;
189-
eventLoopGroup.shutdownGracefully(
190-
quietPeriodSeconds, CHANNEL_SHUTDOWN_TIMEOUT.getSeconds(), SECONDS);
191175
}
192176
}
193177

s2a/src/main/java/io/grpc/s2a/handshaker/S2AProtocolNegotiatorFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import com.google.common.util.concurrent.MoreExecutors;
3030
import com.google.errorprone.annotations.ThreadSafe;
3131
import io.grpc.Channel;
32+
import io.grpc.internal.GrpcUtil;
3233
import io.grpc.internal.ObjectPool;
34+
import io.grpc.internal.SharedResourcePool;
3335
import io.grpc.netty.GrpcHttp2ConnectionHandler;
3436
import io.grpc.netty.InternalProtocolNegotiator;
3537
import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator;
@@ -227,7 +229,10 @@ protected void handlerAdded0(ChannelHandlerContext ctx) {
227229
@Override
228230
public void onSuccess(SslContext sslContext) {
229231
ChannelHandler handler =
230-
InternalProtocolNegotiators.tls(sslContext).newHandler(grpcHandler);
232+
InternalProtocolNegotiators.tls(
233+
sslContext,
234+
SharedResourcePool.forResource(GrpcUtil.SHARED_CHANNEL_EXECUTOR))
235+
.newHandler(grpcHandler);
231236

232237
// Remove the bufferReads handler and delegate the rest of the handshake to the TLS
233238
// handler.

s2a/src/test/java/io/grpc/s2a/channel/S2AHandshakerServiceChannelTest.java

Lines changed: 17 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,7 @@
1818

1919
import static com.google.common.truth.Truth.assertThat;
2020
import static com.google.common.truth.extensions.proto.ProtoTruth.assertThat;
21-
import static java.util.concurrent.TimeUnit.SECONDS;
2221
import static org.junit.Assert.assertThrows;
23-
import static org.mockito.Mockito.mock;
24-
import static org.mockito.Mockito.times;
25-
import static org.mockito.Mockito.verify;
2622

2723
import io.grpc.CallOptions;
2824
import io.grpc.Channel;
@@ -39,15 +35,13 @@
3935
import io.grpc.benchmarks.Utils;
4036
import io.grpc.internal.SharedResourceHolder.Resource;
4137
import io.grpc.netty.NettyServerBuilder;
42-
import io.grpc.s2a.channel.S2AHandshakerServiceChannel.EventLoopHoldingChannel;
38+
import io.grpc.s2a.channel.S2AHandshakerServiceChannel.HandshakerServiceChannel;
4339
import io.grpc.stub.StreamObserver;
4440
import io.grpc.testing.GrpcCleanupRule;
4541
import io.grpc.testing.protobuf.SimpleRequest;
4642
import io.grpc.testing.protobuf.SimpleResponse;
4743
import io.grpc.testing.protobuf.SimpleServiceGrpc;
48-
import io.netty.channel.EventLoopGroup;
4944
import java.io.File;
50-
import java.time.Duration;
5145
import java.util.Optional;
5246
import java.util.concurrent.TimeUnit;
5347
import org.junit.Before;
@@ -60,8 +54,6 @@
6054
@RunWith(JUnit4.class)
6155
public final class S2AHandshakerServiceChannelTest {
6256
@ClassRule public static final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
63-
private static final Duration CHANNEL_SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);
64-
private final EventLoopGroup mockEventLoopGroup = mock(EventLoopGroup.class);
6557
private Server mtlsServer;
6658
private Server plaintextServer;
6759

@@ -191,7 +183,7 @@ public void close_mtlsSuccess() throws Exception {
191183
}
192184

193185
/**
194-
* Verifies that an {@code EventLoopHoldingChannel}'s {@code newCall} method can be used to
186+
* Verifies that an {@code HandshakerServiceChannel}'s {@code newCall} method can be used to
195187
* perform a simple RPC.
196188
*/
197189
@Test
@@ -201,7 +193,7 @@ public void newCall_performSimpleRpcSuccess() {
201193
"localhost:" + plaintextServer.getPort(),
202194
/* s2aChannelCredentials= */ Optional.empty());
203195
Channel channel = resource.create();
204-
assertThat(channel).isInstanceOf(EventLoopHoldingChannel.class);
196+
assertThat(channel).isInstanceOf(HandshakerServiceChannel.class);
205197
assertThat(
206198
SimpleServiceGrpc.newBlockingStub(channel).unaryRpc(SimpleRequest.getDefaultInstance()))
207199
.isEqualToDefaultInstance();
@@ -214,53 +206,49 @@ public void newCall_mtlsPerformSimpleRpcSuccess() throws Exception {
214206
S2AHandshakerServiceChannel.getChannelResource(
215207
"localhost:" + mtlsServer.getPort(), getTlsChannelCredentials());
216208
Channel channel = resource.create();
217-
assertThat(channel).isInstanceOf(EventLoopHoldingChannel.class);
209+
assertThat(channel).isInstanceOf(HandshakerServiceChannel.class);
218210
assertThat(
219211
SimpleServiceGrpc.newBlockingStub(channel).unaryRpc(SimpleRequest.getDefaultInstance()))
220212
.isEqualToDefaultInstance();
221213
}
222214

223-
/** Creates a {@code EventLoopHoldingChannel} instance and verifies its authority. */
215+
/** Creates a {@code HandshakerServiceChannel} instance and verifies its authority. */
224216
@Test
225217
public void authority_success() throws Exception {
226218
ManagedChannel channel = new FakeManagedChannel(true);
227-
EventLoopHoldingChannel eventLoopHoldingChannel =
228-
EventLoopHoldingChannel.create(channel, mockEventLoopGroup);
219+
HandshakerServiceChannel eventLoopHoldingChannel =
220+
HandshakerServiceChannel.create(channel);
229221
assertThat(eventLoopHoldingChannel.authority()).isEqualTo("FakeManagedChannel");
230222
}
231223

232224
/**
233-
* Creates and closes a {@code EventLoopHoldingChannel} when its {@code ManagedChannel} terminates
234-
* successfully.
225+
* Creates and closes a {@code HandshakerServiceChannel} when its {@code ManagedChannel}
226+
* terminates successfully.
235227
*/
236228
@Test
237229
public void close_withDelegateTerminatedSuccess() throws Exception {
238230
ManagedChannel channel = new FakeManagedChannel(true);
239-
EventLoopHoldingChannel eventLoopHoldingChannel =
240-
EventLoopHoldingChannel.create(channel, mockEventLoopGroup);
231+
HandshakerServiceChannel eventLoopHoldingChannel =
232+
HandshakerServiceChannel.create(channel);
241233
eventLoopHoldingChannel.close();
242234
assertThat(channel.isShutdown()).isTrue();
243-
verify(mockEventLoopGroup, times(1))
244-
.shutdownGracefully(0, CHANNEL_SHUTDOWN_TIMEOUT.getSeconds(), SECONDS);
245235
}
246236

247237
/**
248-
* Creates and closes a {@code EventLoopHoldingChannel} when its {@code ManagedChannel} does not
238+
* Creates and closes a {@code HandshakerServiceChannel} when its {@code ManagedChannel} does not
249239
* terminate successfully.
250240
*/
251241
@Test
252242
public void close_withDelegateTerminatedFailure() throws Exception {
253243
ManagedChannel channel = new FakeManagedChannel(false);
254-
EventLoopHoldingChannel eventLoopHoldingChannel =
255-
EventLoopHoldingChannel.create(channel, mockEventLoopGroup);
244+
HandshakerServiceChannel eventLoopHoldingChannel =
245+
HandshakerServiceChannel.create(channel);
256246
eventLoopHoldingChannel.close();
257247
assertThat(channel.isShutdown()).isTrue();
258-
verify(mockEventLoopGroup, times(1))
259-
.shutdownGracefully(1, CHANNEL_SHUTDOWN_TIMEOUT.getSeconds(), SECONDS);
260248
}
261249

262250
/**
263-
* Creates and closes a {@code EventLoopHoldingChannel}, creates a new channel from the same
251+
* Creates and closes a {@code HandshakerServiceChannel}, creates a new channel from the same
264252
* resource, and verifies that this second channel is useable.
265253
*/
266254
@Test
@@ -273,7 +261,7 @@ public void create_succeedsAfterCloseIsCalledOnce() throws Exception {
273261
resource.close(channelOne);
274262

275263
Channel channelTwo = resource.create();
276-
assertThat(channelTwo).isInstanceOf(EventLoopHoldingChannel.class);
264+
assertThat(channelTwo).isInstanceOf(HandshakerServiceChannel.class);
277265
assertThat(
278266
SimpleServiceGrpc.newBlockingStub(channelTwo)
279267
.unaryRpc(SimpleRequest.getDefaultInstance()))
@@ -291,7 +279,7 @@ public void create_mtlsSucceedsAfterCloseIsCalledOnce() throws Exception {
291279
resource.close(channelOne);
292280

293281
Channel channelTwo = resource.create();
294-
assertThat(channelTwo).isInstanceOf(EventLoopHoldingChannel.class);
282+
assertThat(channelTwo).isInstanceOf(HandshakerServiceChannel.class);
295283
assertThat(
296284
SimpleServiceGrpc.newBlockingStub(channelTwo)
297285
.unaryRpc(SimpleRequest.getDefaultInstance()))

0 commit comments

Comments
 (0)