Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 37 additions & 44 deletions internal/xds/resolver/serviceconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,28 +97,23 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo) []byte {
}

type virtualHost struct {
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
// retry policy present in virtual host
retryConfig *xdsresource.RetryConfig
}

// routeCluster holds information about a cluster as referenced by a route.
type routeCluster struct {
name string
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
name string // Name of the cluster.
interceptor iresolver.ClientInterceptor // HTTP filters to run for RPCs matching this route.
}

type route struct {
m *xdsresource.CompositeMatcher // converted from route matchers
actionType xdsresource.RouteActionType // holds route action type
clusters wrr.WRR // holds *routeCluster entries
maxStreamDuration time.Duration
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
retryConfig *xdsresource.RetryConfig
hashPolicies []*xdsresource.HashPolicy
retryConfig *xdsresource.RetryConfig
hashPolicies []*xdsresource.HashPolicy
}

func (r route) String() string {
Expand Down Expand Up @@ -200,11 +195,6 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
ref := &cs.clusters[cluster.name].refCount
atomic.AddInt32(ref, 1)

interceptor, err := cs.newInterceptor(rt, cluster)
if err != nil {
return nil, annotateErrorWithNodeID(err, cs.xdsNodeID)
}

lbCtx := clustermanager.SetPickedCluster(rpcInfo.Context, cluster.name)
lbCtx = iringhash.SetXDSRequestHash(lbCtx, cs.generateHash(rpcInfo, rt.hashPolicies))

Expand All @@ -220,7 +210,7 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
cs.sendNewServiceConfig()
}
},
Interceptor: interceptor,
Interceptor: cluster.interceptor,
}

if rt.maxStreamDuration != 0 {
Expand Down Expand Up @@ -310,35 +300,6 @@ func (cs *configSelector) generateHash(rpcInfo iresolver.RPCInfo, hashPolicies [
return rand.Uint64()
}

func (cs *configSelector) newInterceptor(rt *route, cluster *routeCluster) (iresolver.ClientInterceptor, error) {
if len(cs.httpFilterConfig) == 0 {
return nil, nil
}
interceptors := make([]iresolver.ClientInterceptor, 0, len(cs.httpFilterConfig))
for _, filter := range cs.httpFilterConfig {
override := cluster.httpFilterConfigOverride[filter.Name] // cluster is highest priority
if override == nil {
override = rt.httpFilterConfigOverride[filter.Name] // route is second priority
}
if override == nil {
override = cs.virtualHost.httpFilterConfigOverride[filter.Name] // VH is third & lowest priority
}
ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder)
if !ok {
// Should not happen if it passed xdsClient validation.
return nil, fmt.Errorf("filter does not support use in client")
}
i, err := ib.BuildClientInterceptor(filter.Config, override)
if err != nil {
return nil, fmt.Errorf("error constructing filter: %v", err)
}
if i != nil {
interceptors = append(interceptors, i)
}
}
return &interceptorList{interceptors: interceptors}, nil
}

// stop decrements refs of all clusters referenced by this config selector.
func (cs *configSelector) stop() {
// The resolver's old configSelector may be nil. Handle that here.
Expand All @@ -363,6 +324,38 @@ func (cs *configSelector) stop() {
}
}

// newInterceptor builds a chain of client interceptors for the given filters
// and override configuration. The cluster override has the highest priority,
// followed by the route override, and finally the virtual host override.
func newInterceptor(filters []xdsresource.HTTPFilter, clusterOverride, routeOverride, virtualHostOverride map[string]httpfilter.FilterConfig) (iresolver.ClientInterceptor, error) {
if len(filters) == 0 {
return nil, nil
}
interceptors := make([]iresolver.ClientInterceptor, 0, len(filters))
for _, filter := range filters {
override := clusterOverride[filter.Name]
if override == nil {
override = routeOverride[filter.Name]
}
if override == nil {
override = virtualHostOverride[filter.Name]
}
ib, ok := filter.Filter.(httpfilter.ClientInterceptorBuilder)
if !ok {
// Should not happen if it passed xdsClient validation.
return nil, fmt.Errorf("filter %q does not support use in client", filter.Name)
}
i, err := ib.BuildClientInterceptor(filter.Config, override)
if err != nil {
return nil, fmt.Errorf("error constructing filter: %v", err)
}
if i != nil {
interceptors = append(interceptors, i)
}
}
return &interceptorList{interceptors: interceptors}, nil
}

type interceptorList struct {
interceptors []iresolver.ClientInterceptor
}
Expand Down
Loading