Skip to content

Commit

Permalink
use franz-go in kafka output plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
DmitryRomanov committed Jun 27, 2024
1 parent 76f56f3 commit 5922273
Show file tree
Hide file tree
Showing 17 changed files with 306 additions and 318 deletions.
27 changes: 16 additions & 11 deletions e2e/kafka_auth/kafka_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,22 +111,23 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
config.ClientCert = "./kafka_auth/certs/client_cert.pem"
}

kafka_out.NewProducer(config,
kafka_out.NewClient(config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)).Sugar(),
)
},
func() {
config := &kafka_in.Config{
Brokers: c.Brokers,
Topics: []string{inTopic},
ConsumerGroup: "test-auth",
ClientID: "test-auth-in",
ChannelBufferSize: 256,
Offset_: kafka_in.OffsetTypeNewest,
ConsumerMaxProcessingTime_: 200 * time.Millisecond,
ConsumerMaxWaitTime_: 250 * time.Millisecond,
SslEnabled: true,
SslSkipVerify: true,
Brokers: c.Brokers,
Topics: []string{inTopic},
ConsumerGroup: "test-auth",
ClientID: "test-auth-in",
ChannelBufferSize: 256,
Offset_: kafka_in.OffsetTypeNewest,
ConsumerMaxWaitTime_: 250 * time.Millisecond,
SslEnabled: true,
SslSkipVerify: true,
SessionTimeout_: 10 * time.Second,
AutoCommitInterval_: 1 * time.Second,
}
if tt.sasl.Enabled {
config.SaslEnabled = true
Expand All @@ -139,6 +140,10 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
config.ClientKey = "./kafka_auth/certs/client_key.pem"
config.ClientCert = "./kafka_auth/certs/client_cert.pem"
}

kafka_in.NewClient(config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)).Sugar(),
)
},
}

Expand Down
34 changes: 22 additions & 12 deletions e2e/kafka_file/kafka_file.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package kafka_file

import (
"context"
"log"
"path"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/ozontech/file.d/cfg"
kafka_out "github.com/ozontech/file.d/plugin/output/kafka"
"github.com/ozontech/file.d/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// In this test, a message sender is created that generates one message for each partition. These messages are sent Count times.
Expand Down Expand Up @@ -41,27 +46,32 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)

// Send creates a Partition of messages (one for each partition) and sends them Count times to kafka
func (c *Config) Send(t *testing.T) {
config := sarama.NewConfig()
config.Producer.Flush.Frequency = time.Millisecond
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true
config := &kafka_out.Config{
Brokers: c.Brokers,
MaxMessageBytes_: 512,
BatchSize_: c.Count,
}

producer, err := sarama.NewSyncProducer(c.Brokers, config)
client := kafka_out.NewClient(config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)).Sugar(),
)
adminClient := kadm.NewClient(client)
_, err := adminClient.CreateTopic(context.TODO(), 1, 1, nil, c.Topics[0])
if err != nil {
log.Fatalf("failed to create async producer: %s", err.Error())
t.Logf("cannot create topic: %s %s", c.Topics[0], err.Error())
}
msgs := make([]*sarama.ProducerMessage, c.Partition)
message := sarama.StringEncoder(`{"key":"value"}`)

msgs := make([]*kgo.Record, c.Partition)
for i := range msgs {
msgs[i] = &sarama.ProducerMessage{}
msgs[i] = kgo.SliceRecord([]byte(`{"key":"value"}`))
msgs[i].Topic = c.Topics[0]
msgs[i].Value = message
msgs[i].Partition = int32(i)
}

for i := 0; i < c.Count; i++ {
if err = producer.SendMessages(msgs); err != nil {
result := client.ProduceSync(context.TODO(), msgs...)
err := result.FirstErr()
if err != nil {
log.Fatalf("failed to send messages: %s", err.Error())
}
}
Expand Down
19 changes: 0 additions & 19 deletions e2e/split_join/handler.go

This file was deleted.

53 changes: 33 additions & 20 deletions e2e/split_join/split_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ import (
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/ozontech/file.d/cfg"
kafka_in "github.com/ozontech/file.d/plugin/input/kafka"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

const (
Expand All @@ -27,7 +31,7 @@ const (

type Config struct {
inputDir string
consumer sarama.ConsumerGroup
client *kgo.Client
topic string
}

Expand All @@ -48,18 +52,23 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)
output.Set("brokers", []string{brokerHost})
output.Set("default_topic", c.topic)

addrs := []string{brokerHost}
config := sarama.NewConfig()
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config := &kafka_in.Config{
Brokers: []string{brokerHost},
Topics: []string{c.topic},
ConsumerGroup: group,
Offset_: kafka_in.OffsetTypeOldest,
SessionTimeout_: 10 * time.Second,
AutoCommitInterval_: 1 * time.Second,
ConsumerMaxWaitTime_: 1 * time.Second,
HeartbeatInterval_: 10 * time.Second,
}

admin, err := sarama.NewClusterAdmin(addrs, config)
r.NoError(err)
r.NoError(admin.CreateTopic(c.topic, &sarama.TopicDetail{
NumPartitions: 1,
ReplicationFactor: 1,
}, false))
c.client = kafka_in.NewClient(config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)).Sugar(),
)

c.consumer, err = sarama.NewConsumerGroup(addrs, group, config)
adminClient := kadm.NewClient(c.client)
_, err := adminClient.CreateTopic(context.TODO(), 1, 1, nil, c.topic)
r.NoError(err)
}

Expand Down Expand Up @@ -89,14 +98,18 @@ func (c *Config) Validate(t *testing.T) {
done := make(chan struct{})

go func() {
r.NoError(c.consumer.Consume(ctx, []string{c.topic}, handlerFunc(func(msg *sarama.ConsumerMessage) {
strBuilder.Write(msg.Value)
strBuilder.WriteString("\n")
gotEvents++
if gotEvents == expectedEventsCount {
close(done)
}
})))
for {
fetches := c.client.PollFetches(ctx)
fetches.EachError(func(topic string, p int32, err error) {})
fetches.EachRecord(func(r *kgo.Record) {
strBuilder.Write(r.Value)
strBuilder.WriteString("\n")
gotEvents++
if gotEvents == expectedEventsCount {
close(done)
}
})
}
}()

select {
Expand Down
19 changes: 4 additions & 15 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ require (
github.com/ClickHouse/ch-go v0.58.2
github.com/KimMachineGun/automemlimit v0.2.6
github.com/Masterminds/squirrel v1.5.4
github.com/Shopify/sarama v1.38.1
github.com/alecthomas/kingpin v2.2.6+incompatible
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
github.com/alicebob/miniredis/v2 v2.30.5
Expand All @@ -32,6 +31,10 @@ require (
github.com/rjeczalik/notify v0.9.3
github.com/satori/go.uuid v1.2.0
github.com/stretchr/testify v1.9.0
github.com/twmb/franz-go v1.17.0
github.com/twmb/franz-go/pkg/kadm v1.12.0
github.com/twmb/franz-go/plugin/kzap v1.1.2
github.com/twmb/tlscfg v1.2.1
github.com/valyala/fasthttp v1.48.0
github.com/vitkovskii/insane-json v0.1.7
github.com/xdg-go/scram v1.1.2
Expand Down Expand Up @@ -60,9 +63,6 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dmarkham/enumer v1.5.8 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.6.1 // indirect
Expand All @@ -76,7 +76,6 @@ require (
github.com/godbus/dbus/v5 v5.0.4 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.1.0 // indirect
Expand All @@ -88,7 +87,6 @@ require (
github.com/hashicorp/go-secure-stdlib/parseutil v0.1.6 // indirect
github.com/hashicorp/go-secure-stdlib/strutil v0.1.2 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/imdario/mergo v0.3.7 // indirect
Expand All @@ -99,11 +97,6 @@ require (
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jackc/puddle v1.3.0 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
Expand All @@ -123,16 +116,12 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/twmb/franz-go v1.17.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.8.0 // indirect
github.com/twmb/franz-go/plugin/kzap v1.1.2 // indirect
github.com/twmb/tlscfg v1.2.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
Expand Down
Loading

0 comments on commit 5922273

Please sign in to comment.