From b36e3879a21ec1170c86093d172cf68b613bb9df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maciej=20Skocze=C5=84?= Date: Wed, 15 Jan 2025 12:41:04 +0000 Subject: [PATCH] Fix data race while setting delta cluster state in parallel --- .../simulator/clustersnapshot/store/delta.go | 52 +++++-------------- 1 file changed, 13 insertions(+), 39 deletions(-) diff --git a/cluster-autoscaler/simulator/clustersnapshot/store/delta.go b/cluster-autoscaler/simulator/clustersnapshot/store/delta.go index 705bc655893e..67c1bc67fe3b 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/store/delta.go +++ b/cluster-autoscaler/simulator/clustersnapshot/store/delta.go @@ -248,24 +248,6 @@ func (data *internalDeltaSnapshotData) addPod(pod *apiv1.Pod, nodeName string) e return nil } -func (data *internalDeltaSnapshotData) addPodToNode(pod *apiv1.Pod, ni *schedulerframework.NodeInfo) error { - ni.AddPod(pod) - - // Maybe consider deleting from the list in the future. Maybe not. - data.clearCaches() - return nil -} - -func (data *internalDeltaSnapshotData) addPodsToNode(pods []*apiv1.Pod, ni *schedulerframework.NodeInfo) error { - for _, pod := range pods { - ni.AddPod(pod) - } - - // Maybe consider deleting from the list in the future. Maybe not. - data.clearCaches() - return nil -} - func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName string) error { // This always clones node info, even if the pod is actually missing. // Not sure if we mind, since removing non-existent pod @@ -456,19 +438,17 @@ func (snapshot *DeltaSnapshotStore) AddSchedulerNodeInfo(nodeInfo *schedulerfram } // setClusterStatePodsSequential sets the pods in cluster state in a sequential way. -func (snapshot *DeltaSnapshotStore) setClusterStatePodsSequential(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error { +func (snapshot *DeltaSnapshotStore) setClusterStatePodsSequential(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) { for _, pod := range scheduledPods { if nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName]; ok { - if err := snapshot.data.addPodToNode(pod, nodeInfos[nodeIdx]); err != nil { - return err - } + // Can add pod directly. Cache will be cleared afterwards. + nodeInfos[nodeIdx].AddPod(pod) } } - return nil } // setClusterStatePodsParallelized sets the pods in cluster state in parallel based on snapshot.parallelism value. -func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) error { +func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos []*schedulerframework.NodeInfo, nodeNameToIdx map[string]int, scheduledPods []*apiv1.Pod) { podsForNode := make([][]*apiv1.Pod, len(nodeInfos)) for _, pod := range scheduledPods { nodeIdx, ok := nodeNameToIdx[pod.Spec.NodeName] @@ -479,16 +459,13 @@ func (snapshot *DeltaSnapshotStore) setClusterStatePodsParallelized(nodeInfos [] } ctx := context.Background() - ctx, cancel := context.WithCancelCause(ctx) - workqueue.ParallelizeUntil(ctx, snapshot.parallelism, len(nodeInfos), func(nodeIdx int) { - err := snapshot.data.addPodsToNode(podsForNode[nodeIdx], nodeInfos[nodeIdx]) - if err != nil { - cancel(err) + nodeInfo := nodeInfos[nodeIdx] + for _, pod := range podsForNode[nodeIdx] { + // Can add pod directly. Cache will be cleared afterwards. + nodeInfo.AddPod(pod) } }) - - return context.Cause(ctx) } // SetClusterState sets the cluster state. @@ -507,19 +484,16 @@ func (snapshot *DeltaSnapshotStore) SetClusterState(nodes []*apiv1.Node, schedul } if snapshot.parallelism > 1 { - err := snapshot.setClusterStatePodsParallelized(nodeInfos, nodeNameToIdx, scheduledPods) - if err != nil { - return err - } + snapshot.setClusterStatePodsParallelized(nodeInfos, nodeNameToIdx, scheduledPods) } else { // TODO(macsko): Migrate to setClusterStatePodsParallelized for parallelism == 1 // after making sure the implementation is always correct in CA 1.33. - err := snapshot.setClusterStatePodsSequential(nodeInfos, nodeNameToIdx, scheduledPods) - if err != nil { - return err - } + snapshot.setClusterStatePodsSequential(nodeInfos, nodeNameToIdx, scheduledPods) } + // Clear caches after adding pods. + snapshot.data.clearCaches() + // TODO(DRA): Save DRA snapshot. return nil }