Skip to content

s2a: Address comments on S2A channel + stub #11584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 0 additions & 43 deletions s2a/src/main/java/io/grpc/s2a/internal/channel/S2AChannelPool.java

This file was deleted.

109 changes: 0 additions & 109 deletions s2a/src/main/java/io/grpc/s2a/internal/channel/S2AGrpcChannelPool.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,9 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ChannelCredentials;
import io.grpc.ClientCall;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.internal.SharedResourceHolder.Resource;
import io.grpc.netty.NettyChannelBuilder;
import java.time.Duration;
Expand Down Expand Up @@ -55,6 +51,8 @@
@ThreadSafe
public final class S2AHandshakerServiceChannel {
private static final Duration CHANNEL_SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);
private static final Logger logger =
Logger.getLogger(S2AHandshakerServiceChannel.class.getName());

/**
* Returns a {@link SharedResourceHolder.Resource} instance for managing channels to an S2A server
Expand Down Expand Up @@ -86,75 +84,35 @@ public ChannelResource(String targetAddress, ChannelCredentials channelCredentia
}

/**
* Creates a {@code HandshakerServiceChannel} instance to the service running at {@code
* Creates a {@code ManagedChannel} instance to the service running at {@code
* targetAddress}.
*/
@Override
public Channel create() {
ManagedChannel channel =
NettyChannelBuilder.forTarget(targetAddress, channelCredentials)
return NettyChannelBuilder.forTarget(targetAddress, channelCredentials)
.directExecutor()
.idleTimeout(5, SECONDS)
.build();
return HandshakerServiceChannel.create(channel);
}

/** Destroys a {@code HandshakerServiceChannel} instance. */
/** Destroys a {@code ManagedChannel} instance. */
@Override
public void close(Channel instanceChannel) {
checkNotNull(instanceChannel);
HandshakerServiceChannel channel = (HandshakerServiceChannel) instanceChannel;
channel.close();
}

@Override
public String toString() {
return "grpc-s2a-channel";
}
}

/**
* Manages a channel using a {@link ManagedChannel} instance.
*/
@VisibleForTesting
static class HandshakerServiceChannel extends Channel {
private static final Logger logger =
Logger.getLogger(S2AHandshakerServiceChannel.class.getName());
private final ManagedChannel delegate;

static HandshakerServiceChannel create(ManagedChannel delegate) {
checkNotNull(delegate);
return new HandshakerServiceChannel(delegate);
}

private HandshakerServiceChannel(ManagedChannel delegate) {
this.delegate = delegate;
}

/**
* Returns the address of the service to which the {@code delegate} channel connects, which is
* typically of the form {@code host:port}.
*/
@Override
public String authority() {
return delegate.authority();
}

/** Creates a {@link ClientCall} that invokes the operations in {@link MethodDescriptor}. */
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(
MethodDescriptor<ReqT, RespT> methodDescriptor, CallOptions options) {
return delegate.newCall(methodDescriptor, options);
}

@SuppressWarnings("FutureReturnValueIgnored")
public void close() {
delegate.shutdownNow();
ManagedChannel channel = (ManagedChannel) instanceChannel;
channel.shutdownNow();
try {
delegate.awaitTermination(CHANNEL_SHUTDOWN_TIMEOUT.getSeconds(), SECONDS);
channel.awaitTermination(CHANNEL_SHUTDOWN_TIMEOUT.getSeconds(), SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.log(Level.WARNING, "Channel to S2A was not shutdown.");
}

}

@Override
public String toString() {
return "grpc-s2a-channel";
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator;
import io.grpc.netty.InternalProtocolNegotiators;
import io.grpc.netty.InternalProtocolNegotiators.ProtocolNegotiationHandler;
import io.grpc.s2a.internal.channel.S2AChannelPool;
import io.grpc.s2a.internal.channel.S2AGrpcChannelPool;
import io.grpc.s2a.internal.handshaker.S2AIdentity;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
Expand Down Expand Up @@ -70,17 +68,16 @@ public final class S2AProtocolNegotiatorFactory {
public static InternalProtocolNegotiator.ClientFactory createClientFactory(
@Nullable S2AIdentity localIdentity, ObjectPool<Channel> s2aChannelPool) {
checkNotNull(s2aChannelPool, "S2A channel pool should not be null.");
S2AChannelPool channelPool = S2AGrpcChannelPool.create(s2aChannelPool);
return new S2AClientProtocolNegotiatorFactory(localIdentity, channelPool);
return new S2AClientProtocolNegotiatorFactory(localIdentity, s2aChannelPool);
}

static final class S2AClientProtocolNegotiatorFactory
implements InternalProtocolNegotiator.ClientFactory {
private final @Nullable S2AIdentity localIdentity;
private final S2AChannelPool channelPool;
private final ObjectPool<Channel> channelPool;

S2AClientProtocolNegotiatorFactory(
@Nullable S2AIdentity localIdentity, S2AChannelPool channelPool) {
@Nullable S2AIdentity localIdentity, ObjectPool<Channel> channelPool) {
this.localIdentity = localIdentity;
this.channelPool = channelPool;
}
Expand All @@ -100,13 +97,14 @@ public int getDefaultPort() {
@VisibleForTesting
static final class S2AProtocolNegotiator implements ProtocolNegotiator {

private final S2AChannelPool channelPool;
private final ObjectPool<Channel> channelPool;
private final Channel channel;
private final Optional<S2AIdentity> localIdentity;
private final ListeningExecutorService service =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));

static S2AProtocolNegotiator createForClient(
S2AChannelPool channelPool, @Nullable S2AIdentity localIdentity) {
ObjectPool<Channel> channelPool, @Nullable S2AIdentity localIdentity) {
checkNotNull(channelPool, "Channel pool should not be null.");
if (localIdentity == null) {
return new S2AProtocolNegotiator(channelPool, Optional.empty());
Expand All @@ -123,9 +121,11 @@ static S2AProtocolNegotiator createForClient(
return HostAndPort.fromString(authority).getHost();
}

private S2AProtocolNegotiator(S2AChannelPool channelPool, Optional<S2AIdentity> localIdentity) {
private S2AProtocolNegotiator(ObjectPool<Channel> channelPool,
Optional<S2AIdentity> localIdentity) {
this.channelPool = channelPool;
this.localIdentity = localIdentity;
this.channel = channelPool.getObject();
}

@Override
Expand All @@ -139,13 +139,13 @@ public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
String hostname = getHostNameFromAuthority(grpcHandler.getAuthority());
checkArgument(!isNullOrEmpty(hostname), "hostname should not be null or empty.");
return new S2AProtocolNegotiationHandler(
grpcHandler, channelPool, localIdentity, hostname, service);
grpcHandler, channel, localIdentity, hostname, service);
}

@Override
public void close() {
service.shutdown();
channelPool.close();
channelPool.returnObject(channel);
}
}

Expand Down Expand Up @@ -180,15 +180,15 @@ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}

private static final class S2AProtocolNegotiationHandler extends ProtocolNegotiationHandler {
private final S2AChannelPool channelPool;
private final Channel channel;
private final Optional<S2AIdentity> localIdentity;
private final String hostname;
private final GrpcHttp2ConnectionHandler grpcHandler;
private final ListeningExecutorService service;

private S2AProtocolNegotiationHandler(
GrpcHttp2ConnectionHandler grpcHandler,
S2AChannelPool channelPool,
Channel channel,
Optional<S2AIdentity> localIdentity,
String hostname,
ListeningExecutorService service) {
Expand All @@ -204,7 +204,7 @@ public void handlerAdded(ChannelHandlerContext ctx) {
},
grpcHandler.getNegotiationLogger());
this.grpcHandler = grpcHandler;
this.channelPool = channelPool;
this.channel = channel;
this.localIdentity = localIdentity;
this.hostname = hostname;
checkNotNull(service, "service should not be null.");
Expand All @@ -217,8 +217,7 @@ protected void handlerAdded0(ChannelHandlerContext ctx) {
BufferReadsHandler bufferReads = new BufferReadsHandler();
ctx.pipeline().addBefore(ctx.name(), /* name= */ null, bufferReads);

Channel ch = channelPool.getChannel();
S2AServiceGrpc.S2AServiceStub stub = S2AServiceGrpc.newStub(ch);
S2AServiceGrpc.S2AServiceStub stub = S2AServiceGrpc.newStub(channel);
S2AStub s2aStub = S2AStub.newInstance(stub);

ListenableFuture<SslContext> sslContextFuture =
Expand Down
Loading