Skip to content

Commit f959da6

Browse files
authored
transport: Reduce heap allocations (#8668)
## Benchmarks ```sh # Test command $ go run benchmark/benchmain/main.go -benchtime=60s -workloads=unary \ -compression=off -maxConcurrentCalls=200 -trace=off \ -reqSizeBytes=100 -respSizeBytes=100 -networkMode=Local -resultFile="${RUN_NAME}" $ go run benchmark/benchresult/main.go unary-before unary-after Title Before After Percentage TotalOps 7801951 7889246 1.12% SendOps 0 0 NaN% RecvOps 0 0 NaN% Bytes/op 10005.90 9911.48 -0.94% Allocs/op 146.91 143.91 -2.04% ReqT/op 104026013.33 105189946.67 1.12% RespT/op 104026013.33 105189946.67 1.12% 50th-Lat 1.375183ms 1.360319ms -1.08% 90th-Lat 2.293816ms 2.249015ms -1.95% 99th-Lat 3.162307ms 3.13568ms -0.84% Avg-Lat 1.536462ms 1.519465ms -1.11% GoVersion go1.24.8 go1.24.8 GrpcVersion 1.77.0-dev 1.77.0-dev ``` RELEASE NOTES: N/A
1 parent 0d49384 commit f959da6

File tree

5 files changed

+23
-26
lines changed

5 files changed

+23
-26
lines changed

internal/transport/http2_client.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -500,12 +500,10 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientSt
500500
s.ctx = ctx
501501
s.trReader = transportReader{
502502
reader: recvBufferReader{
503-
ctx: s.ctx,
504-
ctxDone: s.ctx.Done(),
505-
recv: &s.buf,
506-
closeStream: func(err error) {
507-
s.Close(err)
508-
},
503+
ctx: s.ctx,
504+
ctxDone: s.ctx.Done(),
505+
recv: &s.buf,
506+
clientStream: s,
509507
},
510508
windowHandler: s,
511509
}

internal/transport/transport.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -123,13 +123,13 @@ func (b *recvBuffer) get() <-chan recvMsg {
123123
// recvBufferReader implements io.Reader interface to read the data from
124124
// recvBuffer.
125125
type recvBufferReader struct {
126-
_ noCopy
127-
closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
128-
ctx context.Context
129-
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
130-
recv *recvBuffer
131-
last mem.Buffer // Stores the remaining data in the previous calls.
132-
err error
126+
_ noCopy
127+
clientStream *ClientStream // The client transport stream is closed with a status representing ctx.Err() and nil trailer metadata.
128+
ctx context.Context
129+
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
130+
recv *recvBuffer
131+
last mem.Buffer // Stores the remaining data in the previous calls.
132+
err error
133133
}
134134

135135
func (r *recvBufferReader) ReadMessageHeader(header []byte) (n int, err error) {
@@ -140,7 +140,7 @@ func (r *recvBufferReader) ReadMessageHeader(header []byte) (n int, err error) {
140140
n, r.last = mem.ReadUnsafe(header, r.last)
141141
return n, nil
142142
}
143-
if r.closeStream != nil {
143+
if r.clientStream != nil {
144144
n, r.err = r.readMessageHeaderClient(header)
145145
} else {
146146
n, r.err = r.readMessageHeader(header)
@@ -165,7 +165,7 @@ func (r *recvBufferReader) Read(n int) (buf mem.Buffer, err error) {
165165
}
166166
return buf, nil
167167
}
168-
if r.closeStream != nil {
168+
if r.clientStream != nil {
169169
buf, r.err = r.readClient(n)
170170
} else {
171171
buf, r.err = r.read(n)
@@ -210,7 +210,7 @@ func (r *recvBufferReader) readMessageHeaderClient(header []byte) (n int, err er
210210
// TODO: delaying ctx error seems like a unnecessary side effect. What
211211
// we really want is to mark the stream as done, and return ctx error
212212
// faster.
213-
r.closeStream(ContextErr(r.ctx.Err()))
213+
r.clientStream.Close(ContextErr(r.ctx.Err()))
214214
m := <-r.recv.get()
215215
return r.readMessageHeaderAdditional(m, header)
216216
case m := <-r.recv.get():
@@ -237,7 +237,7 @@ func (r *recvBufferReader) readClient(n int) (buf mem.Buffer, err error) {
237237
// TODO: delaying ctx error seems like a unnecessary side effect. What
238238
// we really want is to mark the stream as done, and return ctx error
239239
// faster.
240-
r.closeStream(ContextErr(r.ctx.Err()))
240+
r.clientStream.Close(ContextErr(r.ctx.Err()))
241241
m := <-r.recv.get()
242242
return r.readAdditional(m, n)
243243
case m := <-r.recv.get():

preloader.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,6 @@ func (p *PreparedMsg) Encode(s Stream, msg any) error {
4747
}
4848

4949
// check if the context has the relevant information to prepareMsg
50-
if rpcInfo.preloaderInfo == nil {
51-
return status.Errorf(codes.Internal, "grpc: rpcInfo.preloaderInfo is nil")
52-
}
5350
if rpcInfo.preloaderInfo.codec == nil {
5451
return status.Errorf(codes.Internal, "grpc: rpcInfo.preloaderInfo.codec is nil")
5552
}

rpc_util.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -961,7 +961,7 @@ func recv(p *parser, c baseCodec, s recvCompressor, dc Decompressor, m any, maxR
961961
// Information about RPC
962962
type rpcInfo struct {
963963
failfast bool
964-
preloaderInfo *compressorInfo
964+
preloaderInfo compressorInfo
965965
}
966966

967967
// Information about Preloader
@@ -980,7 +980,7 @@ type rpcInfoContextKey struct{}
980980
func newContextWithRPCInfo(ctx context.Context, failfast bool, codec baseCodec, cp Compressor, comp encoding.Compressor) context.Context {
981981
return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{
982982
failfast: failfast,
983-
preloaderInfo: &compressorInfo{
983+
preloaderInfo: compressorInfo{
984984
codec: codec,
985985
cp: cp,
986986
comp: comp,

stream.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,8 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
177177
return cc.NewStream(ctx, desc, method, opts...)
178178
}
179179

180+
var emptyMethodConfig = serviceconfig.MethodConfig{}
181+
180182
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
181183
// Start tracking the RPC for idleness purposes. This is where a stream is
182184
// created for both streaming and unary RPCs, and hence is a good place to
@@ -217,7 +219,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
217219
return nil, err
218220
}
219221

220-
var mc serviceconfig.MethodConfig
222+
mc := &emptyMethodConfig
221223
var onCommit func()
222224
newStream := func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
223225
return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, nameResolutionDelayed, opts...)
@@ -240,7 +242,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
240242
if rpcConfig.Context != nil {
241243
ctx = rpcConfig.Context
242244
}
243-
mc = rpcConfig.MethodConfig
245+
mc = &rpcConfig.MethodConfig
244246
onCommit = rpcConfig.OnCommitted
245247
if rpcConfig.Interceptor != nil {
246248
rpcInfo.Context = nil
@@ -258,7 +260,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
258260
return newStream(ctx, func() {})
259261
}
260262

261-
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), nameResolutionDelayed bool, opts ...CallOption) (_ iresolver.ClientStream, err error) {
263+
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc *serviceconfig.MethodConfig, onCommit, doneFunc func(), nameResolutionDelayed bool, opts ...CallOption) (_ iresolver.ClientStream, err error) {
262264
callInfo := defaultCallInfo()
263265
if mc.WaitForReady != nil {
264266
callInfo.failFast = !*mc.WaitForReady
@@ -325,7 +327,7 @@ func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *Client
325327
cs := &clientStream{
326328
callHdr: callHdr,
327329
ctx: ctx,
328-
methodConfig: &mc,
330+
methodConfig: mc,
329331
opts: opts,
330332
callInfo: callInfo,
331333
cc: cc,

0 commit comments

Comments
 (0)