Fix data race while setting delta cluster state in parallel
macsko committed Jan 15, 2025
1 parent 5cd491a · commit b36e387
Showing 1 changed file with 13 additions and 39 deletions.
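For context on the race being fixed: before this commit, setClusterStatePodsParallelized had every ParallelizeUntil worker call data.addPodsToNode, and that helper ended with data.clearCaches(), so multiple goroutines mutated the snapshot's shared caches concurrently. The fix deletes those helpers: each worker now calls only NodeInfo.AddPod on its own node (workers own disjoint slice indices), and the caches are cleared exactly once after the parallel phase. A minimal sketch of the pattern, assuming stand-in types (snapshotData, its cache field, and this clearCaches are illustrative, not the actual delta.go internals):

package main

import (
	"context"
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// snapshotData stands in for the shared snapshot state; the real
// internalDeltaSnapshotData in delta.go caches derived node/pod lists.
type snapshotData struct {
	cache []string // illustrative shared cache
}

// clearCaches mutates shared state, so it must not run in parallel.
func (d *snapshotData) clearCaches() { d.cache = nil }

func main() {
	data := &snapshotData{cache: []string{"stale"}}
	pods := make([][]string, 8) // work already bucketed per node

	// Racy (pre-fix) shape: every worker also touched the shared caches.
	//   workqueue.ParallelizeUntil(ctx, 4, len(pods), func(i int) {
	//       pods[i] = append(pods[i], "pod")
	//       data.clearCaches() // data race: concurrent writes to shared state
	//   })

	// Fixed shape: workers write only their own index; the shared
	// mutation happens once, after ParallelizeUntil returns.
	workqueue.ParallelizeUntil(context.Background(), 4, len(pods), func(i int) {
		pods[i] = append(pods[i], "pod") // disjoint index per worker: safe
	})
	data.clearCaches() // single goroutine again: safe

	fmt.Println(len(pods), data.cache)
}

Note also that the deleted helpers below always returned nil, so the error plumbing they forced on callers (the context.WithCancelCause and the err checks) was effectively dead code; that is why the new signatures return nothing.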
cluster-autoscaler/simulator/clustersnapshot/store/delta.go (52 changes: 13 additions & 39 deletions)
@@ -248,24 +248,6 @@ func (data *internalDeltaSnapshotData) addPod(pod *apiv1.Pod, nodeName string) e
 	return nil
 }
 
-func (data *internalDeltaSnapshotData) addPodToNode(pod *apiv1.Pod, ni *schedulerframework.NodeInfo) error {
-	ni.AddPod(pod)
-
-	// Maybe consider deleting from the list in the future. Maybe not.
-	data.clearCaches()
-	return nil
-}
-
-func (data *internalDeltaSnapshotData) addPodsToNode(pods []*apiv1.Pod, ni *schedulerframework.NodeInfo) error {
-	for _, pod := range pods {
-		ni.AddPod(pod)
-	}
-
-	// Maybe consider deleting from the list in the future. Maybe not.
-	data.clearCaches()
-	return nil
-}
-
 func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName string) error {
 	// This always clones node info, even if the pod is actually missing.
 	// Not sure if we mind, since removing non-existent pod
@@ -456,19 +438,17 @@ func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerfram
 }
 
 // setClusterStatePodsSequential sets the pods in cluster state in a sequential way.
-func (snapshot *DeltaSnapshotStore) setClusterStatePodsSequential(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error {
+func (snapshot *DeltaSnapshotStore) setClusterStatePodsSequential(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) {
 	for _, pod := range scheduledPods {
 		if nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]; ok {
-			if err := snapshot.data.addPodToNode(pod, nodeInfos[nodeIdx]); err != nil {
-				return err
-			}
+			// Can add pod directly. Cache will be cleared afterwards.
+			nodeInfos[nodeIdx].AddPod(pod)
 		}
 	}
-	return nil
 }
 
 // setClusterStatePodsParallelized sets the pods in cluster state in parallel based on snapshot.parallelism value.
-func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error {
+func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) {
 	podsForNode := make([][]*apiv1.Pod, len(nodeInfos))
 	for _, pod := range scheduledPods {
 		nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]
@@ -479,16 +459,13 @@ func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []
 	}
 
 	ctx := context.Background()
-	ctx, cancel := context.WithCancelCause(ctx)
-
 	workqueue.ParallelizeUntil(ctx, snapshot.parallelism, len(nodeInfos), func(nodeIdx int) {
-		err := snapshot.data.addPodsToNode(podsForNode[nodeIdx], nodeInfos[nodeIdx])
-		if err != nil {
-			cancel(err)
+		nodeInfo := nodeInfos[nodeIdx]
+		for _, pod := range podsForNode[nodeIdx] {
+			// Can add pod directly. Cache will be cleared afterwards.
+			nodeInfo.AddPod(pod)
 		}
 	})
-
-	return context.Cause(ctx)
 }
 
 // SetClusterState sets the cluster state.
@@ -507,19 +484,16 @@ func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, schedul
 	}
 
 	if snapshot.parallelism > 1 {
-		err := snapshot.setClusterStatePodsParallelized(nodeInfos, nodeNameToIdx, scheduledPods)
-		if err != nil {
-			return err
-		}
+		snapshot.setClusterStatePodsParallelized(nodeInfos, nodeNameToIdx, scheduledPods)
 	} else {
 		// TODO(macsko): Migrate to setClusterStatePodsParallelized for parallelism == 1
 		// after making sure the implementation is always correct in CA 1.33.
-		err := snapshot.setClusterStatePodsSequential(nodeInfos, nodeNameToIdx, scheduledPods)
-		if err != nil {
-			return err
-		}
+		snapshot.setClusterStatePodsSequential(nodeInfos, nodeNameToIdx, scheduledPods)
 	}
 
+	// Clear caches after adding pods.
+	snapshot.data.clearCaches()
+
 	// TODO(DRA): Save DRA snapshot.
 	return nil
 }
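A race of this shape is exactly what Go's race detector flags. As a self-contained repro of the pre-fix call pattern (independent of cluster-autoscaler; the data type here is a hypothetical stand-in), running the following with go run -race reports concurrent writes from inside the workers:

package main

import (
	"context"

	"k8s.io/client-go/util/workqueue"
)

type data struct{ cache []int }

// clearCaches writes a shared field with no synchronization.
func (d *data) clearCaches() { d.cache = nil }

func main() {
	d := &data{cache: []int{1}}
	// Mirrors the pre-fix addPodsToNode -> clearCaches call made from
	// every ParallelizeUntil worker: all goroutines write d.cache.
	workqueue.ParallelizeUntil(context.Background(), 4, 64, func(i int) {
		d.clearCaches()
	})
}

The committed fix needs no locking instead: per-worker writes go to distinct NodeInfos, and the one shared mutation, clearCaches, runs only after ParallelizeUntil has returned.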
