Skip to content

Commit

Permalink
backport: fix: move bufio reader creation out of for loop to fix tele…
Browse files Browse the repository at this point in the history
…metry unmarshal errors (#2789) (#2812)

* fix: move bufio reader creation out of for loop to fix telemetry unmarshal errors (#2789)

* move bufio reader creation out of for loop

if the bufio reader is created in the for loop we get unmarshaling errors

* fix linter issue

* add fixed ut

* fix existing unit test flake due to closing pipe on error

a previous fix ensured the socket closed on error, but this caused an existing ut to nondeterministically fail
without the previous fix, the socket wouldn't have been closed on error

* make read inline

* make ut compatible with 1.4.x
  • Loading branch information
QxBytes authored Jun 26, 2024
1 parent 7002b34 commit b983b5e
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 15 deletions.
18 changes: 5 additions & 13 deletions telemetry/telemetrybuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,14 @@ func (tb *TelemetryBuffer) StartServer() error {
tb.connections = remove(tb.connections, index)
}
}()

reader := bufio.NewReader(conn)
for {
reportStr, err := read(conn)
if err != nil {
reportStr, readErr := reader.ReadBytes(Delimiter)
if readErr != nil {
return
}
reportStr = reportStr[:len(reportStr)-1]

var tmp map[string]interface{}
err = json.Unmarshal(reportStr, &tmp)
if err != nil {
Expand Down Expand Up @@ -195,16 +197,6 @@ func (tb *TelemetryBuffer) PushData(ctx context.Context) {
}
}

// read - read from the file descriptor
func read(conn net.Conn) (b []byte, err error) {
b, err = bufio.NewReader(conn).ReadBytes(Delimiter)
if err == nil {
b = b[:len(b)-1]
}

return
}

// Write - write to the file descriptor.
func (tb *TelemetryBuffer) Write(b []byte) (c int, err error) {
buf := make([]byte, len(b))
Expand Down
34 changes: 32 additions & 2 deletions telemetry/telemetrybuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,36 @@ func TestClientConnClose(t *testing.T) {
tbClient.Close()
}

func TestCloseOnWriteError(t *testing.T) {
tbServer, closeTBServer := createTBServer(t)
defer closeTBServer()

tbClient := NewTelemetryBuffer()
err := tbClient.Connect()
require.NoError(t, err)
defer tbClient.Close()

data := []byte("{\"good\":1}")
_, err = tbClient.Write(data)
require.NoError(t, err)
// need to wait for connection to populate in server
time.Sleep(1 * time.Second)
tbServer.mutex.Lock()
conns := tbServer.connections
tbServer.mutex.Unlock()
require.Len(t, conns, 1)

// the connection should be automatically closed on failure
badData := []byte("} malformed json }}}")
_, err = tbClient.Write(badData)
require.NoError(t, err)
time.Sleep(1 * time.Second)
tbServer.mutex.Lock()
conns = tbServer.connections
tbServer.mutex.Unlock()
require.Empty(t, conns)
}

func TestWrite(t *testing.T) {
_, closeTBServer := createTBServer(t)
defer closeTBServer()
Expand All @@ -84,8 +114,8 @@ func TestWrite(t *testing.T) {
}{
{
name: "write",
data: []byte("testdata"),
want: len("testdata") + 1, // +1 due to Delimiter('\n)
data: []byte("{\"testdata\":1}"),
want: len("{\"testdata\":1}") + 1, // +1 due to Delimiter('\n)
wantErr: false,
},
{
Expand Down

0 comments on commit b983b5e

Please sign in to comment.