Skip to content

Remove first FlowControlHandler from HTTP pipeline #128099

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
5 changes: 5 additions & 0 deletions docs/changelog/128099.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 128099
summary: Remove first `FlowControlHandler` from HTTP pipeline
area: Network
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,23 @@
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.util.ReferenceCounted;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.http.netty4.internal.HttpValidator;
import org.elasticsearch.transport.Transports;

import java.util.ArrayDeque;

public class Netty4HttpHeaderValidator extends ChannelDuplexHandler {

private final HttpValidator validator;
private final ThreadContext threadContext;
private State state;
private State state = State.PASSING;
private final ArrayDeque<Object> buffer = new ArrayDeque<>();

public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadContext) {
this.validator = validator;
Expand All @@ -36,80 +40,125 @@ public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadCo

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (state == State.VALIDATING || buffer.size() > 0) {
// there's already some buffered messages that need to be processed before this one, so queue this one up behind them
buffer.offerLast(msg);
return;
}

assert msg instanceof HttpObject;
var httpObject = (HttpObject) msg;
final var httpObject = (HttpObject) msg;
if (httpObject.decoderResult().isFailure()) {
ctx.fireChannelRead(httpObject); // pass-through for decoding failures
} else if (msg instanceof HttpRequest httpRequest) {
validate(ctx, httpRequest);
} else if (state == State.PASSING) {
assert msg instanceof HttpContent;
ctx.fireChannelRead(msg);
} else {
if (msg instanceof HttpRequest request) {
validate(ctx, request);
} else {
assert msg instanceof HttpContent;
var content = (HttpContent) msg;
if (state == State.DROPPING) {
content.release();
ctx.read();
} else {
assert state == State.PASSING : "unexpected content before validation completed";
ctx.fireChannelRead(content);
}
}
assert state == State.DROPPING : state;
assert msg instanceof HttpContent;
final var httpContent = (HttpContent) msg;
httpContent.release();
ctx.read();
}
}

@Override
public void read(ChannelHandlerContext ctx) throws Exception {
// until validation is completed we can ignore read calls,
// once validation is finished HttpRequest will be fired and downstream can read from there
if (state != State.VALIDATING) {
ctx.read();
}
public void channelReadComplete(ChannelHandlerContext ctx) {
if (buffer.size() == 0) {
ctx.fireChannelReadComplete();
} // else we're buffering messages so will manage the read-complete messages ourselves
}

void validate(ChannelHandlerContext ctx, HttpRequest request) {
assert Transports.assertDefaultThreadContext(threadContext);
state = State.VALIDATING;
ActionListener.run(
// this prevents thread-context changes to propagate to the validation listener
// atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context,
// so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor
ActionListener.assertOnce(
new ContextPreservingActionListener<Void>(
threadContext.wrapRestorable(threadContext.newStoredContext()),
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
handleValidationResult(ctx, request, null);
}

@Override
public void onFailure(Exception e) {
handleValidationResult(ctx, request, e);
}
@Override
public void read(ChannelHandlerContext ctx) throws Exception {
assert ctx.channel().eventLoop().inEventLoop();
if (state != State.VALIDATING) {
if (buffer.size() > 0) {
final var message = buffer.pollFirst();
if (message instanceof HttpRequest httpRequest) {
if (httpRequest.decoderResult().isFailure()) {
ctx.fireChannelRead(message); // pass-through for decoding failures
ctx.fireChannelReadComplete(); // downstream will have to call read() again when it's ready
} else {
validate(ctx, httpRequest);
}
)
),
listener -> {
// this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext()) {
validator.validate(request, ctx.channel(), listener);
} else {
assert message instanceof HttpContent;
assert state == State.PASSING : state; // DROPPING releases any buffered chunks up-front
ctx.fireChannelRead(message);
ctx.fireChannelReadComplete(); // downstream will have to call read() again when it's ready
Comment on lines +87 to +91
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we don't care about individual chunks and in spirit of PR to process more efficiently we can compose all buffered chunks into one if there is more than one chunk.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I did play around with that idea at first but I couldn't find a totally obvious way to combine the chunks back together (it's more than just the bytes in the request body, there's also decoder errors, and the last chunk is special, and maybe some other things too). Moreover this doesn't do anything to the hot path where we complete validation inline and then stream chunks straight through anyway, so it didn't seem worth pursuing. Maybe in a follow-up?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense. Added complexity seems unnecessary. Thanks.

}
} else {
ctx.read();
}
);
}
}

void handleValidationResult(ChannelHandlerContext ctx, HttpRequest request, @Nullable Exception validationError) {
assert Transports.assertDefaultThreadContext(threadContext);
// Always explicitly dispatch back to the event loop to prevent reentrancy concerns if we are still on event loop
ctx.channel().eventLoop().execute(() -> {
if (validationError != null) {
request.setDecoderResult(DecoderResult.failure(validationError));
state = State.DROPPING;
} else {
state = State.PASSING;
void validate(ChannelHandlerContext ctx, HttpRequest httpRequest) {
final var validationResultListener = new ValidationResultListener(ctx, httpRequest);
SubscribableListener.newForked(validationResultListener::doValidate)
.addListener(
validationResultListener,
// dispatch back to event loop unless validation completed already in which case we can just continue on this thread
// straight away, avoiding the need to buffer any subsequent messages
ctx.channel().eventLoop(),
null
);
}

private class ValidationResultListener implements ActionListener<Void> {

private final ChannelHandlerContext ctx;
private final HttpRequest httpRequest;

ValidationResultListener(ChannelHandlerContext ctx, HttpRequest httpRequest) {
this.ctx = ctx;
this.httpRequest = httpRequest;
}

void doValidate(ActionListener<Void> listener) {
assert Transports.assertDefaultThreadContext(threadContext);
assert ctx.channel().eventLoop().inEventLoop();
assert state == State.PASSING || state == State.DROPPING : state;
state = State.VALIDATING;
try (var ignore = threadContext.newEmptyContext()) {
validator.validate(
httpRequest,
ctx.channel(),
new ContextPreservingActionListener<>(threadContext::newEmptyContext, listener)
);
}
ctx.fireChannelRead(request);
});
}

@Override
public void onResponse(Void unused) {
assert Transports.assertDefaultThreadContext(threadContext);
assert ctx.channel().eventLoop().inEventLoop();
assert state == State.VALIDATING : state;
state = State.PASSING;
fireChannelRead();
}

@Override
public void onFailure(Exception e) {
assert Transports.assertDefaultThreadContext(threadContext);
assert ctx.channel().eventLoop().inEventLoop();
assert state == State.VALIDATING : state;
httpRequest.setDecoderResult(DecoderResult.failure(e));
state = State.DROPPING;
while (buffer.isEmpty() == false && buffer.peekFirst() instanceof HttpRequest == false) {
assert buffer.peekFirst() instanceof HttpContent;
((ReferenceCounted) buffer.pollFirst()).release();
}
fireChannelRead();
}

private void fireChannelRead() {
ctx.fireChannelRead(httpRequest);
ctx.fireChannelReadComplete(); // downstream needs to read() again
}
}

private enum State {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,6 @@ protected HttpMessage createMessage(String[] initialLine) throws Exception {
ch.pipeline().addLast("decoder", decoder); // parses the HTTP bytes request into HTTP message pieces

// from this point in pipeline every handler must call ctx or channel #read() when ready to process next HTTP part
ch.pipeline().addLast(new FlowControlHandler());
if (Assertions.ENABLED) {
// missing reads are hard to catch, but we can detect absence of reads within interval
long missingReadIntervalMs = 10_000;
Expand Down
Loading
Loading