Skip to content

Commit

Permalink
cleanup: rename fields for clarity (grpc#8043)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Jan 28, 2025
1 parent b0e2ae9 commit 3409a56
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 169 deletions.
10 changes: 5 additions & 5 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
}
cc.dopts.defaultServiceConfig, _ = scpr.Config.(*ServiceConfig)
}
cc.mkp = cc.dopts.copts.KeepaliveParams
cc.keepaliveParams = cc.dopts.copts.KeepaliveParams

if err = cc.initAuthority(); err != nil {
return nil, err
Expand Down Expand Up @@ -623,7 +623,7 @@ type ClientConn struct {
balancerWrapper *ccBalancerWrapper // Always recreated whenever entering idle to simplify Close.
sc *ServiceConfig // Latest service config received from the resolver.
conns map[*addrConn]struct{} // Set to nil on close.
mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway.
keepaliveParams keepalive.ClientParameters // May be updated upon receipt of a GoAway.
// firstResolveEvent is used to track whether the name resolver sent us at
// least one update. RPCs block on this event. May be accessed without mu
// if we know we cannot be asked to enter idle mode while accessing it (e.g.
Expand Down Expand Up @@ -1215,8 +1215,8 @@ func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
case transport.GoAwayTooManyPings:
v := 2 * ac.dopts.copts.KeepaliveParams.Time
ac.cc.mu.Lock()
if v > ac.cc.mkp.Time {
ac.cc.mkp.Time = v
if v > ac.cc.keepaliveParams.Time {
ac.cc.keepaliveParams.Time = v
}
ac.cc.mu.Unlock()
}
Expand Down Expand Up @@ -1312,7 +1312,7 @@ func (ac *addrConn) tryAllAddrs(ctx context.Context, addrs []resolver.Address, c
ac.mu.Lock()

ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.dopts.copts.KeepaliveParams = ac.cc.keepaliveParams
ac.cc.mu.RUnlock()

copts := ac.dopts.copts
Expand Down
2 changes: 1 addition & 1 deletion clientconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ func (s) TestClientUpdatesParamsAfterGoAway(t *testing.T) {
for {
time.Sleep(10 * time.Millisecond)
cc.mu.RLock()
v := cc.mkp.Time
v := cc.keepaliveParams.Time
cc.mu.RUnlock()
if v == 20*time.Second {
// Success
Expand Down
4 changes: 2 additions & 2 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type dialOptions struct {
chainUnaryInts []UnaryClientInterceptor
chainStreamInts []StreamClientInterceptor

cp Compressor
compressorV0 Compressor
dc Decompressor
bs internalbackoff.Strategy
block bool
Expand Down Expand Up @@ -258,7 +258,7 @@ func WithCodec(c Codec) DialOption {
// Deprecated: use UseCompressor instead. Will be supported throughout 1.x.
func WithCompressor(cp Compressor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.cp = cp
o.compressorV0 = cp
})
}

Expand Down
10 changes: 5 additions & 5 deletions rpc_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (d *gzipDecompressor) Type() string {

// callInfo contains all related configuration and information about an RPC.
type callInfo struct {
compressorType string
compressorName string
failFast bool
maxReceiveMessageSize *int
maxSendMessageSize *int
Expand Down Expand Up @@ -222,7 +222,7 @@ type HeaderCallOption struct {

func (o HeaderCallOption) before(*callInfo) error { return nil }
func (o HeaderCallOption) after(_ *callInfo, attempt *csAttempt) {
*o.HeaderAddr, _ = attempt.s.Header()
*o.HeaderAddr, _ = attempt.transportStream.Header()
}

// Trailer returns a CallOptions that retrieves the trailer metadata
Expand All @@ -244,7 +244,7 @@ type TrailerCallOption struct {

func (o TrailerCallOption) before(*callInfo) error { return nil }
func (o TrailerCallOption) after(_ *callInfo, attempt *csAttempt) {
*o.TrailerAddr = attempt.s.Trailer()
*o.TrailerAddr = attempt.transportStream.Trailer()
}

// Peer returns a CallOption that retrieves peer information for a unary RPC.
Expand All @@ -266,7 +266,7 @@ type PeerCallOption struct {

func (o PeerCallOption) before(*callInfo) error { return nil }
func (o PeerCallOption) after(_ *callInfo, attempt *csAttempt) {
if x, ok := peer.FromContext(attempt.s.Context()); ok {
if x, ok := peer.FromContext(attempt.transportStream.Context()); ok {
*o.PeerAddr = *x
}
}
Expand Down Expand Up @@ -435,7 +435,7 @@ type CompressorCallOption struct {
}

func (o CompressorCallOption) before(c *callInfo) error {
c.compressorType = o.CompressorType
c.compressorName = o.CompressorType
return nil
}
func (o CompressorCallOption) after(*callInfo, *csAttempt) {}
Expand Down
14 changes: 7 additions & 7 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1645,10 +1645,10 @@ func (s *Server) processStreamingRPC(ctx context.Context, stream *transport.Serv
// If dc is set and matches the stream's compression, use it. Otherwise, try
// to find a matching registered compressor for decomp.
if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
ss.dc = s.opts.dc
ss.decompressorV0 = s.opts.dc
} else if rc != "" && rc != encoding.Identity {
ss.decomp = encoding.GetCompressor(rc)
if ss.decomp == nil {
ss.decompressorV1 = encoding.GetCompressor(rc)
if ss.decompressorV1 == nil {
st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
ss.s.WriteStatus(st)
return st.Err()
Expand All @@ -1660,12 +1660,12 @@ func (s *Server) processStreamingRPC(ctx context.Context, stream *transport.Serv
//
// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
if s.opts.cp != nil {
ss.cp = s.opts.cp
ss.compressorV0 = s.opts.cp
ss.sendCompressorName = s.opts.cp.Type()
} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
// Legacy compressor not specified; attempt to respond with same encoding.
ss.comp = encoding.GetCompressor(rc)
if ss.comp != nil {
ss.compressorV1 = encoding.GetCompressor(rc)
if ss.compressorV1 != nil {
ss.sendCompressorName = rc
}
}
Expand All @@ -1676,7 +1676,7 @@ func (s *Server) processStreamingRPC(ctx context.Context, stream *transport.Serv
}
}

ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.cp, ss.comp)
ss.ctx = newContextWithRPCInfo(ss.ctx, false, ss.codec, ss.compressorV0, ss.compressorV1)

if trInfo != nil {
trInfo.tr.LazyLog(&trInfo.firstLine, false)
Expand Down
Loading

0 comments on commit 3409a56

Please sign in to comment.