Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix franz offset out #665

Merged
merged 11 commits into from
Sep 16, 2024
113 changes: 113 additions & 0 deletions cfg/kafka_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package cfg

import (
"crypto/tls"
"os"

"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/sasl/aws"
"github.com/twmb/franz-go/pkg/sasl/plain"
"github.com/twmb/franz-go/pkg/sasl/scram"
"github.com/twmb/franz-go/plugin/kzap"
"github.com/twmb/tlscfg"
"go.uber.org/zap"
)

type KafkaClientConfig interface {
GetBrokers() []string
GetClientID() string

IsSaslEnabled() bool
GetSaslConfig() KafkaClientSaslConfig

IsSslEnabled() bool
GetSslConfig() KafkaClientSslConfig
}

type KafkaClientSaslConfig struct {
SaslMechanism string
SaslUsername string
SaslPassword string
}

type KafkaClientSslConfig struct {
CACert string
ClientCert string
ClientKey string
SslSkipVerify bool
}

func GetKafkaClientOptions(c KafkaClientConfig, l *zap.Logger) []kgo.Opt {
opts := []kgo.Opt{
kgo.SeedBrokers(c.GetBrokers()...),
kgo.ClientID(c.GetClientID()),
kgo.WithLogger(kzap.New(l)),
}

if c.IsSaslEnabled() {
saslConfig := c.GetSaslConfig()
switch saslConfig.SaslMechanism {
case "PLAIN":
opts = append(opts, kgo.SASL(plain.Auth{
User: saslConfig.SaslUsername,
Pass: saslConfig.SaslPassword,
}.AsMechanism()))
case "SCRAM-SHA-256":
opts = append(opts, kgo.SASL(scram.Auth{
User: saslConfig.SaslUsername,
Pass: saslConfig.SaslPassword,
}.AsSha256Mechanism()))
case "SCRAM-SHA-512":
opts = append(opts, kgo.SASL(scram.Auth{
User: saslConfig.SaslUsername,
Pass: saslConfig.SaslPassword,
}.AsSha512Mechanism()))
case "AWS_MSK_IAM":
opts = append(opts, kgo.SASL(aws.Auth{
AccessKey: saslConfig.SaslUsername,
SecretKey: saslConfig.SaslPassword,
}.AsManagedStreamingIAMMechanism()))
}
opts = append(opts, kgo.DialTLSConfig(new(tls.Config)))
}

if c.IsSslEnabled() {
sslConfig := c.GetSslConfig()
tlsOpts := []tlscfg.Opt{}
if sslConfig.CACert != "" || sslConfig.ClientCert != "" || sslConfig.ClientKey != "" {
if sslConfig.CACert != "" {
if _, err := os.Stat(sslConfig.CACert); err != nil {
tlsOpts = append(tlsOpts,
tlscfg.WithCA(
[]byte(sslConfig.CACert), tlscfg.ForClient,
),
)
} else {
tlsOpts = append(tlsOpts,
tlscfg.MaybeWithDiskCA(sslConfig.CACert, tlscfg.ForClient),
)
}
}

if _, err := os.Stat(sslConfig.ClientCert); err != nil {
tlsOpts = append(tlsOpts,
tlscfg.WithKeyPair(
[]byte(sslConfig.ClientCert), []byte(sslConfig.ClientKey),
),
)
} else {
tlsOpts = append(tlsOpts,
tlscfg.MaybeWithDiskKeyPair(sslConfig.ClientCert, sslConfig.ClientKey),
)
}
}
tc, err := tlscfg.New(tlsOpts...)
if err != nil {
l.Fatal("unable to create tls config", zap.Error(err))
}
tc.InsecureSkipVerify = sslConfig.SslSkipVerify
opts = append(opts, kgo.DialTLSConfig(tc))
}

return opts
}
38 changes: 24 additions & 14 deletions e2e/kafka_auth/kafka_auth.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package kafka_auth

import (
"context"
"testing"
"time"

"github.com/ozontech/file.d/cfg"
kafka_in "github.com/ozontech/file.d/plugin/input/kafka"
kafka_out "github.com/ozontech/file.d/plugin/output/kafka"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -111,22 +113,23 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
config.ClientCert = "./kafka_auth/certs/client_cert.pem"
}

kafka_out.NewProducer(config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)).Sugar(),
kafka_out.NewClient(config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)),
)
},
func() {
config := &kafka_in.Config{
Brokers: c.Brokers,
Topics: []string{inTopic},
ConsumerGroup: "test-auth",
ClientID: "test-auth-in",
ChannelBufferSize: 256,
Offset_: kafka_in.OffsetTypeNewest,
ConsumerMaxProcessingTime_: 200 * time.Millisecond,
ConsumerMaxWaitTime_: 250 * time.Millisecond,
SslEnabled: true,
SslSkipVerify: true,
Brokers: c.Brokers,
Topics: []string{inTopic},
ConsumerGroup: "test-auth",
ClientID: "test-auth-in",
ChannelBufferSize: 256,
Offset_: kafka_in.OffsetTypeNewest,
ConsumerMaxWaitTime_: 250 * time.Millisecond,
SslEnabled: true,
SslSkipVerify: true,
SessionTimeout_: 10 * time.Second,
AutoCommitInterval_: 1 * time.Second,
}
if tt.sasl.Enabled {
config.SaslEnabled = true
Expand All @@ -140,8 +143,9 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
config.ClientCert = "./kafka_auth/certs/client_cert.pem"
}

kafka_in.NewConsumerGroup(config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)).Sugar(),
kafka_in.NewClient(config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)),
Consumer{},
)
},
}
Expand All @@ -159,3 +163,9 @@ func (c *Config) Configure(t *testing.T, _ *cfg.Config, _ string) {
func (c *Config) Send(_ *testing.T) {}

func (c *Config) Validate(_ *testing.T) {}

type Consumer struct{}

func (c Consumer) Assigned(_ context.Context, _ *kgo.Client, assigned map[string][]int32) {}

func (c Consumer) Lost(_ context.Context, _ *kgo.Client, lost map[string][]int32) {}
35 changes: 23 additions & 12 deletions e2e/kafka_file/kafka_file.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package kafka_file

import (
"context"
"log"
"path"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/ozontech/file.d/cfg"
kafka_out "github.com/ozontech/file.d/plugin/output/kafka"
"github.com/ozontech/file.d/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// In this test, a message sender is created that generates one message for each partition. These messages are sent Count times.
Expand Down Expand Up @@ -41,27 +46,33 @@ func (c *Config) Configure(t *testing.T, conf *cfg.Config, pipelineName string)

// Send creates a Partition of messages (one for each partition) and sends them Count times to kafka
func (c *Config) Send(t *testing.T) {
config := sarama.NewConfig()
config.Producer.Flush.Frequency = time.Millisecond
config.Producer.Return.Errors = true
config.Producer.Return.Successes = true
config := &kafka_out.Config{
Brokers: c.Brokers,
MaxMessageBytes_: 512,
BatchSize_: c.Count,
}

producer, err := sarama.NewSyncProducer(c.Brokers, config)
client := kafka_out.NewClient(config,
zap.NewNop().WithOptions(zap.WithFatalHook(zapcore.WriteThenPanic)),
)
adminClient := kadm.NewClient(client)
_, err := adminClient.CreateTopic(context.TODO(), 1, 1, nil, c.Topics[0])
if err != nil {
log.Fatalf("failed to create async producer: %s", err.Error())
t.Logf("cannot create topic: %s %s", c.Topics[0], err.Error())
}
msgs := make([]*sarama.ProducerMessage, c.Partition)
message := sarama.StringEncoder(`{"key":"value"}`)

msgs := make([]*kgo.Record, c.Partition)
for i := range msgs {
msgs[i] = &sarama.ProducerMessage{}
msgs[i] = &kgo.Record{}
msgs[i].Value = []byte(`{"key":"value"}`)
msgs[i].Topic = c.Topics[0]
msgs[i].Value = message
msgs[i].Partition = int32(i)
}

for i := 0; i < c.Count; i++ {
if err = producer.SendMessages(msgs); err != nil {
result := client.ProduceSync(context.TODO(), msgs...)
err := result.FirstErr()
if err != nil {
log.Fatalf("failed to send messages: %s", err.Error())
}
}
Expand Down
19 changes: 0 additions & 19 deletions e2e/split_join/handler.go

This file was deleted.

Loading
Loading