Skip to content

Commit

Permalink
Respect number of bytes returned by value decoders
Browse files Browse the repository at this point in the history
  • Loading branch information
soundmonster committed Dec 20, 2024
1 parent 84240e7 commit ee12c56
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Fixed
- [#227](https://github.com/deviceinsight/kafkactl/issues/227) Incorrect handling of Base64-encoded values when producing from JSON

## 5.4.0 - 2024-11-28
### Added
Expand Down
24 changes: 24 additions & 0 deletions cmd/produce/produce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,30 @@ func TestProduceWithJSONFileIntegration(t *testing.T) {
testutil.AssertEquals(t, "1#a\n2#b\n3#c", kafkaCtl.GetStdOut())
}

func TestProduceWithJSONFileBase64ValuesIntegration(t *testing.T) {

testutil.StartIntegrationTest(t)
topic := testutil.CreateTopic(t, "produce-topic-json-base64-values")
kafkaCtl := testutil.CreateKafkaCtlCommand()

dataFilePath := filepath.Join(testutil.RootDir, "internal", "testutil", "testdata")

if _, err := kafkaCtl.Execute("produce", topic,
"--file", filepath.Join(dataFilePath, "msg-base64.json"),
"--value-encoding", "base64",
"--input-format", "json"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, "3 messages produced", kafkaCtl.GetStdOut())

if _, err := kafkaCtl.Execute("consume", topic, "--from-beginning", "--print-keys", "--value-encoding", "hex", "--exit"); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

testutil.AssertEquals(t, "1#000000000001\n2#68656c6c6f\n3#6b61666b61", kafkaCtl.GetStdOut())
}

func TestProduceProtoFileWithOnlyKeyEncodedIntegration(t *testing.T) {
testutil.StartIntegrationTest(t)

Expand Down
20 changes: 6 additions & 14 deletions internal/producer/MessageSerializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,27 +63,19 @@ func decodeBytes(data []byte, encoding string) ([]byte, error) {
switch encoding {
case HEX:
out = make([]byte, hex.DecodedLen(len(data)))
if _, err := hex.Decode(out, data); err != nil {
bytelen, err := hex.Decode(out, data)
if err != nil {
return nil, err
}
return out, nil
return out[:bytelen], nil
case BASE64:
out = make([]byte, base64.StdEncoding.DecodedLen(len(data)))
if _, err := base64.StdEncoding.Decode(out, data); err != nil {
bytelen, err := base64.StdEncoding.Decode(out, data)
if err != nil {
return nil, err
}
return out[:clen(out)], nil
return out[:bytelen], nil
default:
return data, nil
}
}

// https://stackoverflow.com/a/27834860/12143351
func clen(n []byte) int {
for i := 0; i < len(n); i++ {
if n[i] == 0 {
return i
}
}
return len(n)
}
65 changes: 65 additions & 0 deletions internal/producer/MessageSerializer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package producer

import (
"reflect"
"testing"
)

func TestDecodeBytesBase64WhenInputIsNil(t *testing.T) {
got, err := decodeBytes(nil, "base64")
if got != nil {
t.Errorf("Expected nil, got %v", got)
}
if err != nil {
t.Errorf("Expected nil, got %v", err)
}
}

func TestDecodeBytesBase64WhenInputIsInvalid(t *testing.T) {
got, err := decodeBytes([]byte("..this..is..not..base64.."), "base64")
if got != nil {
t.Errorf("Expected nil, got %v", got)
}
if err == nil {
t.Errorf("Expected error, got nil")
}
}

func TestDecodeBytesBase64WithRegularInput(t *testing.T) {
// length 4
got, err := decodeBytes([]byte("dGVzdA=="), "base64")
if string(got) != "test" {
t.Errorf("Expected hello, got %v", string(got))
}
if err != nil {
t.Errorf("Expected nil, got %v", err)
}
// length 5
got, err = decodeBytes([]byte("aGVsbG8="), "base64")
if string(got) != "hello" {
t.Errorf("Expected hello, got %v", string(got))
}
if err != nil {
t.Errorf("Expected nil, got %v", err)
}

// length 6
got, err = decodeBytes([]byte("aGVsbG8h"), "base64")
if string(got) != "hello!" {
t.Errorf("Expected hello, got %v", string(got))
}
if err != nil {
t.Errorf("Expected nil, got %v", err)
}
}

func TestDecodeBytesBase64WithZeroPaddedInput(t *testing.T) {
got, err := decodeBytes([]byte("AAAAAAAD"), "base64")
expected := []byte{0, 0, 0, 0, 0, 3}
if !reflect.DeepEqual(got, expected) {
t.Errorf("Expected hello, got %v", got)
}
if err != nil {
t.Errorf("Expected nil, got %v", err)
}
}
3 changes: 3 additions & 0 deletions internal/testutil/testdata/msg-base64.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"key": "1", "value": "AAAAAAAB"}
{"key": "2", "value": "aGVsbG8="}
{"key": "3", "value": "a2Fma2E="}

0 comments on commit ee12c56

Please sign in to comment.