diff --git a/internal/controllers/topology/cluster/structuredmerge/dryrun.go b/internal/controllers/topology/cluster/structuredmerge/dryrun.go index 4db1e34dc534..b2314a2a7a28 100644 --- a/internal/controllers/topology/cluster/structuredmerge/dryrun.go +++ b/internal/controllers/topology/cluster/structuredmerge/dryrun.go @@ -54,7 +54,7 @@ func dryRunSSAPatch(ctx context.Context, dryRunCtx *dryRunSSAPatchInput) (bool, // The identifier consists of: gvk, namespace, name and resourceVersion of originalUnstructured // and a hash of modifiedUnstructured. // This ensures that we re-run the request as soon as either original or modified changes. - requestIdentifier, err := ssa.ComputeRequestIdentifier(dryRunCtx.client.Scheme(), dryRunCtx.originalUnstructured, dryRunCtx.modifiedUnstructured) + requestIdentifier, err := ssa.ComputeRequestIdentifier(dryRunCtx.client.Scheme(), dryRunCtx.originalUnstructured.GetResourceVersion(), dryRunCtx.modifiedUnstructured) if err != nil { return false, false, nil, err } diff --git a/internal/util/ssa/cache.go b/internal/util/ssa/cache.go index 0587e893358f..e8bf7e89971e 100644 --- a/internal/util/ssa/cache.go +++ b/internal/util/ssa/cache.go @@ -104,18 +104,18 @@ func (r *ssaCache) Has(key, kind string) bool { // ComputeRequestIdentifier computes a request identifier for the cache. // The identifier is unique for a specific request to ensure we don't have to re-run the request // once we found out that it would not produce a diff. -// The identifier consists of: gvk, namespace, name and resourceVersion of the original object and a hash of the modified -// object. This ensures that we re-run the request as soon as either original or modified changes. -func ComputeRequestIdentifier(scheme *runtime.Scheme, original, modified client.Object) (string, error) { - modifiedObjectHash, err := hash.Compute(modified) +// The identifier consists of: gvk, namespace, name and resourceVersion of the object and a hash of the modified +// object. This ensures that we re-run the request as soon as anything changes. +func ComputeRequestIdentifier(scheme *runtime.Scheme, resourceVersion string, obj client.Object) (string, error) { + objHash, err := hash.Compute(obj) if err != nil { - return "", errors.Wrapf(err, "failed to calculate request identifier: failed to compute hash for modified object") + return "", errors.Wrapf(err, "failed to calculate request identifier: failed to compute hash for object") } - gvk, err := apiutil.GVKForObject(original, scheme) + gvk, err := apiutil.GVKForObject(obj, scheme) if err != nil { - return "", errors.Wrapf(err, "failed to calculate request identifier: failed to get GroupVersionKind of original object %s", klog.KObj(original)) + return "", errors.Wrapf(err, "failed to calculate request identifier: failed to get GroupVersionKind of object %s", klog.KObj(obj)) } - return fmt.Sprintf("%s.%s.%s.%d", gvk.String(), klog.KObj(original), original.GetResourceVersion(), modifiedObjectHash), nil + return fmt.Sprintf("%s.%s.%s.%d", gvk.String(), klog.KObj(obj), resourceVersion, objHash), nil } diff --git a/internal/util/ssa/patch.go b/internal/util/ssa/patch.go index dad0ab7b7f06..eefb0c4d11a3 100644 --- a/internal/util/ssa/patch.go +++ b/internal/util/ssa/patch.go @@ -84,6 +84,7 @@ func Patch(ctx context.Context, c client.Client, fieldManager string, modified c if err != nil { return err } + modifiedUnstructuredBeforeApply := modifiedUnstructured.DeepCopy() gvk, err := apiutil.GVKForObject(modifiedUnstructured, c.Scheme()) if err != nil { @@ -93,7 +94,7 @@ func Patch(ctx context.Context, c client.Client, fieldManager string, modified c var requestIdentifier string if options.WithCachingProxy { // Check if the request is cached. - requestIdentifier, err = ComputeRequestIdentifier(c.Scheme(), options.Original, modifiedUnstructured) + requestIdentifier, err = ComputeRequestIdentifier(c.Scheme(), options.Original.GetResourceVersion(), modifiedUnstructured) if err != nil { return errors.Wrapf(err, "failed to apply object") } @@ -132,10 +133,16 @@ func Patch(ctx context.Context, c client.Client, fieldManager string, modified c // Add the request to the cache only if dry-run was not used. if options.WithCachingProxy && !options.WithDryRun { - // If the SSA call did not update the object, add the request to the cache. - if options.Original.GetResourceVersion() == modifiedUnstructured.GetResourceVersion() { - options.Cache.Add(requestIdentifier) + // If the object changed, we need to recompute the request identifier before caching. + if options.Original.GetResourceVersion() != modifiedUnstructured.GetResourceVersion() { + // NOTE: This uses the resourceVersion from modifiedUnstructured (after apply), and the hash from + // modifiedUnstructuredBeforeApply (what we wanted to apply), which is what we want. + requestIdentifier, err = ComputeRequestIdentifier(c.Scheme(), modifiedUnstructured.GetResourceVersion(), modifiedUnstructuredBeforeApply) + if err != nil { + return errors.Wrapf(err, "failed to compute request identifier after apply") + } } + options.Cache.Add(requestIdentifier) } return nil diff --git a/internal/util/ssa/patch_test.go b/internal/util/ssa/patch_test.go index b1eadd3f7428..3753d83a07df 100644 --- a/internal/util/ssa/patch_test.go +++ b/internal/util/ssa/patch_test.go @@ -17,19 +17,33 @@ limitations under the License. package ssa import ( + "context" "testing" "time" . "github.com/onsi/gomega" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2" "sigs.k8s.io/cluster-api/util/test/builder" ) +// clientWithWatch wraps a client.Client and adds a Watch method to satisfy client.WithWatch interface. +type clientWithWatch struct { + client.Client +} + +func (c *clientWithWatch) Watch(_ context.Context, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) { + // This is not used in the tests, but required to satisfy the client.WithWatch interface + panic("Watch not implemented") +} + func TestPatch(t *testing.T) { g := NewWithT(t) @@ -48,26 +62,49 @@ func TestPatch(t *testing.T) { fieldManager := "test-manager" ssaCache := NewCache("test-controller") + // Wrap the client with an interceptor to count API calls + var applyCallCount int + countingClient := interceptor.NewClient(&clientWithWatch{Client: env.GetClient()}, interceptor.Funcs{ + Apply: func(ctx context.Context, c client.WithWatch, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error { + applyCallCount++ + return c.Apply(ctx, obj, opts...) + }, + }) + // 1. Create the object createObject := initialObject.DeepCopy() - g.Expect(Patch(ctx, env.GetClient(), fieldManager, createObject)).To(Succeed()) + g.Expect(Patch(ctx, countingClient, fieldManager, createObject)).To(Succeed()) + g.Expect(applyCallCount).To(Equal(1), "Expected 1 API call for create") - // 2. Update the object and verify that the request was not cached as the object was changed. + // 2. Update the object and verify that the request was not cached with the old identifier, + // but is cached with a new identifier (after apply). // Get the original object. originalObject := initialObject.DeepCopy() g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(originalObject), originalObject)).To(Succeed()) // Modify the object modifiedObject := initialObject.DeepCopy() g.Expect(unstructured.SetNestedField(modifiedObject.Object, "baz", "spec", "foo")).To(Succeed()) - // Compute request identifier, so we can later verify that the update call was not cached. + // Compute request identifier before the update, so we can later verify that the update call was not cached with this identifier. modifiedUnstructured, err := prepareModified(env.Scheme(), modifiedObject) g.Expect(err).ToNot(HaveOccurred()) - requestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured) + oldRequestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject.GetResourceVersion(), modifiedUnstructured) g.Expect(err).ToNot(HaveOccurred()) + // Save a copy of modifiedUnstructured before apply to compute the new identifier later + modifiedUnstructuredBeforeApply := modifiedUnstructured.DeepCopy() // Update the object - g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed()) - // Verify that request was not cached (as it changed the object) - g.Expect(ssaCache.Has(requestIdentifier, initialObject.GetKind())).To(BeFalse()) + applyCallCount = 0 + g.Expect(Patch(ctx, countingClient, fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed()) + g.Expect(applyCallCount).To(Equal(1), "Expected 1 API call for first update (object changed)") + // Verify that request was not cached with the old identifier (as it changed the object) + g.Expect(ssaCache.Has(oldRequestIdentifier, initialObject.GetKind())).To(BeFalse()) + // Get the actual object from server after apply to compute the new request identifier + objectAfterApply := initialObject.DeepCopy() + g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(objectAfterApply), objectAfterApply)).To(Succeed()) + // Compute the new request identifier (after apply) + newRequestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), objectAfterApply.GetResourceVersion(), modifiedUnstructuredBeforeApply) + g.Expect(err).ToNot(HaveOccurred()) + // Verify that request was cached with the new identifier (after apply) + g.Expect(ssaCache.Has(newRequestIdentifier, initialObject.GetKind())).To(BeTrue()) // 3. Repeat the same update and verify that the request was cached as the object was not changed. // Get the original object. @@ -79,12 +116,14 @@ func TestPatch(t *testing.T) { // Compute request identifier, so we can later verify that the update call was cached. modifiedUnstructured, err = prepareModified(env.Scheme(), modifiedObject) g.Expect(err).ToNot(HaveOccurred()) - requestIdentifier, err = ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured) + requestIdentifierNoOp, err := ComputeRequestIdentifier(env.GetScheme(), originalObject.GetResourceVersion(), modifiedUnstructured) g.Expect(err).ToNot(HaveOccurred()) // Update the object - g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed()) + applyCallCount = 0 + g.Expect(Patch(ctx, countingClient, fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed()) + g.Expect(applyCallCount).To(Equal(0), "Expected 0 API calls for repeat update (should hit cache)") // Verify that request was cached (as it did not change the object) - g.Expect(ssaCache.Has(requestIdentifier, initialObject.GetKind())).To(BeTrue()) + g.Expect(ssaCache.Has(requestIdentifierNoOp, initialObject.GetKind())).To(BeTrue()) }) t.Run("Test patch with Machine", func(*testing.T) { @@ -123,30 +162,53 @@ func TestPatch(t *testing.T) { fieldManager := "test-manager" ssaCache := NewCache("test-controller") + // Wrap the client with an interceptor to count API calls + var applyCallCount int + countingClient := interceptor.NewClient(&clientWithWatch{Client: env.GetClient()}, interceptor.Funcs{ + Apply: func(ctx context.Context, c client.WithWatch, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error { + applyCallCount++ + return c.Apply(ctx, obj, opts...) + }, + }) + // 1. Create the object createObject := initialObject.DeepCopy() - g.Expect(Patch(ctx, env.GetClient(), fieldManager, createObject)).To(Succeed()) + g.Expect(Patch(ctx, countingClient, fieldManager, createObject)).To(Succeed()) + g.Expect(applyCallCount).To(Equal(1), "Expected 1 API call for create") // Verify that gvk is still set g.Expect(createObject.GroupVersionKind()).To(Equal(initialObject.GroupVersionKind())) - // 2. Update the object and verify that the request was not cached as the object was changed. + // 2. Update the object and verify that the request was not cached with the old identifier, + // but is cached with a new identifier (after apply). // Get the original object. originalObject := initialObject.DeepCopy() g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(originalObject), originalObject)).To(Succeed()) // Modify the object modifiedObject := initialObject.DeepCopy() modifiedObject.Spec.Deletion.NodeDrainTimeoutSeconds = ptr.To(int32(5)) - // Compute request identifier, so we can later verify that the update call was not cached. + // Compute request identifier before the update, so we can later verify that the update call was not cached with this identifier. modifiedUnstructured, err := prepareModified(env.Scheme(), modifiedObject) g.Expect(err).ToNot(HaveOccurred()) - requestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured) + oldRequestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject.GetResourceVersion(), modifiedUnstructured) g.Expect(err).ToNot(HaveOccurred()) + // Save a copy of modifiedUnstructured before apply to compute the new identifier later + modifiedUnstructuredBeforeApply := modifiedUnstructured.DeepCopy() // Update the object - g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed()) + applyCallCount = 0 + g.Expect(Patch(ctx, countingClient, fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed()) + g.Expect(applyCallCount).To(Equal(1), "Expected 1 API call for first update (object changed)") // Verify that gvk is still set g.Expect(modifiedObject.GroupVersionKind()).To(Equal(initialObject.GroupVersionKind())) - // Verify that request was not cached (as it changed the object) - g.Expect(ssaCache.Has(requestIdentifier, initialObject.GetObjectKind().GroupVersionKind().Kind)).To(BeFalse()) + // Verify that request was not cached with the old identifier (as it changed the object) + g.Expect(ssaCache.Has(oldRequestIdentifier, initialObject.GetObjectKind().GroupVersionKind().Kind)).To(BeFalse()) + // Get the actual object from server after apply to compute the new request identifier + objectAfterApply := initialObject.DeepCopy() + g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(objectAfterApply), objectAfterApply)).To(Succeed()) + // Compute the new request identifier (after apply) + newRequestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), objectAfterApply.GetResourceVersion(), modifiedUnstructuredBeforeApply) + g.Expect(err).ToNot(HaveOccurred()) + // Verify that request was cached with the new identifier (after apply) + g.Expect(ssaCache.Has(newRequestIdentifier, initialObject.GetObjectKind().GroupVersionKind().Kind)).To(BeTrue()) // Wait for 1 second. We are also trying to verify in this test that the resourceVersion of the Machine // is not increased. Under some circumstances this would only happen if the timestamp in managedFields would @@ -166,12 +228,14 @@ func TestPatch(t *testing.T) { // Compute request identifier, so we can later verify that the update call was cached. modifiedUnstructured, err = prepareModified(env.Scheme(), modifiedObject) g.Expect(err).ToNot(HaveOccurred()) - requestIdentifier, err = ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured) + requestIdentifierNoOp, err := ComputeRequestIdentifier(env.GetScheme(), originalObject.GetResourceVersion(), modifiedUnstructured) g.Expect(err).ToNot(HaveOccurred()) // Update the object - g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed()) + applyCallCount = 0 + g.Expect(Patch(ctx, countingClient, fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed()) + g.Expect(applyCallCount).To(Equal(0), "Expected 0 API calls for repeat update (should hit cache)") // Verify that request was cached (as it did not change the object) - g.Expect(ssaCache.Has(requestIdentifier, initialObject.GetObjectKind().GroupVersionKind().Kind)).To(BeTrue()) + g.Expect(ssaCache.Has(requestIdentifierNoOp, initialObject.GetObjectKind().GroupVersionKind().Kind)).To(BeTrue()) // Verify that gvk is still set g.Expect(modifiedObject.GroupVersionKind()).To(Equal(initialObject.GroupVersionKind())) })