Skip to content

Commit cda4583

Browse files
author
k8s-merge-robot
committed
Merge pull request kubernetes#23035 from xinxiaogang/xnxin-master
Auto commit by PR queue bot
2 parents f1c7bd0 + f5c631e commit cda4583

File tree

2 files changed

+32
-0
lines changed

2 files changed

+32
-0
lines changed

pkg/controller/endpoint/endpoints_controller.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ const (
4747
// often. Higher numbers = lower CPU/network load; lower numbers =
4848
// shorter amount of time before a mistaken endpoint is corrected.
4949
FullServiceResyncPeriod = 30 * time.Second
50+
51+
// We must avoid syncing service until the pod store has synced. If it hasn't synced, to
52+
// avoid a hot loop, we'll wait this long between checks.
53+
PodStoreSyncedPollPeriod = 100 * time.Millisecond
5054
)
5155

5256
var (
@@ -98,6 +102,7 @@ func NewEndpointController(client *clientset.Clientset, resyncPeriod controller.
98102
DeleteFunc: e.deletePod,
99103
},
100104
)
105+
e.podStoreSynced = e.podController.HasSynced
101106

102107
return e
103108
}
@@ -120,6 +125,9 @@ type EndpointController struct {
120125
// controllers.
121126
serviceController *framework.Controller
122127
podController *framework.Controller
128+
// podStoreSynced returns true if the pod store has been synced at least once.
129+
// Added as a member to the struct to allow injection for testing.
130+
podStoreSynced func() bool
123131
}
124132

125133
// Runs e; will not return until stopCh is closed. workers determines how many
@@ -268,6 +276,15 @@ func (e *EndpointController) syncService(key string) {
268276
defer func() {
269277
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
270278
}()
279+
280+
if !e.podStoreSynced() {
281+
// Sleep so we give the pod reflector goroutine a chance to run.
282+
time.Sleep(PodStoreSyncedPollPeriod)
283+
glog.Infof("Waiting for pods controller to sync, requeuing rc %v", key)
284+
e.queue.Add(key)
285+
return
286+
}
287+
271288
obj, exists, err := e.serviceStore.Store.GetByKey(key)
272289
if err != nil || !exists {
273290
// Delete the corresponding endpoint, as the service has been deleted.

pkg/controller/endpoint/endpoints_controller_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ import (
3636
utiltesting "k8s.io/kubernetes/pkg/util/testing"
3737
)
3838

39+
var alwaysReady = func() bool { return true }
40+
3941
func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
4042
for i := 0; i < nPods+nNotReady; i++ {
4143
p := &api.Pod{
@@ -107,6 +109,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
107109
// defer testServer.Close()
108110
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
109111
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
112+
endpoints.podStoreSynced = alwaysReady
110113
endpoints.serviceStore.Store.Add(&api.Service{
111114
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
112115
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}},
@@ -140,6 +143,7 @@ func TestCheckLeftoverEndpoints(t *testing.T) {
140143
// defer testServer.Close()
141144
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
142145
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
146+
endpoints.podStoreSynced = alwaysReady
143147
endpoints.checkLeftoverEndpoints()
144148

145149
if e, a := 1, endpoints.queue.Len(); e != a {
@@ -169,6 +173,8 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
169173
// defer testServer.Close()
170174
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
171175
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
176+
endpoints.podStoreSynced = alwaysReady
177+
172178
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
173179
endpoints.serviceStore.Store.Add(&api.Service{
174180
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
@@ -211,6 +217,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
211217
// defer testServer.Close()
212218
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
213219
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
220+
endpoints.podStoreSynced = alwaysReady
214221
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
215222
endpoints.serviceStore.Store.Add(&api.Service{
216223
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
@@ -250,6 +257,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
250257
// defer testServer.Close()
251258
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
252259
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
260+
endpoints.podStoreSynced = alwaysReady
253261
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
254262
endpoints.serviceStore.Store.Add(&api.Service{
255263
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
@@ -288,6 +296,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
288296
// defer testServer.Close()
289297
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
290298
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
299+
endpoints.podStoreSynced = alwaysReady
291300
addPods(endpoints.podStore.Store, ns, 0, 1, 1)
292301
endpoints.serviceStore.Store.Add(&api.Service{
293302
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
@@ -326,6 +335,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
326335
// defer testServer.Close()
327336
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
328337
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
338+
endpoints.podStoreSynced = alwaysReady
329339
addPods(endpoints.podStore.Store, ns, 1, 1, 1)
330340
endpoints.serviceStore.Store.Add(&api.Service{
331341
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
@@ -368,6 +378,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
368378
// defer testServer.Close()
369379
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
370380
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
381+
endpoints.podStoreSynced = alwaysReady
371382
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
372383
endpoints.serviceStore.Store.Add(&api.Service{
373384
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
@@ -409,6 +420,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
409420
// defer testServer.Close()
410421
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
411422
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
423+
endpoints.podStoreSynced = alwaysReady
412424
addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1, 0)
413425
endpoints.serviceStore.Store.Add(&api.Service{
414426
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
@@ -429,6 +441,7 @@ func TestSyncEndpointsItems(t *testing.T) {
429441
// defer testServer.Close()
430442
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
431443
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
444+
endpoints.podStoreSynced = alwaysReady
432445
addPods(endpoints.podStore.Store, ns, 3, 2, 0)
433446
addPods(endpoints.podStore.Store, "blah", 5, 2, 0) // make sure these aren't found!
434447
endpoints.serviceStore.Store.Add(&api.Service{
@@ -472,6 +485,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
472485
// defer testServer.Close()
473486
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
474487
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
488+
endpoints.podStoreSynced = alwaysReady
475489
addPods(endpoints.podStore.Store, ns, 3, 2, 0)
476490
serviceLabels := map[string]string{"foo": "bar"}
477491
endpoints.serviceStore.Store.Add(&api.Service{
@@ -533,6 +547,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
533547
// defer testServer.Close()
534548
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
535549
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
550+
endpoints.podStoreSynced = alwaysReady
536551
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
537552
serviceLabels := map[string]string{"baz": "blah"}
538553
endpoints.serviceStore.Store.Add(&api.Service{

0 commit comments

Comments
 (0)