From 381352f2d13890195e4cd6f173751eccb202b7eb Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Sun, 16 Nov 2025 15:30:22 +0900 Subject: [PATCH] fix panic on health check failure when using stream push Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + pkg/ingester/client/client.go | 5 +++- pkg/ingester/client/client_test.go | 44 ++++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 252816a7db8..6127bd3cd9a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ * [ENHANCEMENT] Distributor: Add count, spans, and buckets validations for native histogram. #7072 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088 * [BUGFIX] Ruler: Add XFunctions validation support. #7111 +* [BUGFIX] Distributor: Fix panic on health check failure when using stream push. #7116 ## 1.20.0 2025-11-10 diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index ed8bacd45aa..eb0ac91ec00 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -231,7 +231,10 @@ func (c *closableHealthAndIngesterClient) worker(ctx context.Context) error { select { case <-ctx.Done(): return - case job := <-c.streamPushChan: + case job, ok := <-c.streamPushChan: + if !ok { + return + } err = stream.Send(job.req) if err == io.EOF { job.resp = &cortexpb.WriteResponse{} diff --git a/pkg/ingester/client/client_test.go b/pkg/ingester/client/client_test.go index 02edc8d070d..4f8316147a2 100644 --- a/pkg/ingester/client/client_test.go +++ b/pkg/ingester/client/client_test.go @@ -9,6 +9,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -115,12 +116,18 @@ func createTestIngesterClient(maxInflightPushRequests int64, currentInflightRequ type mockIngester struct { IngesterClient + mock.Mock } func (m *mockIngester) Push(_ context.Context, _ *cortexpb.WriteRequest, _ ...grpc.CallOption) (*cortexpb.WriteResponse, error) { return &cortexpb.WriteResponse{}, nil } +func (m *mockIngester) PushStream(ctx context.Context, opts ...grpc.CallOption) (Ingester_PushStreamClient, error) { + args := m.Called(ctx, opts) + return args.Get(0).(Ingester_PushStreamClient), nil +} + type mockClientConn struct { ClosableClientConn } @@ -227,3 +234,40 @@ func TestClosableHealthAndIngesterClient_Close_WithPendingJobs(t *testing.T) { assert.True(t, job1Cancelled, "job1 should have been cancelled") assert.True(t, job2Cancelled, "job2 should have been cancelled") } + +type mockClientStream struct { + mock.Mock + grpc.ClientStream +} + +func (m *mockClientStream) Send(msg *cortexpb.StreamWriteRequest) error { + args := m.Called(msg) + return args.Error(0) +} + +func (m *mockClientStream) Recv() (*cortexpb.WriteResponse, error) { + return &cortexpb.WriteResponse{}, nil +} + +func TestClosableHealthAndIngesterClient_ShouldNotPanicWhenClose(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + streamChan := make(chan *streamWriteJob) + + mockIngester := &mockIngester{} + mockStream := &mockClientStream{} + mockIngester.On("PushStream", mock.Anything, mock.Anything).Return(mockStream, nil).Once() + + client := &closableHealthAndIngesterClient{ + IngesterClient: mockIngester, + conn: &mockClientConn{}, + addr: "test-addr", + inflightPushRequests: prometheus.NewGaugeVec(prometheus.GaugeOpts{}, []string{"ingester"}), + streamCtx: ctx, + streamCancel: cancel, + streamPushChan: streamChan, + } + require.NoError(t, client.worker(context.Background())) + require.NoError(t, client.Close()) + + time.Sleep(100 * time.Millisecond) +}