Skip to content

Commit

Permalink
Fixes #10679 - Review HTTP/2 rate control.
Browse files Browse the repository at this point in the history
* Bumped the rate control rate from 50 events/s to 128.
* Added rate control for all CONTINUATION frames.
* Added rate control for invalid PUSH_PROMISE frames.
* Added rate control for RST_STREAM frames.
* Added rate control for all SETTINGS frames.
* Fixed growth of header block accumulation buffer.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Oct 9, 2023
1 parent d1d2af6 commit 0d344f4
Show file tree
Hide file tree
Showing 11 changed files with 179 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,28 @@ public boolean parse(ByteBuffer buffer)
int remaining = buffer.remaining();
if (remaining < length)
{
headerBlockFragments.storeFragment(buffer, remaining, false);
ContinuationFrame frame = new ContinuationFrame(getStreamId(), false);
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_continuation_frame_rate");

if (!headerBlockFragments.storeFragment(buffer, remaining, false))
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_continuation_stream");

length -= remaining;
break;
}
else
{
boolean last = hasFlag(Flags.END_HEADERS);
headerBlockFragments.storeFragment(buffer, length, last);
boolean endHeaders = hasFlag(Flags.END_HEADERS);
ContinuationFrame frame = new ContinuationFrame(getStreamId(), endHeaders);
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_continuation_frame_rate");

if (!headerBlockFragments.storeFragment(buffer, length, endHeaders))
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_continuation_stream");

reset();
if (last)
if (endHeaders)
return onHeaders(buffer);
return true;
}
Expand All @@ -104,17 +116,21 @@ private boolean onHeaders(ByteBuffer buffer)
RetainableByteBuffer headerBlock = headerBlockFragments.complete();
MetaData metaData = headerBlockParser.parse(headerBlock.getByteBuffer(), headerBlock.remaining());
headerBlock.release();
if (metaData == null)
return true;
HeadersFrame frame = new HeadersFrame(getStreamId(), metaData, headerBlockFragments.getPriorityFrame(), headerBlockFragments.isEndStream());
headerBlockFragments.reset();

if (metaData == HeaderBlockParser.SESSION_FAILURE)
return false;
HeadersFrame frame = new HeadersFrame(getStreamId(), metaData, headerBlockFragments.getPriorityFrame(), headerBlockFragments.isEndStream());
if (metaData == HeaderBlockParser.STREAM_FAILURE)

if (metaData != HeaderBlockParser.STREAM_FAILURE)
{
notifyHeaders(frame);
}
else
{
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_continuation_frame_rate");
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_headers_frame_rate");
}
notifyHeaders(frame);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,43 @@
public class HeaderBlockFragments
{
private final ByteBufferPool bufferPool;
private final int maxCapacity;
private PriorityFrame priorityFrame;
private boolean endStream;
private int streamId;
private boolean endStream;
private RetainableByteBuffer storage;

public HeaderBlockFragments(ByteBufferPool bufferPool)
public HeaderBlockFragments(ByteBufferPool bufferPool, int maxCapacity)
{
this.bufferPool = bufferPool;
this.maxCapacity = maxCapacity;
}

void reset()
{
priorityFrame = null;
streamId = 0;
endStream = false;
storage = null;
}

public void storeFragment(ByteBuffer fragment, int length, boolean last)
public boolean storeFragment(ByteBuffer fragment, int length, boolean last)
{
if (storage == null)
{
int space = last ? length : length * 2;
storage = bufferPool.acquire(space, fragment.isDirect());
if (length > maxCapacity)
return false;
int capacity = last ? length : length * 2;
storage = bufferPool.acquire(capacity, fragment.isDirect());
BufferUtil.flipToFill(storage.getByteBuffer());
}

// Grow the storage if necessary.
if (storage.remaining() < length)
{
ByteBuffer byteBuffer = storage.getByteBuffer();
if (byteBuffer.position() + length > maxCapacity)
return false;
int space = last ? length : length * 2;
int capacity = byteBuffer.position() + space;
RetainableByteBuffer newStorage = bufferPool.acquire(capacity, storage.isDirect());
Expand All @@ -61,6 +75,7 @@ public void storeFragment(ByteBuffer fragment, int length, boolean last)
fragment.limit(fragment.position() + length);
storage.getByteBuffer().put(fragment);
fragment.limit(limit);
return true;
}

public PriorityFrame getPriorityFrame()
Expand All @@ -85,10 +100,8 @@ public void setEndStream(boolean endStream)

public RetainableByteBuffer complete()
{
RetainableByteBuffer result = storage;
storage = null;
result.getByteBuffer().flip();
return result;
storage.getByteBuffer().flip();
return storage;
}

public int getStreamId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,15 @@ else if (hasFlag(Flags.END_HEADERS))
}
else
{
headerBlockFragments.setStreamId(getStreamId());
headerBlockFragments.setEndStream(isEndStream());
if (headerBlockFragments.getStreamId() != 0)
{
connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_headers_frame");
}
else
{
headerBlockFragments.setStreamId(getStreamId());
headerBlockFragments.setEndStream(isEndStream());
}
}
}

Expand Down Expand Up @@ -171,6 +178,18 @@ else if (hasFlag(Flags.PRIORITY))
break;
}
case HEADERS:
{
if (!hasFlag(Flags.END_HEADERS))
{
headerBlockFragments.setStreamId(getStreamId());
headerBlockFragments.setEndStream(isEndStream());
if (hasFlag(Flags.PRIORITY))
headerBlockFragments.setPriorityFrame(new PriorityFrame(getStreamId(), parentStreamId, weight, exclusive));
}
state = State.HEADER_BLOCK;
break;
}
case HEADER_BLOCK:
{
if (hasFlag(Flags.END_HEADERS))
{
Expand All @@ -195,7 +214,7 @@ else if (hasFlag(Flags.PRIORITY))
{
HeadersFrame frame = new HeadersFrame(getStreamId(), metaData, null, isEndStream());
if (!rateControlOnEvent(frame))
connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_headers_frame_rate");
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_headers_frame_rate");
}
}
}
Expand All @@ -204,16 +223,14 @@ else if (hasFlag(Flags.PRIORITY))
int remaining = buffer.remaining();
if (remaining < length)
{
headerBlockFragments.storeFragment(buffer, remaining, false);
if (!headerBlockFragments.storeFragment(buffer, remaining, false))
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_headers_frame");
length -= remaining;
}
else
{
headerBlockFragments.setStreamId(getStreamId());
headerBlockFragments.setEndStream(isEndStream());
if (hasFlag(Flags.PRIORITY))
headerBlockFragments.setPriorityFrame(new PriorityFrame(getStreamId(), parentStreamId, weight, exclusive));
headerBlockFragments.storeFragment(buffer, length, false);
if (!headerBlockFragments.storeFragment(buffer, length, false))
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_headers_frame");
state = State.PADDING;
loop = paddingLength == 0;
}
Expand Down Expand Up @@ -257,6 +274,6 @@ private void onHeaders(HeadersFrame frame)

private enum State
{
PREPARE, PADDING_LENGTH, EXCLUSIVE, PARENT_STREAM_ID, PARENT_STREAM_ID_BYTES, WEIGHT, HEADERS, PADDING
PREPARE, PADDING_LENGTH, EXCLUSIVE, PARENT_STREAM_ID, PARENT_STREAM_ID_BYTES, WEIGHT, HEADERS, HEADER_BLOCK, PADDING
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void init(Listener listener)
this.listener = listener;
unknownBodyParser = new UnknownBodyParser(headerParser, listener);
HeaderBlockParser headerBlockParser = new HeaderBlockParser(headerParser, bufferPool, hpackDecoder, unknownBodyParser);
HeaderBlockFragments headerBlockFragments = new HeaderBlockFragments(bufferPool);
HeaderBlockFragments headerBlockFragments = new HeaderBlockFragments(bufferPool, hpackDecoder.getMaxHeaderListSize());
bodyParsers[FrameType.DATA.getType()] = new DataBodyParser(headerParser, listener);
bodyParsers[FrameType.HEADERS.getType()] = new HeadersBodyParser(headerParser, listener, headerBlockParser, headerBlockFragments);
bodyParsers[FrameType.PRIORITY.getType()] = new PriorityBodyParser(headerParser, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;

public class PushPromiseBodyParser extends BodyParser
Expand Down Expand Up @@ -65,13 +66,9 @@ public boolean parse(ByteBuffer buffer)
length = getBodyLength();

if (isPadding())
{
state = State.PADDING_LENGTH;
}
else
{
state = State.STREAM_ID;
}
break;
}
case PADDING_LENGTH:
Expand Down Expand Up @@ -131,7 +128,15 @@ public boolean parse(ByteBuffer buffer)
state = State.PADDING;
loop = paddingLength == 0;
if (metaData != HeaderBlockParser.STREAM_FAILURE)
{
onPushPromise(streamId, metaData);
}
else
{
HeadersFrame frame = new HeadersFrame(getStreamId(), metaData, null, isEndStream());
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_headers_frame_rate");
}
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public boolean parse(ByteBuffer buffer)
{
if (buffer.remaining() >= 4)
{
return onReset(buffer.getInt());
return onReset(buffer, buffer.getInt());
}
else
{
Expand All @@ -73,7 +73,7 @@ public boolean parse(ByteBuffer buffer)
--cursor;
error += currByte << (8 * cursor);
if (cursor == 0)
return onReset(error);
return onReset(buffer, error);
break;
}
default:
Expand All @@ -85,9 +85,11 @@ public boolean parse(ByteBuffer buffer)
return false;
}

private boolean onReset(int error)
private boolean onReset(ByteBuffer buffer, int error)
{
ResetFrame frame = new ResetFrame(getStreamId(), error);
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_rst_stream_frame_rate");
reset();
notifyReset(frame);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,7 @@ protected void emptyBody(ByteBuffer buffer)
return;
boolean isReply = hasFlag(Flags.ACK);
SettingsFrame frame = new SettingsFrame(Collections.emptyMap(), isReply);
if (!isReply && !rateControlOnEvent(frame))
connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_settings_frame_rate");
else
onSettings(frame);
onSettings(buffer, frame);
}

private boolean validateFrame(ByteBuffer buffer, int streamId, int bodyLength)
Expand Down Expand Up @@ -219,11 +216,13 @@ protected boolean onSettings(ByteBuffer buffer, Map<Integer, Integer> settings)
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_max_frame_size");

SettingsFrame frame = new SettingsFrame(settings, hasFlag(Flags.ACK));
return onSettings(frame);
return onSettings(buffer, frame);
}

private boolean onSettings(SettingsFrame frame)
private boolean onSettings(ByteBuffer buffer, SettingsFrame frame)
{
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_settings_frame_rate");
reset();
notifySettings(frame);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public boolean parse(ByteBuffer buffer)
boolean parsed = cursor == 0;
if (parsed && !rateControlOnEvent(new UnknownFrame(getFrameType())))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_unknown_frame_rate");

return parsed;
}

Expand Down
Loading

0 comments on commit 0d344f4

Please sign in to comment.