Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func dryRunSSAPatch(ctx context.Context, dryRunCtx *dryRunSSAPatchInput) (bool,
// The identifier consists of: gvk, namespace, name and resourceVersion of originalUnstructured
// and a hash of modifiedUnstructured.
// This ensures that we re-run the request as soon as either original or modified changes.
requestIdentifier, err := ssa.ComputeRequestIdentifier(dryRunCtx.client.Scheme(), dryRunCtx.originalUnstructured, dryRunCtx.modifiedUnstructured)
requestIdentifier, err := ssa.ComputeRequestIdentifier(dryRunCtx.client.Scheme(), dryRunCtx.originalUnstructured.GetResourceVersion(), dryRunCtx.modifiedUnstructured)
if err != nil {
return false, false, nil, err
}
Expand Down
16 changes: 8 additions & 8 deletions internal/util/ssa/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,18 +104,18 @@ func (r *ssaCache) Has(key, kind string) bool {
// ComputeRequestIdentifier computes a request identifier for the cache.
// The identifier is unique for a specific request to ensure we don't have to re-run the request
// once we found out that it would not produce a diff.
// The identifier consists of: gvk, namespace, name and resourceVersion of the original object and a hash of the modified
// object. This ensures that we re-run the request as soon as either original or modified changes.
func ComputeRequestIdentifier(scheme *runtime.Scheme, original, modified client.Object) (string, error) {
modifiedObjectHash, err := hash.Compute(modified)
// The identifier consists of: gvk, namespace, name and resourceVersion of the object and a hash of the modified
// object. This ensures that we re-run the request as soon as anything changes.
func ComputeRequestIdentifier(scheme *runtime.Scheme, resourceVersion string, obj client.Object) (string, error) {
objHash, err := hash.Compute(obj)
if err != nil {
return "", errors.Wrapf(err, "failed to calculate request identifier: failed to compute hash for modified object")
return "", errors.Wrapf(err, "failed to calculate request identifier: failed to compute hash for object")
}

gvk, err := apiutil.GVKForObject(original, scheme)
gvk, err := apiutil.GVKForObject(obj, scheme)
if err != nil {
return "", errors.Wrapf(err, "failed to calculate request identifier: failed to get GroupVersionKind of original object %s", klog.KObj(original))
return "", errors.Wrapf(err, "failed to calculate request identifier: failed to get GroupVersionKind of object %s", klog.KObj(obj))
}

return fmt.Sprintf("%s.%s.%s.%d", gvk.String(), klog.KObj(original), original.GetResourceVersion(), modifiedObjectHash), nil
return fmt.Sprintf("%s.%s.%s.%d", gvk.String(), klog.KObj(obj), resourceVersion, objHash), nil
}
15 changes: 11 additions & 4 deletions internal/util/ssa/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func Patch(ctx context.Context, c client.Client, fieldManager string, modified c
if err != nil {
return err
}
modifiedUnstructuredBeforeApply := modifiedUnstructured.DeepCopy()

gvk, err := apiutil.GVKForObject(modifiedUnstructured, c.Scheme())
if err != nil {
Expand All @@ -93,7 +94,7 @@ func Patch(ctx context.Context, c client.Client, fieldManager string, modified c
var requestIdentifier string
if options.WithCachingProxy {
// Check if the request is cached.
requestIdentifier, err = ComputeRequestIdentifier(c.Scheme(), options.Original, modifiedUnstructured)
requestIdentifier, err = ComputeRequestIdentifier(c.Scheme(), options.Original.GetResourceVersion(), modifiedUnstructured)
if err != nil {
return errors.Wrapf(err, "failed to apply object")
}
Expand Down Expand Up @@ -132,10 +133,16 @@ func Patch(ctx context.Context, c client.Client, fieldManager string, modified c

// Add the request to the cache only if dry-run was not used.
if options.WithCachingProxy && !options.WithDryRun {
// If the SSA call did not update the object, add the request to the cache.
if options.Original.GetResourceVersion() == modifiedUnstructured.GetResourceVersion() {
options.Cache.Add(requestIdentifier)
// If the object changed, we need to recompute the request identifier before caching.
if options.Original.GetResourceVersion() != modifiedUnstructured.GetResourceVersion() {
// NOTE: This uses the resourceVersion from modifiedUnstructured (after apply), and the hash from
// modifiedUnstructuredBeforeApply (what we wanted to apply), which is what we want.
requestIdentifier, err = ComputeRequestIdentifier(c.Scheme(), modifiedUnstructured.GetResourceVersion(), modifiedUnstructuredBeforeApply)
if err != nil {
return errors.Wrapf(err, "failed to compute request identifier after apply")
}
}
options.Cache.Add(requestIdentifier)
}

return nil
Expand Down
104 changes: 84 additions & 20 deletions internal/util/ssa/patch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,33 @@ limitations under the License.
package ssa

import (
"context"
"testing"
"time"

. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"

clusterv1 "sigs.k8s.io/cluster-api/api/core/v1beta2"
"sigs.k8s.io/cluster-api/util/test/builder"
)

// clientWithWatch wraps a client.Client and adds a Watch method to satisfy client.WithWatch interface.
type clientWithWatch struct {
client.Client
}

func (c *clientWithWatch) Watch(_ context.Context, _ client.ObjectList, _ ...client.ListOption) (watch.Interface, error) {
// This is not used in the tests, but required to satisfy the client.WithWatch interface
panic("Watch not implemented")
}

func TestPatch(t *testing.T) {
g := NewWithT(t)

Expand All @@ -48,26 +62,49 @@ func TestPatch(t *testing.T) {
fieldManager := "test-manager"
ssaCache := NewCache("test-controller")

// Wrap the client with an interceptor to count API calls
var applyCallCount int
countingClient := interceptor.NewClient(&clientWithWatch{Client: env.GetClient()}, interceptor.Funcs{
Apply: func(ctx context.Context, c client.WithWatch, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error {
applyCallCount++
return c.Apply(ctx, obj, opts...)
},
})

// 1. Create the object
createObject := initialObject.DeepCopy()
g.Expect(Patch(ctx, env.GetClient(), fieldManager, createObject)).To(Succeed())
g.Expect(Patch(ctx, countingClient, fieldManager, createObject)).To(Succeed())
g.Expect(applyCallCount).To(Equal(1), "Expected 1 API call for create")

// 2. Update the object and verify that the request was not cached as the object was changed.
// 2. Update the object and verify that the request was not cached with the old identifier,
// but is cached with a new identifier (after apply).
// Get the original object.
originalObject := initialObject.DeepCopy()
g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(originalObject), originalObject)).To(Succeed())
// Modify the object
modifiedObject := initialObject.DeepCopy()
g.Expect(unstructured.SetNestedField(modifiedObject.Object, "baz", "spec", "foo")).To(Succeed())
// Compute request identifier, so we can later verify that the update call was not cached.
// Compute request identifier before the update, so we can later verify that the update call was not cached with this identifier.
modifiedUnstructured, err := prepareModified(env.Scheme(), modifiedObject)
g.Expect(err).ToNot(HaveOccurred())
requestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
oldRequestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject.GetResourceVersion(), modifiedUnstructured)
g.Expect(err).ToNot(HaveOccurred())
// Save a copy of modifiedUnstructured before apply to compute the new identifier later
modifiedUnstructuredBeforeApply := modifiedUnstructured.DeepCopy()
// Update the object
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
// Verify that request was not cached (as it changed the object)
g.Expect(ssaCache.Has(requestIdentifier, initialObject.GetKind())).To(BeFalse())
applyCallCount = 0
g.Expect(Patch(ctx, countingClient, fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
g.Expect(applyCallCount).To(Equal(1), "Expected 1 API call for first update (object changed)")
// Verify that request was not cached with the old identifier (as it changed the object)
g.Expect(ssaCache.Has(oldRequestIdentifier, initialObject.GetKind())).To(BeFalse())
// Get the actual object from server after apply to compute the new request identifier
objectAfterApply := initialObject.DeepCopy()
g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(objectAfterApply), objectAfterApply)).To(Succeed())
// Compute the new request identifier (after apply)
newRequestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), objectAfterApply.GetResourceVersion(), modifiedUnstructuredBeforeApply)
g.Expect(err).ToNot(HaveOccurred())
// Verify that request was cached with the new identifier (after apply)
g.Expect(ssaCache.Has(newRequestIdentifier, initialObject.GetKind())).To(BeTrue())

// 3. Repeat the same update and verify that the request was cached as the object was not changed.
// Get the original object.
Expand All @@ -79,12 +116,14 @@ func TestPatch(t *testing.T) {
// Compute request identifier, so we can later verify that the update call was cached.
modifiedUnstructured, err = prepareModified(env.Scheme(), modifiedObject)
g.Expect(err).ToNot(HaveOccurred())
requestIdentifier, err = ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
requestIdentifierNoOp, err := ComputeRequestIdentifier(env.GetScheme(), originalObject.GetResourceVersion(), modifiedUnstructured)
g.Expect(err).ToNot(HaveOccurred())
// Update the object
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
applyCallCount = 0
g.Expect(Patch(ctx, countingClient, fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
g.Expect(applyCallCount).To(Equal(0), "Expected 0 API calls for repeat update (should hit cache)")
// Verify that request was cached (as it did not change the object)
g.Expect(ssaCache.Has(requestIdentifier, initialObject.GetKind())).To(BeTrue())
g.Expect(ssaCache.Has(requestIdentifierNoOp, initialObject.GetKind())).To(BeTrue())
})

t.Run("Test patch with Machine", func(*testing.T) {
Expand Down Expand Up @@ -123,30 +162,53 @@ func TestPatch(t *testing.T) {
fieldManager := "test-manager"
ssaCache := NewCache("test-controller")

// Wrap the client with an interceptor to count API calls
var applyCallCount int
countingClient := interceptor.NewClient(&clientWithWatch{Client: env.GetClient()}, interceptor.Funcs{
Apply: func(ctx context.Context, c client.WithWatch, obj runtime.ApplyConfiguration, opts ...client.ApplyOption) error {
applyCallCount++
return c.Apply(ctx, obj, opts...)
},
})

// 1. Create the object
createObject := initialObject.DeepCopy()
g.Expect(Patch(ctx, env.GetClient(), fieldManager, createObject)).To(Succeed())
g.Expect(Patch(ctx, countingClient, fieldManager, createObject)).To(Succeed())
g.Expect(applyCallCount).To(Equal(1), "Expected 1 API call for create")
// Verify that gvk is still set
g.Expect(createObject.GroupVersionKind()).To(Equal(initialObject.GroupVersionKind()))

// 2. Update the object and verify that the request was not cached as the object was changed.
// 2. Update the object and verify that the request was not cached with the old identifier,
// but is cached with a new identifier (after apply).
// Get the original object.
originalObject := initialObject.DeepCopy()
g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(originalObject), originalObject)).To(Succeed())
// Modify the object
modifiedObject := initialObject.DeepCopy()
modifiedObject.Spec.Deletion.NodeDrainTimeoutSeconds = ptr.To(int32(5))
// Compute request identifier, so we can later verify that the update call was not cached.
// Compute request identifier before the update, so we can later verify that the update call was not cached with this identifier.
modifiedUnstructured, err := prepareModified(env.Scheme(), modifiedObject)
g.Expect(err).ToNot(HaveOccurred())
requestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
oldRequestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), originalObject.GetResourceVersion(), modifiedUnstructured)
g.Expect(err).ToNot(HaveOccurred())
// Save a copy of modifiedUnstructured before apply to compute the new identifier later
modifiedUnstructuredBeforeApply := modifiedUnstructured.DeepCopy()
// Update the object
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
applyCallCount = 0
g.Expect(Patch(ctx, countingClient, fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
g.Expect(applyCallCount).To(Equal(1), "Expected 1 API call for first update (object changed)")
// Verify that gvk is still set
g.Expect(modifiedObject.GroupVersionKind()).To(Equal(initialObject.GroupVersionKind()))
// Verify that request was not cached (as it changed the object)
g.Expect(ssaCache.Has(requestIdentifier, initialObject.GetObjectKind().GroupVersionKind().Kind)).To(BeFalse())
// Verify that request was not cached with the old identifier (as it changed the object)
g.Expect(ssaCache.Has(oldRequestIdentifier, initialObject.GetObjectKind().GroupVersionKind().Kind)).To(BeFalse())
// Get the actual object from server after apply to compute the new request identifier
objectAfterApply := initialObject.DeepCopy()
g.Expect(env.GetAPIReader().Get(ctx, client.ObjectKeyFromObject(objectAfterApply), objectAfterApply)).To(Succeed())
// Compute the new request identifier (after apply)
newRequestIdentifier, err := ComputeRequestIdentifier(env.GetScheme(), objectAfterApply.GetResourceVersion(), modifiedUnstructuredBeforeApply)
g.Expect(err).ToNot(HaveOccurred())
// Verify that request was cached with the new identifier (after apply)
g.Expect(ssaCache.Has(newRequestIdentifier, initialObject.GetObjectKind().GroupVersionKind().Kind)).To(BeTrue())

// Wait for 1 second. We are also trying to verify in this test that the resourceVersion of the Machine
// is not increased. Under some circumstances this would only happen if the timestamp in managedFields would
Expand All @@ -166,12 +228,14 @@ func TestPatch(t *testing.T) {
// Compute request identifier, so we can later verify that the update call was cached.
modifiedUnstructured, err = prepareModified(env.Scheme(), modifiedObject)
g.Expect(err).ToNot(HaveOccurred())
requestIdentifier, err = ComputeRequestIdentifier(env.GetScheme(), originalObject, modifiedUnstructured)
requestIdentifierNoOp, err := ComputeRequestIdentifier(env.GetScheme(), originalObject.GetResourceVersion(), modifiedUnstructured)
g.Expect(err).ToNot(HaveOccurred())
// Update the object
g.Expect(Patch(ctx, env.GetClient(), fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
applyCallCount = 0
g.Expect(Patch(ctx, countingClient, fieldManager, modifiedObject, WithCachingProxy{Cache: ssaCache, Original: originalObject})).To(Succeed())
g.Expect(applyCallCount).To(Equal(0), "Expected 0 API calls for repeat update (should hit cache)")
// Verify that request was cached (as it did not change the object)
g.Expect(ssaCache.Has(requestIdentifier, initialObject.GetObjectKind().GroupVersionKind().Kind)).To(BeTrue())
g.Expect(ssaCache.Has(requestIdentifierNoOp, initialObject.GetObjectKind().GroupVersionKind().Kind)).To(BeTrue())
// Verify that gvk is still set
g.Expect(modifiedObject.GroupVersionKind()).To(Equal(initialObject.GroupVersionKind()))
})
Expand Down