Skip to content

Commit

Permalink
Merge pull request kubernetes#257 from cmssczy/queue-workload-info
Browse files Browse the repository at this point in the history
do workload.NewInfo when inserting and updating
  • Loading branch information
k8s-ci-robot authored May 20, 2022
2 parents a4703d3 + 8385f9f commit afabae9
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 37 deletions.
10 changes: 5 additions & 5 deletions pkg/queue/cluster_queue_best_effort_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,21 @@ func newClusterQueueBestEffortFIFO(cq *kueue.ClusterQueue) (ClusterQueue, error)
return cqBE, nil
}

func (cq *ClusterQueueBestEffortFIFO) PushOrUpdate(w *kueue.Workload) {
key := workload.Key(w)
func (cq *ClusterQueueBestEffortFIFO) PushOrUpdate(wInfo *workload.Info) {
key := workload.Key(wInfo.Obj)
oldInfo := cq.inadmissibleWorkloads[key]
if oldInfo != nil {
// update in place if the workload was inadmissible and didn't change
// to potentially become admissible.
if equality.Semantic.DeepEqual(oldInfo.Obj.Spec, w.Spec) {
cq.inadmissibleWorkloads[key] = workload.NewInfo(w)
if equality.Semantic.DeepEqual(oldInfo.Obj.Spec, wInfo.Obj.Spec) {
cq.inadmissibleWorkloads[key] = wInfo
return
}
// otherwise move or update in place in the queue.
delete(cq.inadmissibleWorkloads, key)
}

cq.ClusterQueueImpl.PushOrUpdate(w)
cq.ClusterQueueImpl.PushOrUpdate(wInfo)
}

func (cq *ClusterQueueBestEffortFIFO) Delete(w *kueue.Workload) {
Expand Down
14 changes: 8 additions & 6 deletions pkg/queue/cluster_queue_best_effort_fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestClusterQueueBestEffortFIFO(t *testing.T) {
}

for _, w := range test.workloadsToAdd {
cq.PushOrUpdate(w)
cq.PushOrUpdate(workload.NewInfo(w))
}

for _, w := range test.inadmissibleWorkloadsToRequeue {
Expand All @@ -114,7 +114,7 @@ func TestClusterQueueBestEffortFIFO(t *testing.T) {
}

for _, w := range test.workloadsToUpdate {
cq.PushOrUpdate(w)
cq.PushOrUpdate(workload.NewInfo(w))
}

for _, w := range test.workloadsToDelete {
Expand Down Expand Up @@ -152,13 +152,15 @@ func TestDeleteFromQueue(t *testing.T) {
inadmissibleWorkloads := []*kueue.Workload{wl3, wl4}

for _, w := range admissibleworkloads {
cqImpl.PushOrUpdate(w)
qImpl.AddOrUpdate(w)
wInfo := workload.NewInfo(w)
cqImpl.PushOrUpdate(wInfo)
qImpl.AddOrUpdate(wInfo)
}

for _, w := range inadmissibleWorkloads {
cqImpl.RequeueIfNotPresent(workload.NewInfo(w), false)
qImpl.AddOrUpdate(w)
wInfo := workload.NewInfo(w)
cqImpl.RequeueIfNotPresent(wInfo, false)
qImpl.AddOrUpdate(wInfo)
}

wantPending := len(admissibleworkloads) + len(inadmissibleWorkloads)
Expand Down
3 changes: 1 addition & 2 deletions pkg/queue/cluster_queue_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ func (c *ClusterQueueImpl) pushIfNotPresent(info *workload.Info) bool {
return c.heap.PushIfNotPresent(info)
}

func (c *ClusterQueueImpl) PushOrUpdate(w *kueue.Workload) {
info := workload.NewInfo(w)
func (c *ClusterQueueImpl) PushOrUpdate(info *workload.Info) {
c.heap.PushOrUpdate(info)
}

Expand Down
20 changes: 10 additions & 10 deletions pkg/queue/cluster_queue_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ func Test_PushOrUpdate(t *testing.T) {
if cq.Pending() != 0 {
t.Error("ClusterQueue should be empty")
}
cq.PushOrUpdate(wl)
cq.PushOrUpdate(workload.NewInfo(wl))
if cq.Pending() != 1 {
t.Error("ClusterQueue should have one workload")
}

// Just used to validate the update operation.
wl.ResourceVersion = "1"
cq.PushOrUpdate(wl)
cq.PushOrUpdate(workload.NewInfo(wl))
newWl := cq.Pop()
if cq.Pending() != 0 || newWl.Obj.ResourceVersion != "1" {
t.Error("failed to update a workload in ClusterQueue")
Expand All @@ -52,8 +52,8 @@ func Test_PushOrUpdate(t *testing.T) {
func Test_Pop(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
now := time.Now()
wl1 := utiltesting.MakeWorkload("workload-1", defaultNamespace).Creation(now).Obj()
wl2 := utiltesting.MakeWorkload("workload-2", defaultNamespace).Creation(now.Add(time.Second)).Obj()
wl1 := workload.NewInfo(utiltesting.MakeWorkload("workload-1", defaultNamespace).Creation(now).Obj())
wl2 := workload.NewInfo(utiltesting.MakeWorkload("workload-2", defaultNamespace).Creation(now.Add(time.Second)).Obj())
if cq.Pop() != nil {
t.Error("ClusterQueue should be empty")
}
Expand All @@ -76,8 +76,8 @@ func Test_Delete(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
wl1 := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
wl2 := utiltesting.MakeWorkload("workload-2", defaultNamespace).Obj()
cq.PushOrUpdate(wl1)
cq.PushOrUpdate(wl2)
cq.PushOrUpdate(workload.NewInfo(wl1))
cq.PushOrUpdate(workload.NewInfo(wl2))
if cq.Pending() != 2 {
t.Error("ClusterQueue should have two workload")
}
Expand All @@ -95,8 +95,8 @@ func Test_Delete(t *testing.T) {

func Test_Dump(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
wl1 := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
wl2 := utiltesting.MakeWorkload("workload-2", defaultNamespace).Obj()
wl1 := workload.NewInfo(utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj())
wl2 := workload.NewInfo(utiltesting.MakeWorkload("workload-2", defaultNamespace).Obj())
if _, ok := cq.Dump(); ok {
t.Error("ClusterQueue should be empty")
}
Expand All @@ -113,7 +113,7 @@ func Test_Info(t *testing.T) {
if info := cq.Info(keyFunc(workload.NewInfo(wl))); info != nil {
t.Error("workload doesn't exist")
}
cq.PushOrUpdate(wl)
cq.PushOrUpdate(workload.NewInfo(wl))
if info := cq.Info(keyFunc(workload.NewInfo(wl))); info == nil {
t.Error("expected workload to exist")
}
Expand All @@ -127,7 +127,7 @@ func Test_AddFromQueue(t *testing.T) {
wl.Name: workload.NewInfo(wl),
},
}
cq.PushOrUpdate(wl)
cq.PushOrUpdate(workload.NewInfo(wl))
if added := cq.AddFromQueue(queue); added {
t.Error("expected workload not to be added")
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/queue/cluster_queue_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type ClusterQueue interface {

// PushOrUpdate pushes the workload to ClusterQueue.
// If the workload is already present, updates with the new one.
PushOrUpdate(*kueue.Workload)
PushOrUpdate(*workload.Info)
// Delete removes the workload from ClusterQueue.
Delete(*kueue.Workload)
// Pop removes the head of the queue and returns it. It returns nil if the
Expand Down
10 changes: 6 additions & 4 deletions pkg/queue/cluster_queue_strict_fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/utils/pointer"

kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
"sigs.k8s.io/kueue/pkg/workload"
)

const (
Expand Down Expand Up @@ -62,7 +63,7 @@ func TestFIFOClusterQueue(t *testing.T) {
},
}
for _, w := range ws {
q.PushOrUpdate(w)
q.PushOrUpdate(workload.NewInfo(w))
}
got := q.Pop()
if got == nil {
Expand All @@ -71,12 +72,13 @@ func TestFIFOClusterQueue(t *testing.T) {
if got.Obj.Name != "before" {
t.Errorf("Popped workload %q want %q", got.Obj.Name, "before")
}
q.PushOrUpdate(&kueue.Workload{
wlInfo := workload.NewInfo(&kueue.Workload{
ObjectMeta: metav1.ObjectMeta{
Name: "after",
CreationTimestamp: metav1.NewTime(now.Add(-time.Minute)),
},
})
q.PushOrUpdate(wlInfo)
got = q.Pop()
if got == nil {
t.Fatal("Queue is empty")
Expand Down Expand Up @@ -178,8 +180,8 @@ func TestStrictFIFO(t *testing.T) {
t.Fatalf("Failed creating ClusterQueue %v", err)
}

q.PushOrUpdate(tt.w1)
q.PushOrUpdate(tt.w2)
q.PushOrUpdate(workload.NewInfo(tt.w1))
q.PushOrUpdate(workload.NewInfo(tt.w2))

got := q.Pop()
if got == nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/queue/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (m *Manager) AddQueue(ctx context.Context, q *kueue.Queue) error {
if w.Spec.QueueName != q.Name || w.Spec.Admission != nil {
continue
}
qImpl.AddOrUpdate(&w)
qImpl.AddOrUpdate(workload.NewInfo(&w))
}
cq := m.clusterQueues[qImpl.ClusterQueue]
if cq != nil && cq.AddFromQueue(qImpl) {
Expand Down Expand Up @@ -274,13 +274,14 @@ func (m *Manager) addOrUpdateWorkload(w *kueue.Workload) bool {
if q == nil {
return false
}
q.AddOrUpdate(w)
wInfo := workload.NewInfo(w)
q.AddOrUpdate(wInfo)
q.reportPendingWorkloads()
cq := m.clusterQueues[q.ClusterQueue]
if cq == nil {
return false
}
cq.PushOrUpdate(w)
cq.PushOrUpdate(wInfo)
m.Broadcast()
return true
}
Expand All @@ -304,15 +305,14 @@ func (m *Manager) RequeueWorkload(ctx context.Context, info *workload.Info, imme
if q == nil {
return false
}
q.AddOrUpdate(&w)
info.Update(&w)
q.AddOrUpdate(info)
q.reportPendingWorkloads()
cq := m.clusterQueues[q.ClusterQueue]
if cq == nil {
return false
}

// TODO(#162) Update the info object instead of constructing one.
info = workload.NewInfo(&w)
added := cq.RequeueIfNotPresent(info, immediate)
if added {
m.Broadcast()
Expand Down
6 changes: 3 additions & 3 deletions pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func (q *Queue) update(apiQueue *kueue.Queue) {
q.ClusterQueue = string(apiQueue.Spec.ClusterQueue)
}

func (q *Queue) AddOrUpdate(w *kueue.Workload) {
key := workload.Key(w)
q.items[key] = workload.NewInfo(w)
func (q *Queue) AddOrUpdate(info *workload.Info) {
key := workload.Key(info.Obj)
q.items[key] = info
}

func (q *Queue) AddIfNotPresent(w *workload.Info) bool {
Expand Down
4 changes: 4 additions & 0 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func NewInfo(w *kueue.Workload) *Info {
}
}

func (i *Info) Update(wl *kueue.Workload) {
i.Obj = wl
}

func Key(w *kueue.Workload) string {
return fmt.Sprintf("%s/%s", w.Namespace, w.Name)
}
Expand Down

0 comments on commit afabae9

Please sign in to comment.