Skip to content

Commit

Permalink
Merge pull request #7696 from macsko/fix_data_race_while_setting_delt…
Browse files Browse the repository at this point in the history
…a_cluster_state_in_parallel

Fix data race while setting delta cluster state in parallel
  • Loading branch information
k8s-ci-robot authored Jan 15, 2025
2 parents 5cd491a + b36e387 commit adda3d4
Showing 1 changed file with 13 additions and 39 deletions.
52 changes: 13 additions & 39 deletions cluster-autoscaler/simulator/clustersnapshot/store/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,24 +248,6 @@ func (data *internalDeltaSnapshotData) addPod(pod *apiv1.Pod, nodeName string) e
return nil
}

func (data *internalDeltaSnapshotData) addPodToNode(pod *apiv1.Pod, ni *schedulerframework.NodeInfo) error {
ni.AddPod(pod)

// Maybe consider deleting from the list in the future. Maybe not.
data.clearCaches()
return nil
}

func (data *internalDeltaSnapshotData) addPodsToNode(pods []*apiv1.Pod, ni *schedulerframework.NodeInfo) error {
for _, pod := range pods {
ni.AddPod(pod)
}

// Maybe consider deleting from the list in the future. Maybe not.
data.clearCaches()
return nil
}

func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName string) error {
// This always clones node info, even if the pod is actually missing.
// Not sure if we mind, since removing non-existent pod
Expand Down Expand Up @@ -456,19 +438,17 @@ func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerfram
}

// setClusterStatePodsSequential sets the pods in cluster state in a sequential way.
func (snapshot *DeltaSnapshotStore) setClusterStatePodsSequential(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error {
func (snapshot *DeltaSnapshotStore) setClusterStatePodsSequential(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) {
for _, pod := range scheduledPods {
if nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]; ok {
if err := snapshot.data.addPodToNode(pod, nodeInfos[nodeIdx]); err != nil {
return err
}
// Can add pod directly. Cache will be cleared afterwards.
nodeInfos[nodeIdx].AddPod(pod)
}
}
return nil
}

// setClusterStatePodsParallelized sets the pods in cluster state in parallel based on snapshot.parallelism value.
func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error {
func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) {
podsForNode := make([][]*apiv1.Pod, len(nodeInfos))
for _, pod := range scheduledPods {
nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]
Expand All @@ -479,16 +459,13 @@ func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []
}

ctx := context.Background()
ctx, cancel := context.WithCancelCause(ctx)

workqueue.ParallelizeUntil(ctx, snapshot.parallelism, len(nodeInfos), func(nodeIdx int) {
err := snapshot.data.addPodsToNode(podsForNode[nodeIdx], nodeInfos[nodeIdx])
if err != nil {
cancel(err)
nodeInfo := nodeInfos[nodeIdx]
for _, pod := range podsForNode[nodeIdx] {
// Can add pod directly. Cache will be cleared afterwards.
nodeInfo.AddPod(pod)
}
})

return context.Cause(ctx)
}

// SetClusterState sets the cluster state.
Expand All @@ -507,19 +484,16 @@ func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, schedul
}

if snapshot.parallelism > 1 {
err := snapshot.setClusterStatePodsParallelized(nodeInfos, nodeNameToIdx, scheduledPods)
if err != nil {
return err
}
snapshot.setClusterStatePodsParallelized(nodeInfos, nodeNameToIdx, scheduledPods)
} else {
// TODO(macsko): Migrate to setClusterStatePodsParallelized for parallelism == 1
// after making sure the implementation is always correct in CA 1.33.
err := snapshot.setClusterStatePodsSequential(nodeInfos, nodeNameToIdx, scheduledPods)
if err != nil {
return err
}
snapshot.setClusterStatePodsSequential(nodeInfos, nodeNameToIdx, scheduledPods)
}

// Clear caches after adding pods.
snapshot.data.clearCaches()

// TODO(DRA): Save DRA snapshot.
return nil
}
Expand Down

0 comments on commit adda3d4

Please sign in to comment.