Skip to content

Commit

Permalink
xds: simplify code handling certain error conditions in the resolver (g…
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Feb 26, 2025
1 parent feaf942 commit e9c0617
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 40 deletions.
14 changes: 8 additions & 6 deletions xds/internal/resolver/serviceconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ type xdsClusterManagerConfig struct {
Children map[string]xdsChildConfig `json:"children"`
}

// serviceConfigJSON produces a service config in JSON format representing all
// the clusters referenced in activeClusters. This includes clusters with zero
// references, so they must be pruned first.
func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) {
// serviceConfigJSON produces a service config in JSON format that contains LB
// policy config for the "xds_cluster_manager" LB policy, with entries in the
// children map for all active clusters.
func serviceConfigJSON(activeClusters map[string]*clusterInfo) []byte {
// Generate children (all entries in activeClusters).
children := make(map[string]xdsChildConfig)
for cluster, ci := range activeClusters {
Expand All @@ -87,11 +87,13 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) {
),
}

// This is not expected to fail as we have constructed the service config by
// hand right above, and therefore ok to panic.
bs, err := json.Marshal(sc)
if err != nil {
return nil, fmt.Errorf("failed to marshal json: %v", err)
panic(fmt.Sprintf("failed to marshal service config %+v: %v", sc, err))
}
return bs, nil
return bs
}

type virtualHost struct {
Expand Down
43 changes: 18 additions & 25 deletions xds/internal/resolver/xds_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,7 @@ func (r *xdsResolver) Close() {
// sendNewServiceConfig prunes active clusters, generates a new service config
// based on the current set of active clusters, and sends an update to the
// channel with that service config and the provided config selector. Returns
// false if an error occurs while generating the service config and the update
// cannot be sent.
// false if an error occurs while sending an update to the channel.
//
// Only executed in the context of a serializer callback.
func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
Expand All @@ -295,24 +294,28 @@ func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
// There are no clusters and we are sending a failing configSelector.
// Send an empty config, which picks pick-first, with no address, and
// puts the ClientConn into transient failure.
r.cc.UpdateState(resolver.State{ServiceConfig: r.cc.ParseServiceConfig("{}")})
if err := r.cc.UpdateState(resolver.State{ServiceConfig: r.cc.ParseServiceConfig("{}")}); err != nil {
if r.logger.V(2) {
r.logger.Infof("Channel rejected new state (with empty service config) with error: %v", err)
}
return false
}
return true
}

sc, err := serviceConfigJSON(r.activeClusters)
if err != nil {
// JSON marshal error; should never happen.
r.logger.Errorf("For Listener resource %q and RouteConfiguration resource %q, failed to marshal newly built service config: %v", r.ldsResourceName, r.rdsResourceName, err)
r.cc.ReportError(err)
return false
}
sc := serviceConfigJSON(r.activeClusters)
r.logger.Infof("For Listener resource %q and RouteConfiguration resource %q, generated service config: %v", r.ldsResourceName, r.rdsResourceName, pretty.FormatJSON(sc))

// Send the update to the ClientConn.
state := iresolver.SetConfigSelector(resolver.State{
ServiceConfig: r.cc.ParseServiceConfig(string(sc)),
}, cs)
r.cc.UpdateState(xdsclient.SetClient(state, r.xdsClient))
if err := r.cc.UpdateState(xdsclient.SetClient(state, r.xdsClient)); err != nil {
if r.logger.V(2) {
r.logger.Infof("Channel rejected new state: %+v with error: %v", state, err)
}
return false
}
return true
}

Expand All @@ -321,7 +324,7 @@ func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
// r.activeClusters for previously-unseen clusters.
//
// Only executed in the context of a serializer callback.
func (r *xdsResolver) newConfigSelector() (*configSelector, error) {
func (r *xdsResolver) newConfigSelector() *configSelector {
cs := &configSelector{
r: r,
virtualHost: virtualHost{
Expand Down Expand Up @@ -357,11 +360,7 @@ func (r *xdsResolver) newConfigSelector() (*configSelector, error) {
}
cs.routes[i].clusters = clusters

var err error
cs.routes[i].m, err = xdsresource.RouteToMatcher(rt)
if err != nil {
return nil, err
}
cs.routes[i].m = xdsresource.RouteToMatcher(rt)
cs.routes[i].actionType = rt.ActionType
if rt.MaxStreamDuration == nil {
cs.routes[i].maxStreamDuration = r.currentListener.MaxStreamDuration
Expand All @@ -381,7 +380,7 @@ func (r *xdsResolver) newConfigSelector() (*configSelector, error) {
atomic.AddInt32(&ci.refCount, 1)
}

return cs, nil
return cs
}

// pruneActiveClusters deletes entries in r.activeClusters with zero
Expand Down Expand Up @@ -437,13 +436,7 @@ func (r *xdsResolver) onResolutionComplete() {
return
}

cs, err := r.newConfigSelector()
if err != nil {
r.logger.Warningf("Failed to build a config selector for resource %q: %v", r.ldsResourceName, err)
r.cc.ReportError(err)
return
}

cs := r.newConfigSelector()
if !r.sendNewServiceConfig(cs) {
// JSON error creating the service config (unexpected); erase
// this config selector and ignore this update, continuing with
Expand Down
6 changes: 1 addition & 5 deletions xds/internal/xdsclient/xdsresource/filter_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,8 @@ func (fc *FilterChain) ConstructUsableRouteConfiguration(config RouteConfigUpdat
func (fc *FilterChain) convertVirtualHost(virtualHost *VirtualHost) (VirtualHostWithInterceptors, error) {
rs := make([]RouteWithInterceptors, len(virtualHost.Routes))
for i, r := range virtualHost.Routes {
var err error
rs[i].ActionType = r.ActionType
rs[i].M, err = RouteToMatcher(r)
if err != nil {
return VirtualHostWithInterceptors{}, fmt.Errorf("matcher construction: %v", err)
}
rs[i].M = RouteToMatcher(r)
for _, filter := range fc.HTTPFilters {
// Route is highest priority on server side, as there is no concept
// of an upstream cluster on server side.
Expand Down
11 changes: 7 additions & 4 deletions xds/internal/xdsclient/xdsresource/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ import (
)

// RouteToMatcher converts a route to a Matcher to match incoming RPC's against.
func RouteToMatcher(r *Route) (*CompositeMatcher, error) {
//
// Only expected to be called on a Route that passed validation checks by the
// xDS client.
func RouteToMatcher(r *Route) *CompositeMatcher {
var pm pathMatcher
switch {
case r.Regex != nil:
Expand All @@ -39,7 +42,7 @@ func RouteToMatcher(r *Route) (*CompositeMatcher, error) {
case r.Prefix != nil:
pm = newPathPrefixMatcher(*r.Prefix, r.CaseInsensitive)
default:
return nil, fmt.Errorf("illegal route: missing path_matcher")
panic("illegal route: missing path_matcher")
}

headerMatchers := make([]matcher.HeaderMatcher, 0, len(r.Headers))
Expand All @@ -62,7 +65,7 @@ func RouteToMatcher(r *Route) (*CompositeMatcher, error) {
case h.StringMatch != nil:
matcherT = matcher.NewHeaderStringMatcher(h.Name, *h.StringMatch, invert)
default:
return nil, fmt.Errorf("illegal route: missing header_match_specifier")
panic("illegal route: missing header_match_specifier")
}
headerMatchers = append(headerMatchers, matcherT)
}
Expand All @@ -71,7 +74,7 @@ func RouteToMatcher(r *Route) (*CompositeMatcher, error) {
if r.Fraction != nil {
fractionMatcher = newFractionMatcher(*r.Fraction)
}
return newCompositeMatcher(pm, headerMatchers, fractionMatcher), nil
return newCompositeMatcher(pm, headerMatchers, fractionMatcher)
}

// CompositeMatcher is a matcher that holds onto many matchers and aggregates
Expand Down

0 comments on commit e9c0617

Please sign in to comment.