diff --git a/core/src/main/java/io/grpc/internal/MessageFramer.java b/core/src/main/java/io/grpc/internal/MessageFramer.java index 5e75fa2e6fe..83cfdc12897 100644 --- a/core/src/main/java/io/grpc/internal/MessageFramer.java +++ b/core/src/main/java/io/grpc/internal/MessageFramer.java @@ -75,6 +75,10 @@ void deliverFrame( // effectively final. Can only be set once. private int maxOutboundMessageSize = NO_MAX_OUTBOUND_MESSAGE_SIZE; private WritableBuffer buffer; + /** + * if > 0 - the number of bytes to allocate for the current known-length message. + */ + private int knownLengthPendingAllocation; private Compressor compressor = Codec.Identity.NONE; private boolean messageCompression = true; private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter(); @@ -222,13 +226,25 @@ private int writeKnownLengthUncompressed(InputStream message, int messageLength) headerScratch.put(UNCOMPRESSED).putInt(messageLength); // Allocate the initial buffer chunk based on frame header + payload length. // Note that the allocator may allocate a buffer larger or smaller than this length + knownLengthPendingAllocation = HEADER_LENGTH + messageLength; if (buffer == null) { - buffer = bufferAllocator.allocate(headerScratch.position() + messageLength); + buffer = allocateKnownLength(); } writeRaw(headerScratch.array(), 0, headerScratch.position()); return writeToOutputStream(message, outputStreamAdapter); } + /** + * Allocate buffer according to {@link #knownLengthPendingAllocation} which is decremented after + * that. + */ + private WritableBuffer allocateKnownLength() { + WritableBuffer newBuffer = bufferAllocator.allocateKnownLength(knownLengthPendingAllocation); + knownLengthPendingAllocation -= Math.min(knownLengthPendingAllocation, + newBuffer.writableBytes()); + return newBuffer; + } + /** * Write a message that has been serialized to a sequence of buffers. */ @@ -243,7 +259,7 @@ private void writeBufferChain(BufferChainOutputStream bufferChain, boolean compr } headerScratch.clear(); headerScratch.put(compressed ? COMPRESSED : UNCOMPRESSED).putInt(messageLength); - WritableBuffer writeableHeader = bufferAllocator.allocate(HEADER_LENGTH); + WritableBuffer writeableHeader = bufferAllocator.allocateKnownLength(HEADER_LENGTH); writeableHeader.write(headerScratch.array(), 0, headerScratch.position()); if (messageLength == 0) { // the payload had 0 length so make the header the current buffer. @@ -289,7 +305,9 @@ private void writeRaw(byte[] b, int off, int len) { } if (buffer == null) { // Request a buffer allocation using the message length as a hint. - buffer = bufferAllocator.allocate(len); + buffer = knownLengthPendingAllocation > 0 + ? allocateKnownLength() + : bufferAllocator.allocate(len); } int toWrite = min(len, buffer.writableBytes()); buffer.write(b, off, toWrite); @@ -397,7 +415,7 @@ private final class BufferChainOutputStream extends OutputStream { * {@link #write(byte[], int, int)}. */ @Override - public void write(int b) throws IOException { + public void write(int b) { if (current != null && current.writableBytes() > 0) { current.write((byte)b); return; diff --git a/core/src/main/java/io/grpc/internal/WritableBufferAllocator.java b/core/src/main/java/io/grpc/internal/WritableBufferAllocator.java index 902b279a556..e509b57a124 100644 --- a/core/src/main/java/io/grpc/internal/WritableBufferAllocator.java +++ b/core/src/main/java/io/grpc/internal/WritableBufferAllocator.java @@ -27,4 +27,12 @@ public interface WritableBufferAllocator { * free to return a buffer with a greater or lesser capacity. */ WritableBuffer allocate(int capacityHint); + + /** + * Request a new {@link WritableBuffer} with the given {@code capacityHint}. This method is + * similar to {@link #allocate(int)}, but there is no need to allocate greater capacity. + */ + default WritableBuffer allocateKnownLength(int capacityHint) { + return allocate(capacityHint); + } } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/CompressionTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/CompressionTest.java index 208eb40c438..5307c26949b 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/CompressionTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/CompressionTest.java @@ -24,6 +24,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import io.grpc.CallOptions; import io.grpc.Channel; @@ -53,8 +55,6 @@ import io.grpc.testing.integration.TestServiceGrpc.TestServiceBlockingStub; import io.grpc.testing.integration.TransportCompressionTest.Fzip; import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -146,25 +146,16 @@ public void tearDown() { * Parameters for test. */ @Parameters - public static Collection params() { - boolean[] bools = new boolean[]{false, true}; - List combos = new ArrayList<>(64); - for (boolean enableClientMessageCompression : bools) { - for (boolean clientAcceptEncoding : bools) { - for (boolean clientEncoding : bools) { - for (boolean enableServerMessageCompression : bools) { - for (boolean serverAcceptEncoding : bools) { - for (boolean serverEncoding : bools) { - combos.add(new Object[] { - enableClientMessageCompression, clientAcceptEncoding, clientEncoding, - enableServerMessageCompression, serverAcceptEncoding, serverEncoding}); - } - } - } - } - } - } - return combos; + public static Iterable params() { + List bools = Lists.newArrayList(false, true); + return Iterables.transform(Lists.cartesianProduct( + bools, // enableClientMessageCompression + bools, // clientAcceptEncoding + bools, // clientEncoding + bools, // enableServerMessageCompression + bools, // serverAcceptEncoding + bools // serverEncoding + ), List::toArray); } @Test diff --git a/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java b/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java index 40b84717160..6b11090f348 100644 --- a/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java +++ b/netty/src/main/java/io/grpc/netty/NettyWritableBufferAllocator.java @@ -33,6 +33,9 @@ */ class NettyWritableBufferAllocator implements WritableBufferAllocator { + // Use 4k as our minimum buffer size. + private static final int MIN_BUFFER = 4 * 1024; + // Set the maximum buffer size to 1MB. private static final int MAX_BUFFER = 1024 * 1024; @@ -44,6 +47,11 @@ class NettyWritableBufferAllocator implements WritableBufferAllocator { @Override public WritableBuffer allocate(int capacityHint) { + return allocateKnownLength(Math.max(MIN_BUFFER, capacityHint)); + } + + @Override + public WritableBuffer allocateKnownLength(int capacityHint) { capacityHint = Math.min(MAX_BUFFER, capacityHint); return new NettyWritableBuffer(allocator.buffer(capacityHint, capacityHint)); } diff --git a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java index a44a196ac8c..4dd24c3fd4d 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java @@ -233,7 +233,7 @@ public void writeFrameFutureFailedShouldCancelRpc() { // Verify that failed SendGrpcFrameCommand results in immediate CancelClientStreamCommand. inOrder.verify(writeQueue).enqueue(any(CancelClientStreamCommand.class), eq(true)); // Verify that any other failures do not produce another CancelClientStreamCommand in the queue. - inOrder.verify(writeQueue, atLeast(1)).enqueue(any(SendGrpcFrameCommand.class), eq(false)); + inOrder.verify(writeQueue, atLeast(0)).enqueue(any(SendGrpcFrameCommand.class), eq(false)); inOrder.verify(writeQueue).enqueue(any(SendGrpcFrameCommand.class), eq(true)); inOrder.verifyNoMoreInteractions(); diff --git a/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java b/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java index 0723e359752..2f2933ae103 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java @@ -158,7 +158,7 @@ public void writeFrameFutureFailedShouldCancelRpc() { // Verify that failed SendGrpcFrameCommand results in immediate CancelServerStreamCommand. inOrder.verify(writeQueue).enqueue(any(CancelServerStreamCommand.class), eq(true)); // Verify that any other failures do not produce another CancelServerStreamCommand in the queue. - inOrder.verify(writeQueue, atLeast(1)).enqueue(any(SendGrpcFrameCommand.class), eq(false)); + inOrder.verify(writeQueue, atLeast(0)).enqueue(any(SendGrpcFrameCommand.class), eq(false)); inOrder.verify(writeQueue).enqueue(any(SendGrpcFrameCommand.class), eq(true)); inOrder.verifyNoMoreInteractions(); } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpWritableBufferAllocator.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpWritableBufferAllocator.java index 481ada61c96..aa5361f2268 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpWritableBufferAllocator.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpWritableBufferAllocator.java @@ -19,6 +19,7 @@ import io.grpc.internal.WritableBuffer; import io.grpc.internal.WritableBufferAllocator; import okio.Buffer; +import okio.Segment; /** * The default allocator for {@link OkHttpWritableBuffer}s used by the OkHttp transport. OkHttp @@ -27,9 +28,6 @@ */ class OkHttpWritableBufferAllocator implements WritableBufferAllocator { - // Use 4k as our minimum buffer size. - private static final int MIN_BUFFER = 4096; - // Set the maximum buffer size to 1MB private static final int MAX_BUFFER = 1024 * 1024; @@ -45,7 +43,13 @@ class OkHttpWritableBufferAllocator implements WritableBufferAllocator { */ @Override public WritableBuffer allocate(int capacityHint) { - capacityHint = Math.min(MAX_BUFFER, Math.max(MIN_BUFFER, capacityHint)); + // okio buffer uses fixed size Segments, round capacityHint up + return allocateKnownLength((capacityHint + Segment.SIZE - 1) / Segment.SIZE * Segment.SIZE); + } + + @Override + public WritableBuffer allocateKnownLength(int capacityHint) { + capacityHint = Math.min(MAX_BUFFER, capacityHint); return new OkHttpWritableBuffer(new Buffer(), capacityHint); } } diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpWritableBufferAllocatorTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpWritableBufferAllocatorTest.java index e606b6b9a50..c444e0ee11d 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpWritableBufferAllocatorTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpWritableBufferAllocatorTest.java @@ -21,6 +21,7 @@ import io.grpc.internal.WritableBuffer; import io.grpc.internal.WritableBufferAllocator; import io.grpc.internal.WritableBufferAllocatorTestBase; +import okio.Segment; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -42,7 +43,7 @@ protected WritableBufferAllocator allocator() { public void testCapacity() { WritableBuffer buffer = allocator().allocate(4096); assertEquals(0, buffer.readableBytes()); - assertEquals(4096, buffer.writableBytes()); + assertEquals(Segment.SIZE, buffer.writableBytes()); } @Test @@ -54,8 +55,8 @@ public void testInitialCapacityHasMaximum() { @Test public void testIsExactBelowMaxCapacity() { - WritableBuffer buffer = allocator().allocate(4097); + WritableBuffer buffer = allocator().allocate(Segment.SIZE + 1); assertEquals(0, buffer.readableBytes()); - assertEquals(4097, buffer.writableBytes()); + assertEquals(Segment.SIZE * 2, buffer.writableBytes()); } } diff --git a/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java b/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java index bc367587cc6..b340a33f925 100644 --- a/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java +++ b/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java @@ -38,6 +38,7 @@ import io.grpc.internal.TransportFrameUtil; import io.grpc.internal.TransportTracer; import io.grpc.internal.WritableBuffer; +import io.grpc.internal.WritableBufferAllocator; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collections; @@ -72,7 +73,7 @@ final class ServletServerStream extends AbstractServerStream { Attributes attributes, String authority, InternalLogId logId) throws IOException { - super(ByteArrayWritableBuffer::new, statsTraceCtx); + super(ALLOCATOR, statsTraceCtx); transportState = new ServletTransportState(maxInboundMessageSize, statsTraceCtx, new TransportTracer()); this.attributes = attributes; @@ -161,6 +162,20 @@ public void deframeFailed(Throwable cause) { } } + private static final WritableBufferAllocator ALLOCATOR = new WritableBufferAllocator() { + private static final int MIN_BUFFER = 4096; + + @Override + public WritableBuffer allocate(int capacityHint) { + return new ByteArrayWritableBuffer(max(MIN_BUFFER, capacityHint)); + } + + @Override + public WritableBuffer allocateKnownLength(int capacityHint) { + return new ByteArrayWritableBuffer(capacityHint); + } + }; + private static final class ByteArrayWritableBuffer implements WritableBuffer { private final int capacity;