From 1e6e5f27d61b61cb9ef0475fbde134e1858d8654 Mon Sep 17 00:00:00 2001 From: Shaza Aldawamneh Date: Tue, 21 Oct 2025 14:45:03 +0100 Subject: [PATCH 1/3] UPSTREAM: : Add audit annotation for watch rejections due to storage initialization Signed-off-by: Shaza Aldawamneh --- .../src/k8s.io/apiserver/pkg/audit/context.go | 15 +++++++++ .../apiserver/pkg/audit/context_test.go | 15 +++++++++ .../apiserver/pkg/storage/cacher/ready.go | 31 ++++++++++++------- 3 files changed, 50 insertions(+), 11 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/audit/context.go b/staging/src/k8s.io/apiserver/pkg/audit/context.go index 5b93d594bffa8..3e9efe63ac31a 100644 --- a/staging/src/k8s.io/apiserver/pkg/audit/context.go +++ b/staging/src/k8s.io/apiserver/pkg/audit/context.go @@ -281,6 +281,21 @@ func AddAuditAnnotation(ctx context.Context, key, value string) { addAuditAnnotationLocked(ac, key, value) } +// AddAuditAnnotationForRejectWithReason records an audit annotation +// indicating that a watch request was rejected for the given reason. +func AddAuditAnnotationForRejectWithReason(ctx context.Context, reason string) { + AddAuditAnnotation(ctx, "audit.k8s.io/watch-reject-reason", reason) +} + +// AddAuditAnnotationForRejectMessage adds a human-readable message annotation +// truncated to 1024 characters to avoid excessive log size. +func AddAuditAnnotationForRejectMessage(ctx context.Context, msg string) { + if len(msg) > 1024 { + msg = msg[:1024] + "…" + } + AddAuditAnnotation(ctx, "audit.k8s.io/watch-reject-message", msg) +} + // AddAuditAnnotations is a bulk version of AddAuditAnnotation. Refer to AddAuditAnnotation for // restrictions on when this can be called. // keysAndValues are the key-value pairs to add, and must have an even number of items. diff --git a/staging/src/k8s.io/apiserver/pkg/audit/context_test.go b/staging/src/k8s.io/apiserver/pkg/audit/context_test.go index 9606d395cdbf4..11c0149e00a89 100644 --- a/staging/src/k8s.io/apiserver/pkg/audit/context_test.go +++ b/staging/src/k8s.io/apiserver/pkg/audit/context_test.go @@ -182,3 +182,18 @@ func withAuditContextAndLevel(ctx context.Context, t *testing.T, l auditinternal } return ctx } +func TestAddAuditAnnotationForRejectWithReason(t *testing.T) { + ctx := WithAuditContext(context.Background()) + AddAuditAnnotationForRejectWithReason(ctx, "storage_initializing") + AddAuditAnnotationForRejectMessage(ctx, "storage is (re)initializing") + + ac := AuditContextFrom(ctx) + annotations := ac.GetEventAnnotations() + + if got := annotations["audit.k8s.io/watch-reject-reason"]; got != "storage_initializing" { + t.Errorf("expected reason 'storage_initializing', got %q", got) + } + if got := annotations["audit.k8s.io/watch-reject-message"]; got != "storage is (re)initializing" { + t.Errorf("expected message 'storage is (re)initializing', got %q", got) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go index 68ff509f029c7..610a8367548fd 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go @@ -18,13 +18,20 @@ package cacher import ( "context" + "errors" "fmt" "sync" "time" + "k8s.io/apiserver/pkg/audit" "k8s.io/utils/clock" ) +// ErrStorageInitializing is returned when the cacher is still initializing. +// This allows callers to detect this specific condition and handle it +// (e.g., add an audit annotation or return HTTP 429). +var ErrStorageInitializing = errors.New("storage is (re)initializing") + type status int const ( @@ -80,7 +87,6 @@ func (r *ready) wait(ctx context.Context) error { // of times we entered ready state if Ready and error otherwise. func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) { for { - // r.done() only blocks if state is Pending select { case <-ctx.Done(): return 0, ctx.Err() @@ -89,18 +95,22 @@ func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) { r.lock.RLock() if r.state == Pending { - // since we allow to switch between the states Pending and Ready - // if there is a quick transition from Pending -> Ready -> Pending - // a process that was waiting can get unblocked and see a Pending - // state again. If the state is Pending we have to wait again to - // avoid an inconsistent state on the system, with some processes not - // waiting despite the state moved back to Pending. r.lock.RUnlock() continue } + generation, err := r.readGenerationLocked() r.lock.RUnlock() - return generation, err + + if err != nil { + if errors.Is(err, ErrStorageInitializing) { + audit.AddAuditAnnotationForRejectWithReason(ctx, "storage_initializing") + audit.AddAuditAnnotationForRejectMessage(ctx, err.Error()) + } + return 0, err + } + + return generation, nil } } @@ -122,10 +132,9 @@ func (r *ready) readGenerationLocked() (int, error) { switch r.state { case Pending: if r.lastErr == nil { - return 0, fmt.Errorf("storage is (re)initializing") - } else { - return 0, fmt.Errorf("storage is (re)initializing: %w", r.lastErr) + return 0, ErrStorageInitializing } + return 0, fmt.Errorf("%w: %v", ErrStorageInitializing, r.lastErr) case Ready: return r.generation, nil case Stopped: From 32790a0373b43cf54d7340c786c2bb1a3451d603 Mon Sep 17 00:00:00 2001 From: Shaza Aldawamneh Date: Thu, 23 Oct 2025 11:30:15 +0100 Subject: [PATCH 2/3] UPSTREAM: : Add audit annotation for watch rejections due to storage initialization Signed-off-by: Shaza Aldawamneh --- .../src/k8s.io/apiserver/pkg/audit/context.go | 15 -------------- .../apiserver/pkg/audit/context_test.go | 15 -------------- .../apiserver/pkg/storage/cacher/cacher.go | 12 +++++++++++ .../apiserver/pkg/storage/cacher/ready.go | 20 ++++++++----------- 4 files changed, 20 insertions(+), 42 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/audit/context.go b/staging/src/k8s.io/apiserver/pkg/audit/context.go index 3e9efe63ac31a..5b93d594bffa8 100644 --- a/staging/src/k8s.io/apiserver/pkg/audit/context.go +++ b/staging/src/k8s.io/apiserver/pkg/audit/context.go @@ -281,21 +281,6 @@ func AddAuditAnnotation(ctx context.Context, key, value string) { addAuditAnnotationLocked(ac, key, value) } -// AddAuditAnnotationForRejectWithReason records an audit annotation -// indicating that a watch request was rejected for the given reason. -func AddAuditAnnotationForRejectWithReason(ctx context.Context, reason string) { - AddAuditAnnotation(ctx, "audit.k8s.io/watch-reject-reason", reason) -} - -// AddAuditAnnotationForRejectMessage adds a human-readable message annotation -// truncated to 1024 characters to avoid excessive log size. -func AddAuditAnnotationForRejectMessage(ctx context.Context, msg string) { - if len(msg) > 1024 { - msg = msg[:1024] + "…" - } - AddAuditAnnotation(ctx, "audit.k8s.io/watch-reject-message", msg) -} - // AddAuditAnnotations is a bulk version of AddAuditAnnotation. Refer to AddAuditAnnotation for // restrictions on when this can be called. // keysAndValues are the key-value pairs to add, and must have an even number of items. diff --git a/staging/src/k8s.io/apiserver/pkg/audit/context_test.go b/staging/src/k8s.io/apiserver/pkg/audit/context_test.go index 11c0149e00a89..9606d395cdbf4 100644 --- a/staging/src/k8s.io/apiserver/pkg/audit/context_test.go +++ b/staging/src/k8s.io/apiserver/pkg/audit/context_test.go @@ -182,18 +182,3 @@ func withAuditContextAndLevel(ctx context.Context, t *testing.T, l auditinternal } return ctx } -func TestAddAuditAnnotationForRejectWithReason(t *testing.T) { - ctx := WithAuditContext(context.Background()) - AddAuditAnnotationForRejectWithReason(ctx, "storage_initializing") - AddAuditAnnotationForRejectMessage(ctx, "storage is (re)initializing") - - ac := AuditContextFrom(ctx) - annotations := ac.GetEventAnnotations() - - if got := annotations["audit.k8s.io/watch-reject-reason"]; got != "storage_initializing" { - t.Errorf("expected reason 'storage_initializing', got %q", got) - } - if got := annotations["audit.k8s.io/watch-reject-message"]; got != "storage is (re)initializing" { - t.Errorf("expected message 'storage is (re)initializing', got %q", got) - } -} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 3770bc8370333..5c2454725216b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -506,6 +506,18 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions } else { readyGeneration, err = c.ready.waitAndReadGeneration(ctx) if err != nil { + if err == ErrStorageInitializing || strings.HasPrefix(err.Error(), "storage is (re)initializing") { + // Add audit annotations with OpenShift prefix + audit.AddAuditAnnotation(ctx, "openshift.io/watch-reject-reason", "storage_initializing") + + msg := err.Error() + if len(msg) > 1024 { + msg = msg[:1024] + "…" + } + audit.AddAuditAnnotation(ctx, "openshift.io/watch-reject-message", msg) + } + + // Return HTTP 503 (ServiceUnavailable) to the client return nil, errors.NewServiceUnavailable(err.Error()) } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go index 610a8367548fd..0654bcaa4678a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go @@ -23,7 +23,6 @@ import ( "sync" "time" - "k8s.io/apiserver/pkg/audit" "k8s.io/utils/clock" ) @@ -87,6 +86,7 @@ func (r *ready) wait(ctx context.Context) error { // of times we entered ready state if Ready and error otherwise. func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) { for { + // r.done() only blocks if state is Pending select { case <-ctx.Done(): return 0, ctx.Err() @@ -95,22 +95,18 @@ func (r *ready) waitAndReadGeneration(ctx context.Context) (int, error) { r.lock.RLock() if r.state == Pending { + // since we allow to switch between the states Pending and Ready + // if there is a quick transition from Pending -> Ready -> Pending + // a process that was waiting can get unblocked and see a Pending + // state again. If the state is Pending we have to wait again to + // avoid an inconsistent state on the system, with some processes not + // waiting despite the state moved back to Pending. r.lock.RUnlock() continue } - generation, err := r.readGenerationLocked() r.lock.RUnlock() - - if err != nil { - if errors.Is(err, ErrStorageInitializing) { - audit.AddAuditAnnotationForRejectWithReason(ctx, "storage_initializing") - audit.AddAuditAnnotationForRejectMessage(ctx, err.Error()) - } - return 0, err - } - - return generation, nil + return generation, err } } From 9fbc4ac4a39f7df2ef5e3ed4093c0bf6c8a5249f Mon Sep 17 00:00:00 2001 From: Shaza Aldawamneh Date: Fri, 24 Oct 2025 11:41:40 +0100 Subject: [PATCH 3/3] UPSTREAM: : Add audit annotations for watch rejections due to storage initializing Signed-off-by: Shaza Aldawamneh --- .../apiserver/pkg/storage/cacher/cacher.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 5c2454725216b..5a338e073d250 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -501,23 +501,17 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions var downtime time.Duration readyGeneration, downtime, err = c.ready.checkAndReadGeneration() if err != nil { + audit.AddAuditAnnotation(ctx, "openshift.io/watch-reject-reason", "storage_initializing") + msg := err.Error() + if len(msg) > 1024 { + msg = msg[:1024] + "…" + } + audit.AddAuditAnnotation(ctx, "openshift.io/watch-reject-message", msg) return nil, errors.NewTooManyRequests(err.Error(), calculateRetryAfterForUnreadyCache(downtime)) } } else { readyGeneration, err = c.ready.waitAndReadGeneration(ctx) if err != nil { - if err == ErrStorageInitializing || strings.HasPrefix(err.Error(), "storage is (re)initializing") { - // Add audit annotations with OpenShift prefix - audit.AddAuditAnnotation(ctx, "openshift.io/watch-reject-reason", "storage_initializing") - - msg := err.Error() - if len(msg) > 1024 { - msg = msg[:1024] + "…" - } - audit.AddAuditAnnotation(ctx, "openshift.io/watch-reject-message", msg) - } - - // Return HTTP 503 (ServiceUnavailable) to the client return nil, errors.NewServiceUnavailable(err.Error()) } }