diff --git a/docs/changelog/128025.yaml b/docs/changelog/128025.yaml
new file mode 100644
index 0000000000000..61a9a80fabfb1
--- /dev/null
+++ b/docs/changelog/128025.yaml
@@ -0,0 +1,6 @@
+pr: 128025
+summary: "Set `connection: close` header on shutdown"
+area: Network
+type: enhancement
+issues:
+ - 127984
diff --git a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java
index 417faaf64f7f3..b88907b10c45f 100644
--- a/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java
+++ b/modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java
@@ -9,11 +9,26 @@
 
 package org.elasticsearch.http.netty4;
 
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpVersion;
 import io.netty.util.ReferenceCounted;
 
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.ESNetty4IntegTestCase;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.support.CountDownActionListener;
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.client.internal.node.NodeClient;
@@ -29,6 +44,8 @@
 import org.elasticsearch.common.settings.SettingsFilter;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.Releasables;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.features.NodeFeature;
 import org.elasticsearch.http.HttpServerTransport;
@@ -41,29 +58,42 @@
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.rest.action.EmptyResponseListener;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.transport.netty4.NettyAllocator;
 import org.elasticsearch.xcontent.ToXContentObject;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 
 import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
 import static org.elasticsearch.rest.RestRequest.Method.GET;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.oneOf;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
 public class Netty4PipeliningIT extends ESNetty4IntegTestCase {
 
     @Override
     protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return CollectionUtils.concatLists(List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class), super.nodePlugins());
+        return CollectionUtils.concatLists(
+            List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class, KeepPipeliningPlugin.class),
+            super.nodePlugins()
+        );
     }
 
     private static final int MAX_PIPELINE_EVENTS = 10;
@@ -142,6 +172,115 @@ private void runPipeliningTest(int expectedResponseCount, String... routes) thro
         }
     }
 
+    public void testSetCloseConnectionHeaderWhenShuttingDown() throws IOException {
+
+        // This test works using KeepPipeliningPlugin to keep a HTTP connection from becoming idle with a sequence of requests while the
+        // node shuts down and ensures that these requests start to receive responses with `Connection: close` and that the node does not
+        // shut down until all requests have received a response.
+
+        final var victimNode = internalCluster().startNode();
+
+        final var releasables = new ArrayList<Releasable>(3);
+        try {
+            final var keepPipeliningRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, KeepPipeliningPlugin.ROUTE);
+            releasables.add(keepPipeliningRequest::release);
+
+            final var enoughResponsesToCloseLatch = new CountDownLatch(between(1, 5));
+            final var outstandingRequestsCounter = new AtomicInteger();
+            final var nodeShuttingDown = new AtomicBoolean();
+            final var stoppedPipelining = new AtomicBoolean();
+
+            final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
+            releasables.add(() -> eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).awaitUninterruptibly());
+            final var clientBootstrap = new Bootstrap().channel(NettyAllocator.getChannelType())
+                .option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator())
+                .group(eventLoopGroup)
+                .handler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    protected void initChannel(SocketChannel ch) {
+                        ch.pipeline().addLast(new HttpClientCodec());
+                        ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpResponse>() {
+
+                            private int closeHeadersToIgnore = between(0, 5);
+
+                            private boolean ignoreCloseHeader() {
+                                if (closeHeadersToIgnore == 0) {
+                                    return false;
+                                } else {
+                                    closeHeadersToIgnore -= 1;
+                                    return true;
+                                }
+                            }
+
+                            @Override
+                            protected void channelRead0(ChannelHandlerContext ctx, HttpResponse msg) {
+                                enoughResponsesToCloseLatch.countDown();
+                                assertThat(
+                                    outstandingRequestsCounter.decrementAndGet(),
+                                    stoppedPipelining.get() ? oneOf(0, 1) : equalTo(1)
+                                );
+
+                                if ("close".equals(msg.headers().get("connection")) && ignoreCloseHeader() == false) {
+                                    assertTrue(nodeShuttingDown.get());
+                                    // send one more request with `?respond_immediately` to stop the pipelining
+                                    if (stoppedPipelining.compareAndSet(false, true)) {
+                                        assertThat(outstandingRequestsCounter.incrementAndGet(), equalTo(2));
+                                        ctx.writeAndFlush(
+                                            new DefaultFullHttpRequest(
+                                                HttpVersion.HTTP_1_1,
+                                                HttpMethod.GET,
+                                                KeepPipeliningPlugin.ROUTE + "?" + KeepPipeliningPlugin.RESPOND_IMMEDIATELY
+                                            )
+                                        );
+                                    }
+                                } else {
+                                    // still pipelining, send another request to trigger the next response
+                                    assertThat(outstandingRequestsCounter.incrementAndGet(), equalTo(2));
+                                    ctx.writeAndFlush(keepPipeliningRequest.retain());
+                                }
+                            }
+
+                            @Override
+                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+                                ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause));
+                            }
+                        });
+                    }
+                });
+
+            final var httpServerTransport = internalCluster().getInstance(HttpServerTransport.class, victimNode);
+            final var httpServerAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()).address();
+
+            // Open a channel on which we will pipeline the requests to KeepPipeliningPlugin.ROUTE
+            final var pipeliningChannel = clientBootstrap.connect(httpServerAddress).syncUninterruptibly().channel();
+            releasables.add(() -> pipeliningChannel.close().syncUninterruptibly());
+
+            // Send two pipelined requests so that we start to receive responses
+            assertTrue(outstandingRequestsCounter.compareAndSet(0, 2));
+            pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());
+            pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());
+
+            // wait until we've started to receive responses
+            safeAwait(enoughResponsesToCloseLatch);
+
+            // Shut down the node
+            assertTrue(nodeShuttingDown.compareAndSet(false, true));
+            internalCluster().stopNode(victimNode);
+
+            // Wait for the pipelining channel to be closed, indicating that it stopped pipelining (because it received a response with
+            // `Connection: close`) and allowed the node to shut down
+            pipeliningChannel.closeFuture().syncUninterruptibly();
+
+            // The shutdown did not happen until all requests had had a response.
+            assertTrue(stoppedPipelining.get());
+            assertEquals(0, outstandingRequestsCounter.get());
+
+        } finally {
+            Collections.reverse(releasables);
+            Releasables.close(releasables);
+        }
+    }
+
     private void assertOpaqueIdsInOrder(Collection<String> opaqueIds) {
         // check if opaque ids are monotonically increasing
         int i = 0;
@@ -203,7 +342,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
     }
 
     /**
-     * Adds an HTTP route that waits for 3 concurrent executions before returning any of them
+     * Adds an HTTP route that starts to emit a chunked response and then fails before its completion.
      */
     public static class ChunkAndFailPlugin extends Plugin implements ActionPlugin {
 
@@ -285,4 +424,54 @@ public String getResponseContentTypeString() {
             });
         }
     }
+
+    /**
+     * Adds an HTTP route that only responds when starting to process a second request, ensuring that there is always at least one in-flight
+     * request in the pipeline which keeps a connection from becoming idle.
+     */
+    public static class KeepPipeliningPlugin extends Plugin implements ActionPlugin {
+
+        static final String ROUTE = "/_test/keep_pipelining";
+        static final String RESPOND_IMMEDIATELY = "respond_immediately";
+
+        @Override
+        public Collection<RestHandler> getRestHandlers(
+            Settings settings,
+            NamedWriteableRegistry namedWriteableRegistry,
+            RestController restController,
+            ClusterSettings clusterSettings,
+            IndexScopedSettings indexScopedSettings,
+            SettingsFilter settingsFilter,
+            IndexNameExpressionResolver indexNameExpressionResolver,
+            Supplier<DiscoveryNodes> nodesInCluster,
+            Predicate<NodeFeature> clusterSupportsFeature
+        ) {
+            return List.of(new BaseRestHandler() {
+
+                private SubscribableListener<Void> lastRequestTrigger = new SubscribableListener<>();
+
+                @Override
+                public String getName() {
+                    return ROUTE;
+                }
+
+                @Override
+                public List<Route> routes() {
+                    return List.of(new Route(GET, ROUTE));
+                }
+
+                @Override
+                protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
+                    final var respondImmediately = request.paramAsBoolean(RESPOND_IMMEDIATELY, false);
+                    return channel -> {
+                        // all happens on a single thread in these tests, no need for concurrency protection
+                        final var previousRequestTrigger = lastRequestTrigger;
+                        lastRequestTrigger = respondImmediately ? SubscribableListener.nullSuccess() : new SubscribableListener<>();
+                        lastRequestTrigger.addListener(new EmptyResponseListener(channel).map(ignored -> ActionResponse.Empty.INSTANCE));
+                        previousRequestTrigger.onResponse(null);
+                    };
+                }
+            });
+        }
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java
index 2cb49416580d2..100febe00d060 100644
--- a/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java
+++ b/server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java
@@ -32,6 +32,7 @@
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.FutureUtils;
+import org.elasticsearch.common.util.concurrent.RunOnce;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.core.AbstractRefCounted;
@@ -619,13 +620,27 @@ public ThreadPool getThreadPool() {
     }
 
     /**
-     * A {@link HttpChannel} that tracks number of requests via a {@link RefCounted}.
+     * A {@link HttpChannel} that tracks the number of in-flight requests via a {@link RefCounted}, allowing the channel to be put into a
+     * state where it will close when idle.
      */
     private static class RequestTrackingHttpChannel implements HttpChannel {
+
+        /**
+         * Action which closes the inner channel exactly once, to avoid a double-close due to a natural {@link #close()} happening
+         * concurrently with the release of the last reference.
+         */
+        private final Runnable closeOnce = new RunOnce(this::closeInner);
+
+        /**
+         * Whether the channel will close when it becomes idle (i.e. the node is shutting down).
+         */
+        private volatile boolean closeWhenIdle;
+
        /**
         * Only counts down to zero via {@link #setCloseWhenIdle()}.
        */
-        final RefCounted refCounted = AbstractRefCounted.of(this::closeInner);
+        final RefCounted refCounted = AbstractRefCounted.of(closeOnce);
+
         final HttpChannel inner;
 
         RequestTrackingHttpChannel(HttpChannel inner) {
@@ -640,24 +655,21 @@ public void incomingRequest() throws IllegalStateException {
          * Close the channel when there are no more requests in flight.
          */
         public void setCloseWhenIdle() {
+            assert closeWhenIdle == false : "setCloseWhenIdle() already called";
+            closeWhenIdle = true;
             refCounted.decRef();
         }
 
         @Override
         public void close() {
-            closeInner();
+            closeOnce.run();
         }
 
-        /**
-         * Synchronized to avoid double close due to a natural close and a close via {@link #setCloseWhenIdle()}
-         */
         private void closeInner() {
-            synchronized (inner) {
-                if (inner.isOpen()) {
-                    inner.close();
-                } else {
-                    logger.info("channel [{}] already closed", inner);
-                }
+            if (inner.isOpen()) {
+                inner.close();
+            } else {
+                logger.info("channel [{}] already closed", inner);
             }
         }
 
@@ -673,6 +685,12 @@ public boolean isOpen() {
 
         @Override
         public void sendResponse(HttpResponse response, ActionListener<Void> listener) {
+            assert response.containsHeader(DefaultRestChannel.CONNECTION) == false;
+            if (closeWhenIdle) {
+                // We are shutting down, but will keep the connection open while there are still in-flight requests, and this could be an
+                // arbitrarily long wait if the client is pipelining, so tell the client it should stop using this connection:
+                response.addHeader(DefaultRestChannel.CONNECTION, DefaultRestChannel.CLOSE);
+            }
             inner.sendResponse(
                 response,
                 listener != null ? ActionListener.runAfter(listener, refCounted::decRef) : ActionListener.running(refCounted::decRef)