Skip to content

Commit ece7397

Browse files
authored
transport: Reduce pointer usage in Stream structs (#8624)
The pprof profiles for unary RPC benchmarks indicate significant time spent in `runtime.mallocgc` and `runtime.gcBgMarkWorker`. This indicates gRPC is spending significant CPU cycles allocating or garbage collecting. This change reduces the number of pointer fields in the structs that represent client and server stream. This will reduce number of memory allocations (faster) and also reduce pressure on garbage collector (faster garbage collections) since the GC doesn't need to scan non-pointer fields. For structs which were stored as pointers to ensure values are not copied, a `noCopy` struct is embedded that will cause `go vet` to fail if copies are performed. Non-pointer fields are also moved to the end of the struct to improve allocation speed. ## Results There are improvements in QPS, latency and allocs/op for unary RPCs. ```sh # test command go run benchmark/benchmain/main.go -benchtime=60s -workloads=unary \ -compression=off -maxConcurrentCalls=500 -trace=off \ -reqSizeBytes=100 -respSizeBytes=100 -networkMode=Local -resultFile="${RUN_NAME}" -recvBufferPool=simple go run benchmark/benchresult/main.go unary-before unary-after Title Before After Percentage TotalOps 7690250 7991877 3.92% SendOps 0 0 NaN% RecvOps 0 0 NaN% Bytes/op 10218.14 10084.00 -1.31% Allocs/op 164.85 151.85 -7.89% ReqT/op 102536666.67 106558360.00 3.92% RespT/op 102536666.67 106558360.00 3.92% 50th-Lat 3.57283ms 3.435143ms -3.85% 90th-Lat 5.152403ms 4.979906ms -3.35% 99th-Lat 5.985282ms 5.827893ms -2.63% Avg-Lat 3.89872ms 3.750449ms -3.80% GoVersion go1.24.4 go1.24.4 GrpcVersion 1.77.0-dev 1.77.0-dev ``` ## Resources * go/go/performance?polyglot=open-source#application-spends-too-much-on-gc-or-allocations * go/go/performance?polyglot=open-source#memory-optimizations RELEASE NOTES: * transport: Reduce pointer usage to lower garbage collection pressure and improve unary RPC performance.
1 parent 8389ddb commit ece7397

File tree

11 files changed

+117
-90
lines changed

11 files changed

+117
-90
lines changed

internal/transport/client_stream.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,25 +29,27 @@ import (
2929

3030
// ClientStream implements streaming functionality for a gRPC client.
3131
type ClientStream struct {
32-
*Stream // Embed for common stream functionality.
32+
Stream // Embed for common stream functionality.
3333

3434
ct *http2Client
3535
done chan struct{} // closed at the end of stream to unblock writers.
3636
doneFunc func() // invoked at the end of stream.
3737

38-
headerChan chan struct{} // closed to indicate the end of header metadata.
39-
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
38+
headerChan chan struct{} // closed to indicate the end of header metadata.
39+
header metadata.MD // the received header metadata
40+
41+
status *status.Status // the status error received from the server
42+
43+
// Non-pointer fields are at the end to optimize GC allocations.
44+
4045
// headerValid indicates whether a valid header was received. Only
4146
// meaningful after headerChan is closed (always call waitOnHeader() before
4247
// reading its value).
43-
headerValid bool
44-
header metadata.MD // the received header metadata
45-
noHeaders bool // set if the client never received headers (set only after the stream is done).
46-
47-
bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
48-
unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream
49-
50-
status *status.Status // the status error received from the server
48+
headerValid bool
49+
noHeaders bool // set if the client never received headers (set only after the stream is done).
50+
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
51+
bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
52+
unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream
5153
}
5254

5355
// Read reads an n byte message from the input stream.

internal/transport/flowcontrol.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import (
2828
// writeQuota is a soft limit on the amount of data a stream can
2929
// schedule before some of it is written out.
3030
type writeQuota struct {
31-
quota int32
31+
_ noCopy
3232
// get waits on read from when quota goes less than or equal to zero.
3333
// replenish writes on it when quota goes positive again.
3434
ch chan struct{}
@@ -38,16 +38,17 @@ type writeQuota struct {
3838
// It is implemented as a field so that it can be updated
3939
// by tests.
4040
replenish func(n int)
41+
quota int32
4142
}
4243

43-
func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
44-
w := &writeQuota{
45-
quota: sz,
46-
ch: make(chan struct{}, 1),
47-
done: done,
48-
}
44+
// init allows a writeQuota to be initialized in-place, which is useful for
45+
// resetting a buffer or for avoiding a heap allocation when the buffer is
46+
// embedded in another struct.
47+
func (w *writeQuota) init(sz int32, done <-chan struct{}) {
48+
w.quota = sz
49+
w.ch = make(chan struct{}, 1)
50+
w.done = done
4951
w.replenish = w.realReplenish
50-
return w
5152
}
5253

5354
func (w *writeQuota) get(sz int32) error {

internal/transport/handler_server.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -411,11 +411,10 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
411411
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
412412
req := ht.req
413413
s := &ServerStream{
414-
Stream: &Stream{
414+
Stream: Stream{
415415
id: 0, // irrelevant
416416
ctx: ctx,
417417
requestRead: func(int) {},
418-
buf: newRecvBuffer(),
419418
method: req.URL.Path,
420419
recvCompress: req.Header.Get("grpc-encoding"),
421420
contentSubtype: ht.contentSubtype,
@@ -424,8 +423,9 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
424423
st: ht,
425424
headerWireLength: 0, // won't have access to header wire length until golang/go#18997.
426425
}
427-
s.trReader = &transportReader{
428-
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf},
426+
s.Stream.buf.init()
427+
s.trReader = transportReader{
428+
reader: recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: &s.buf},
429429
windowHandler: func(int) {},
430430
}
431431

internal/transport/http2_client.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -481,30 +481,30 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
481481
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientStream {
482482
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
483483
s := &ClientStream{
484-
Stream: &Stream{
484+
Stream: Stream{
485485
method: callHdr.Method,
486486
sendCompress: callHdr.SendCompress,
487-
buf: newRecvBuffer(),
488487
contentSubtype: callHdr.ContentSubtype,
489488
},
490489
ct: t,
491490
done: make(chan struct{}),
492491
headerChan: make(chan struct{}),
493492
doneFunc: callHdr.DoneFunc,
494493
}
495-
s.wq = newWriteQuota(defaultWriteQuota, s.done)
494+
s.Stream.buf.init()
495+
s.Stream.wq.init(defaultWriteQuota, s.done)
496496
s.requestRead = func(n int) {
497497
t.adjustWindow(s, uint32(n))
498498
}
499499
// The client side stream context should have exactly the same life cycle with the user provided context.
500500
// That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
501501
// So we use the original context here instead of creating a copy.
502502
s.ctx = ctx
503-
s.trReader = &transportReader{
504-
reader: &recvBufferReader{
503+
s.trReader = transportReader{
504+
reader: recvBufferReader{
505505
ctx: s.ctx,
506506
ctxDone: s.ctx.Done(),
507-
recv: s.buf,
507+
recv: &s.buf,
508508
closeStream: func(err error) {
509509
s.Close(err)
510510
},
@@ -823,7 +823,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
823823
return nil
824824
},
825825
onOrphaned: cleanup,
826-
wq: s.wq,
826+
wq: &s.wq,
827827
}
828828
firstTry := true
829829
var ch chan struct{}
@@ -854,7 +854,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientS
854854
transportDrainRequired = t.nextID > MaxStreamID
855855

856856
s.id = hdr.streamID
857-
s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
857+
s.fc = inFlow{limit: uint32(t.initialWindowSize)}
858858
t.activeStreams[s.id] = s
859859
t.mu.Unlock()
860860

internal/transport/http2_server.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -390,16 +390,15 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
390390
}
391391
t.maxStreamID = streamID
392392

393-
buf := newRecvBuffer()
394393
s := &ServerStream{
395-
Stream: &Stream{
396-
id: streamID,
397-
buf: buf,
398-
fc: &inFlow{limit: uint32(t.initialWindowSize)},
394+
Stream: Stream{
395+
id: streamID,
396+
fc: inFlow{limit: uint32(t.initialWindowSize)},
399397
},
400398
st: t,
401399
headerWireLength: int(frame.Header().Length),
402400
}
401+
s.Stream.buf.init()
403402
var (
404403
// if false, content-type was missing or invalid
405404
isGRPC = false
@@ -644,12 +643,12 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
644643
t.adjustWindow(s, uint32(n))
645644
}
646645
s.ctxDone = s.ctx.Done()
647-
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
648-
s.trReader = &transportReader{
649-
reader: &recvBufferReader{
646+
s.Stream.wq.init(defaultWriteQuota, s.ctxDone)
647+
s.trReader = transportReader{
648+
reader: recvBufferReader{
650649
ctx: s.ctx,
651650
ctxDone: s.ctxDone,
652-
recv: s.buf,
651+
recv: &s.buf,
653652
},
654653
windowHandler: func(n int) {
655654
t.updateWindow(s, uint32(n))
@@ -658,7 +657,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
658657
// Register the stream with loopy.
659658
t.controlBuf.put(&registerStream{
660659
streamID: s.id,
661-
wq: s.wq,
660+
wq: &s.wq,
662661
})
663662
handle(s)
664663
return nil

internal/transport/server_stream.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import (
3232

3333
// ServerStream implements streaming functionality for a gRPC server.
3434
type ServerStream struct {
35-
*Stream // Embed for common stream functionality.
35+
Stream // Embed for common stream functionality.
3636

3737
st internalServerTransport
3838
ctxDone <-chan struct{} // closed at the end of stream. Cache of ctx.Done() (for performance)
@@ -43,12 +43,13 @@ type ServerStream struct {
4343
// Holds compressor names passed in grpc-accept-encoding metadata from the
4444
// client.
4545
clientAdvertisedCompressors string
46-
headerWireLength int
4746

4847
// hdrMu protects outgoing header and trailer metadata.
4948
hdrMu sync.Mutex
5049
header metadata.MD // the outgoing header metadata. Updated by WriteHeader.
5150
headerSent atomic.Bool // atomically set when the headers are sent out.
51+
52+
headerWireLength int
5253
}
5354

5455
// Read reads an n byte message from the input stream.

internal/transport/transport.go

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,11 @@ type recvBuffer struct {
6868
err error
6969
}
7070

71-
func newRecvBuffer() *recvBuffer {
72-
b := &recvBuffer{
73-
c: make(chan recvMsg, 1),
74-
}
75-
return b
71+
// init allows a recvBuffer to be initialized in-place, which is useful
72+
// for resetting a buffer or for avoiding a heap allocation when the buffer
73+
// is embedded in another struct.
74+
func (b *recvBuffer) init() {
75+
b.c = make(chan recvMsg, 1)
7676
}
7777

7878
func (b *recvBuffer) put(r recvMsg) {
@@ -123,6 +123,7 @@ func (b *recvBuffer) get() <-chan recvMsg {
123123
// recvBufferReader implements io.Reader interface to read the data from
124124
// recvBuffer.
125125
type recvBufferReader struct {
126+
_ noCopy
126127
closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
127128
ctx context.Context
128129
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
@@ -285,27 +286,28 @@ const (
285286

286287
// Stream represents an RPC in the transport layer.
287288
type Stream struct {
288-
id uint32
289289
ctx context.Context // the associated context of the stream
290290
method string // the associated RPC method of the stream
291291
recvCompress string
292292
sendCompress string
293-
buf *recvBuffer
294-
trReader *transportReader
295-
fc *inFlow
296-
wq *writeQuota
297293

298294
// Callback to state application's intentions to read data. This
299295
// is used to adjust flow control, if needed.
300296
requestRead func(int)
301297

302-
state streamState
303-
304298
// contentSubtype is the content-subtype for requests.
305299
// this must be lowercase or the behavior is undefined.
306300
contentSubtype string
307301

308302
trailer metadata.MD // the key-value map of trailer metadata.
303+
304+
// Non-pointer fields are at the end to optimize GC performance.
305+
state streamState
306+
id uint32
307+
buf recvBuffer
308+
trReader transportReader
309+
fc inFlow
310+
wq writeQuota
309311
}
310312

311313
func (s *Stream) swapState(st streamState) streamState {
@@ -401,16 +403,28 @@ func (s *Stream) read(n int) (data mem.BufferSlice, err error) {
401403
return data, nil
402404
}
403405

406+
// noCopy may be embedded into structs which must not be copied
407+
// after the first use.
408+
//
409+
// See https://golang.org/issues/8005#issuecomment-190753527
410+
// for details.
411+
type noCopy struct {
412+
}
413+
414+
func (*noCopy) Lock() {}
415+
func (*noCopy) Unlock() {}
416+
404417
// transportReader reads all the data available for this Stream from the transport and
405418
// passes them into the decoder, which converts them into a gRPC message stream.
406419
// The error is io.EOF when the stream is done or another non-nil error if
407420
// the stream broke.
408421
type transportReader struct {
409-
reader *recvBufferReader
422+
_ noCopy
410423
// The handler to control the window update procedure for both this
411424
// particular stream and the associated transport.
412425
windowHandler func(int)
413426
er error
427+
reader recvBufferReader
414428
}
415429

416430
func (t *transportReader) ReadMessageHeader(header []byte) (int, error) {

0 commit comments

Comments
 (0)