From b983b5ecc0808e9b2bbc6909241c5375189441c1 Mon Sep 17 00:00:00 2001 From: QxBytes <39818795+QxBytes@users.noreply.github.com> Date: Wed, 26 Jun 2024 14:43:28 -0700 Subject: [PATCH] backport: fix: move bufio reader creation out of for loop to fix telemetry unmarshal errors (#2789) (#2812) * fix: move bufio reader creation out of for loop to fix telemetry unmarshal errors (#2789) * move bufio reader creation out of for loop if the bufio reader is created in the for loop we get unmarshaling errors * fix linter issue * add fixed ut * fix existing unit test flake due to closing pipe on error a previous fix ensured the socket closed on error, but this caused an existing ut to nondeterministically fail without the previous fix, the socket wouldn't have been closed on error * make read inline * make ut compatible with 1.4.x --- telemetry/telemetrybuffer.go | 18 +++++----------- telemetry/telemetrybuffer_test.go | 34 +++++++++++++++++++++++++++++-- 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/telemetry/telemetrybuffer.go b/telemetry/telemetrybuffer.go index 0ea13f4e0a..7fc49e7eb3 100644 --- a/telemetry/telemetrybuffer.go +++ b/telemetry/telemetrybuffer.go @@ -123,12 +123,14 @@ func (tb *TelemetryBuffer) StartServer() error { tb.connections = remove(tb.connections, index) } }() - + reader := bufio.NewReader(conn) for { - reportStr, err := read(conn) - if err != nil { + reportStr, readErr := reader.ReadBytes(Delimiter) + if readErr != nil { return } + reportStr = reportStr[:len(reportStr)-1] + var tmp map[string]interface{} err = json.Unmarshal(reportStr, &tmp) if err != nil { @@ -195,16 +197,6 @@ func (tb *TelemetryBuffer) PushData(ctx context.Context) { } } -// read - read from the file descriptor -func read(conn net.Conn) (b []byte, err error) { - b, err = bufio.NewReader(conn).ReadBytes(Delimiter) - if err == nil { - b = b[:len(b)-1] - } - - return -} - // Write - write to the file descriptor. func (tb *TelemetryBuffer) Write(b []byte) (c int, err error) { buf := make([]byte, len(b)) diff --git a/telemetry/telemetrybuffer_test.go b/telemetry/telemetrybuffer_test.go index f226c6a87b..cdb79849f7 100644 --- a/telemetry/telemetrybuffer_test.go +++ b/telemetry/telemetrybuffer_test.go @@ -67,6 +67,36 @@ func TestClientConnClose(t *testing.T) { tbClient.Close() } +func TestCloseOnWriteError(t *testing.T) { + tbServer, closeTBServer := createTBServer(t) + defer closeTBServer() + + tbClient := NewTelemetryBuffer() + err := tbClient.Connect() + require.NoError(t, err) + defer tbClient.Close() + + data := []byte("{\"good\":1}") + _, err = tbClient.Write(data) + require.NoError(t, err) + // need to wait for connection to populate in server + time.Sleep(1 * time.Second) + tbServer.mutex.Lock() + conns := tbServer.connections + tbServer.mutex.Unlock() + require.Len(t, conns, 1) + + // the connection should be automatically closed on failure + badData := []byte("} malformed json }}}") + _, err = tbClient.Write(badData) + require.NoError(t, err) + time.Sleep(1 * time.Second) + tbServer.mutex.Lock() + conns = tbServer.connections + tbServer.mutex.Unlock() + require.Empty(t, conns) +} + func TestWrite(t *testing.T) { _, closeTBServer := createTBServer(t) defer closeTBServer() @@ -84,8 +114,8 @@ func TestWrite(t *testing.T) { }{ { name: "write", - data: []byte("testdata"), - want: len("testdata") + 1, // +1 due to Delimiter('\n) + data: []byte("{\"testdata\":1}"), + want: len("{\"testdata\":1}") + 1, // +1 due to Delimiter('\n) wantErr: false, }, {