Skip to content

Commit e2261d9

Browse files
authored
Merge pull request #1149 from FillZpp/use-protobuf-for-native-resources
⚠️ Use application/vnd.kubernetes.protobuf as content-type if possible
2 parents 3645df0 + 7c26bc0 commit e2261d9

File tree

4 files changed

+62
-17
lines changed

4 files changed

+62
-17
lines changed

pkg/cache/internal/informers_map.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
236236
return nil, err
237237
}
238238

239-
client, err := apiutil.RESTClientForGVK(gvk, ip.config, ip.codecs)
239+
client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
240240
if err != nil {
241241
return nil, err
242242
}

pkg/client/apiutil/apimachinery.go

+35-3
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,41 @@ package apiutil
2121

2222
import (
2323
"fmt"
24+
"sync"
2425

2526
"k8s.io/apimachinery/pkg/api/meta"
2627
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2728
"k8s.io/apimachinery/pkg/runtime"
2829
"k8s.io/apimachinery/pkg/runtime/schema"
2930
"k8s.io/apimachinery/pkg/runtime/serializer"
3031
"k8s.io/client-go/discovery"
32+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3133
"k8s.io/client-go/rest"
3234
"k8s.io/client-go/restmapper"
3335
)
3436

37+
var (
38+
protobufScheme = runtime.NewScheme()
39+
protobufSchemeLock sync.RWMutex
40+
)
41+
42+
func init() {
43+
// Currently only enabled for built-in resources which are guaranteed to implement Protocol Buffers.
44+
// For custom resources, CRDs can not support Protocol Buffers but Aggregated API can.
45+
// See doc: https://kubernetes.io/docs/concepts/extend-kubernetes/api-extension/custom-resources/#advanced-features-and-flexibility
46+
if err := clientgoscheme.AddToScheme(protobufScheme); err != nil {
47+
panic(err)
48+
}
49+
}
50+
51+
// AddToProtobufScheme add the given SchemeBuilder into protobufScheme, which should
52+
// be additional types that do support protobuf.
53+
func AddToProtobufScheme(builder runtime.SchemeBuilder) error {
54+
protobufSchemeLock.Lock()
55+
defer protobufSchemeLock.Unlock()
56+
return builder.AddToScheme(protobufScheme)
57+
}
58+
3559
// NewDiscoveryRESTMapper constructs a new RESTMapper based on discovery
3660
// information fetched by a new client with the given config.
3761
func NewDiscoveryRESTMapper(c *rest.Config) (meta.RESTMapper, error) {
@@ -93,16 +117,16 @@ func GVKForObject(obj runtime.Object, scheme *runtime.Scheme) (schema.GroupVersi
93117
// RESTClientForGVK constructs a new rest.Interface capable of accessing the resource associated
94118
// with the given GroupVersionKind. The REST client will be configured to use the negotiated serializer from
95119
// baseConfig, if set, otherwise a default serializer will be set.
96-
func RESTClientForGVK(gvk schema.GroupVersionKind, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
97-
cfg := createRestConfig(gvk, baseConfig)
120+
func RESTClientForGVK(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config, codecs serializer.CodecFactory) (rest.Interface, error) {
121+
cfg := createRestConfig(gvk, isUnstructured, baseConfig)
98122
if cfg.NegotiatedSerializer == nil {
99123
cfg.NegotiatedSerializer = serializer.WithoutConversionCodecFactory{CodecFactory: codecs}
100124
}
101125
return rest.RESTClientFor(cfg)
102126
}
103127

104128
//createRestConfig copies the base config and updates needed fields for a new rest config
105-
func createRestConfig(gvk schema.GroupVersionKind, baseConfig *rest.Config) *rest.Config {
129+
func createRestConfig(gvk schema.GroupVersionKind, isUnstructured bool, baseConfig *rest.Config) *rest.Config {
106130
gv := gvk.GroupVersion()
107131

108132
cfg := rest.CopyConfig(baseConfig)
@@ -115,5 +139,13 @@ func createRestConfig(gvk schema.GroupVersionKind, baseConfig *rest.Config) *res
115139
if cfg.UserAgent == "" {
116140
cfg.UserAgent = rest.DefaultKubernetesUserAgent()
117141
}
142+
// TODO(FillZpp): In the long run, we want to check discovery or something to make sure that this is actually true.
143+
if cfg.ContentType == "" && !isUnstructured {
144+
protobufSchemeLock.RLock()
145+
if protobufScheme.Recognizes(gvk) {
146+
cfg.ContentType = runtime.ContentTypeProtobuf
147+
}
148+
protobufSchemeLock.RUnlock()
149+
}
118150
return cfg
119151
}

pkg/client/client.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,13 @@ func New(config *rest.Config, options Options) (Client, error) {
7272
}
7373

7474
clientcache := &clientCache{
75-
config: config,
76-
scheme: options.Scheme,
77-
mapper: options.Mapper,
78-
codecs: serializer.NewCodecFactory(options.Scheme),
79-
resourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
75+
config: config,
76+
scheme: options.Scheme,
77+
mapper: options.Mapper,
78+
codecs: serializer.NewCodecFactory(options.Scheme),
79+
80+
structuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
81+
unstructuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
8082
}
8183

8284
rawMetaClient, err := metadata.NewForConfig(config)

pkg/client/client_cache.go

+19-8
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222

2323
"k8s.io/apimachinery/pkg/api/meta"
2424
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2526
"k8s.io/apimachinery/pkg/runtime"
2627
"k8s.io/apimachinery/pkg/runtime/schema"
2728
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -43,20 +44,22 @@ type clientCache struct {
4344
// codecs are used to create a REST client for a gvk
4445
codecs serializer.CodecFactory
4546

46-
// resourceByType caches type metadata
47-
resourceByType map[schema.GroupVersionKind]*resourceMeta
48-
mu sync.RWMutex
47+
// structuredResourceByType caches structured type metadata
48+
structuredResourceByType map[schema.GroupVersionKind]*resourceMeta
49+
// unstructuredResourceByType caches unstructured type metadata
50+
unstructuredResourceByType map[schema.GroupVersionKind]*resourceMeta
51+
mu sync.RWMutex
4952
}
5053

5154
// newResource maps obj to a Kubernetes Resource and constructs a client for that Resource.
5255
// If the object is a list, the resource represents the item's type instead.
53-
func (c *clientCache) newResource(gvk schema.GroupVersionKind, isList bool) (*resourceMeta, error) {
56+
func (c *clientCache) newResource(gvk schema.GroupVersionKind, isList, isUnstructured bool) (*resourceMeta, error) {
5457
if strings.HasSuffix(gvk.Kind, "List") && isList {
5558
// if this was a list, treat it as a request for the item's resource
5659
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
5760
}
5861

59-
client, err := apiutil.RESTClientForGVK(gvk, c.config, c.codecs)
62+
client, err := apiutil.RESTClientForGVK(gvk, isUnstructured, c.config, c.codecs)
6063
if err != nil {
6164
return nil, err
6265
}
@@ -75,10 +78,18 @@ func (c *clientCache) getResource(obj runtime.Object) (*resourceMeta, error) {
7578
return nil, err
7679
}
7780

81+
_, isUnstructured := obj.(*unstructured.Unstructured)
82+
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
83+
isUnstructured = isUnstructured || isUnstructuredList
84+
7885
// It's better to do creation work twice than to not let multiple
7986
// people make requests at once
8087
c.mu.RLock()
81-
r, known := c.resourceByType[gvk]
88+
resourceByType := c.structuredResourceByType
89+
if isUnstructured {
90+
resourceByType = c.unstructuredResourceByType
91+
}
92+
r, known := resourceByType[gvk]
8293
c.mu.RUnlock()
8394

8495
if known {
@@ -88,11 +99,11 @@ func (c *clientCache) getResource(obj runtime.Object) (*resourceMeta, error) {
8899
// Initialize a new Client
89100
c.mu.Lock()
90101
defer c.mu.Unlock()
91-
r, err = c.newResource(gvk, meta.IsListType(obj))
102+
r, err = c.newResource(gvk, meta.IsListType(obj), isUnstructured)
92103
if err != nil {
93104
return nil, err
94105
}
95-
c.resourceByType[gvk] = r
106+
resourceByType[gvk] = r
96107
return r, err
97108
}
98109

0 commit comments

Comments
 (0)