forwarder_kafka.go
package main

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "errors"
    "fmt"
    "net"
    "os"
    "strings"
    "sync"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/segmentio/kafka-go"
)

const (
    defaultBatchPeriod = time.Second * 30
    defaultBatchSize   = 1000
    envKafkaClientCert = "KAFKA_CLIENT_CERT"
    envKafkaClientKey  = "KAFKA_CLIENT_KEY"
    envKafkaInterCert  = "KAFKA_INTERMEDIATE_CERT"
    envKafkaInterChain = "KAFKA_INTERMEDIATE_CHAIN"
    envKafkaRootCert   = "KAFKA_ROOT_CERT"
    envKafkaBroker     = "KAFKA_BROKERS"
    envKafkaTopic      = "KAFKA_TOPIC"

    // amazonRootCACert is the certificate of one of Amazon's root CAs. The
    // certificate chain that we encounter when connecting to our Kafka broker
    // goes up to this CA. The root certificates are available at:
    // https://www.amazontrust.com/repository/
    amazonRootCACert = `
-----BEGIN CERTIFICATE-----
MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx
EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT
HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs
ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dGhvcml0eSAtIEcyMB4XDTA5
MDkwMTAwMDAwMFoXDTM3MTIzMTIzNTk1OVowgZgxCzAJBgNVBAYTAlVTMRAwDgYD
VQQIEwdBcml6b25hMRMwEQYDVQQHEwpTY290dHNkYWxlMSUwIwYDVQQKExxTdGFy
ZmllbGQgVGVjaG5vbG9naWVzLCBJbmMuMTswOQYDVQQDEzJTdGFyZmllbGQgU2Vy
dmljZXMgUm9vdCBDZXJ0aWZpY2F0ZSBBdXRob3JpdHkgLSBHMjCCASIwDQYJKoZI
hvcNAQEBBQADggEPADCCAQoCggEBANUMOsQq+U7i9b4Zl1+OiFOxHz/Lz58gE20p
OsgPfTz3a3Y4Y9k2YKibXlwAgLIvWX/2h/klQ4bnaRtSmpDhcePYLQ1Ob/bISdm2
8xpWriu2dBTrz/sm4xq6HZYuajtYlIlHVv8loJNwU4PahHQUw2eeBGg6345AWh1K
Ts9DkTvnVtYAcMtS7nt9rjrnvDH5RfbCYM8TWQIrgMw0R9+53pBlbQLPLJGmpufe
hRhJfGZOozptqbXuNC66DQO4M99H67FrjSXZm86B0UVGMpZwh94CDklDhbZsc7tk
6mFBrMnUVN+HL8cisibMn1lUaJ/8viovxFUcdUBgF4UCVTmLfwUCAwEAAaNCMEAw
DwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8BAf8EBAMCAQYwHQYDVR0OBBYEFJxfAN+q
AdcwKziIorhtSpzyEZGDMA0GCSqGSIb3DQEBCwUAA4IBAQBLNqaEd2ndOxmfZyMI
bw5hyf2E3F/YNoHN2BtBLZ9g3ccaaNnRbobhiCPPE95Dz+I0swSdHynVv/heyNXB
ve6SbzJ08pGCL72CQnqtKrcgfU28elUSwhXqvfdqlS5sdJ/PHLTyxQGjhdByPq1z
qwubdQxtRbeOlKyWN7Wg0I8VRw7j6IPdj/3vQQF3zCepYoUz8jcI73HPdwbeyBkd
iEDPfUYd/x7H4c7/I9vG+o1VTqkC50cRRj70/b17KSa7qWFiNyi2LSr2EIZkyXCn
0q23KXB56jzaYyWf/Wi3MOxw+3WKt21gZ7IeyLnp2KhvAotnDU0mV3HaIPzBSlCN
sSi6
-----END CERTIFICATE-----`
)
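
// The forwarder is configured entirely through the environment variables
// above. An illustrative (hypothetical) deployment might set:
//
//    KAFKA_CLIENT_CERT=/etc/kafka/client.crt
//    KAFKA_CLIENT_KEY=/etc/kafka/client.key
//    KAFKA_INTERMEDIATE_CERT=/etc/kafka/intermediate.crt
//    KAFKA_INTERMEDIATE_CHAIN=/etc/kafka/intermediate-chain.crt
//    KAFKA_ROOT_CERT=/etc/kafka/root.crt
//    KAFKA_BROKERS=broker-1.example.com:9094,broker-2.example.com:9094
//    KAFKA_TOPIC=tokens
//
// The paths, hosts, and topic name here are made-up examples, not values
// from this repository.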

var errEnvVarUnset = errors.New("environment variable unset")

// kafkaWriter defines an interface that's implemented by kafka-go's
// kafka.Writer (which we use in production) and by dummyKafkaWriter (which we
// use for tests).
type kafkaWriter interface {
    WriteMessages(ctx context.Context, msgs ...kafka.Message) error
}
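
// For illustration only (a sketch, not the project's actual test double,
// which is defined elsewhere), a minimal in-memory kafkaWriter could look
// like this:
//
//    type memoryKafkaWriter struct {
//        sync.Mutex
//        msgs []kafka.Message
//    }
//
//    // WriteMessages records messages instead of sending them to a broker.
//    func (w *memoryKafkaWriter) WriteMessages(ctx context.Context, msgs ...kafka.Message) error {
//        w.Lock()
//        defer w.Unlock()
//        w.msgs = append(w.msgs, msgs...)
//        return nil
//    }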

// kafkaConfig bundles everything we need to talk to a Kafka broker: batching
// parameters, the client certificate we present to the broker, the CA pool we
// use to verify the broker's certificate, and the broker's address and topic.
type kafkaConfig struct {
    batchPeriod time.Duration
    batchSize   int
    clientCert  *tls.Certificate
    serverCerts *x509.CertPool
    broker      net.Addr
    topic       string
}

// kafkaForwarder implements a forwarder that sends tokenized data to a Kafka
// broker.
type kafkaForwarder struct {
    sync.RWMutex
    tokenCache *cache
    conf       *kafkaConfig
    writer     kafkaWriter
    out        chan token
    done       chan empty
}

// newKafkaForwarder returns a new Kafka forwarder.
func newKafkaForwarder() forwarder {
    return &kafkaForwarder{
        tokenCache: newCache(),
        out:        make(chan token),
        done:       make(chan empty),
    }
}

// setConfig sets the given configuration for both the forwarder and its
// token cache.
func (k *kafkaForwarder) setConfig(c *config) {
    k.Lock()
    defer k.Unlock()
    k.tokenCache.conf = c.kafkaConfig
    k.conf = c.kafkaConfig
}

// outbox returns the channel over which tokens are submitted to the
// forwarder.
func (k *kafkaForwarder) outbox() chan token {
    return k.out
}

// start spins up the forwarder: it creates the Kafka writer, starts the
// token cache, and keeps accepting incoming tokens until stop is called.
func (k *kafkaForwarder) start() {
    k.Lock()
    k.writer = newKafkaWriter(k.conf)
    k.Unlock()
    k.tokenCache.start()

    go func() {
        defer k.tokenCache.stop()
        for {
            select {
            case <-k.done:
                return
            case token := <-k.out:
                k.tokenCache.submit(token)
                k.maybeFlush()
            }
        }
    }()
}

// stop stops the forwarder.
func (k *kafkaForwarder) stop() {
    close(k.done)
}

// maybeFlush flushes the token cache to the Kafka broker, but only if the
// cache is ready to be retrieved; otherwise, it returns without doing
// anything.
func (k *kafkaForwarder) maybeFlush() {
    elems, err := k.tokenCache.retrieve()
    if err != nil {
        return
    }

    // Turn tokens into Kafka messages.
    kafkaMsgs := make([]kafka.Message, len(elems))
    for i, e := range elems {
        kafkaMsgs[i].Key = nil
        kafkaMsgs[i].Value = e.(token)
    }
    batchSize := len(kafkaMsgs)

    err = k.writer.WriteMessages(context.Background(), kafkaMsgs...)
    if err != nil {
        l := prometheus.Labels{
            outcome: failBecause(fmt.Errorf("failed to forward tokens: %v", err)),
        }
        m.numForwarded.With(l).Add(float64(batchSize))
        return
    }
    l.Printf("Flushed %d tokens to Kafka.", batchSize)
    m.numForwarded.With(prometheus.Labels{
        outcome: success,
    }).Add(float64(batchSize))
}

// newKafkaWriter creates a new Kafka writer that talks to the given broker
// over mutually-authenticated TLS.
func newKafkaWriter(conf *kafkaConfig) *kafka.Writer {
    w := &kafka.Writer{
        Addr:  conf.broker,
        Topic: conf.topic,
        Transport: &kafka.Transport{
            TLS: &tls.Config{
                Certificates: []tls.Certificate{*conf.clientCert},
                // As of 2022-12-21, our Kafka broker does not support TLS
                // 1.3, which is why we're enforcing at least 1.2.
                MinVersion: tls.VersionTLS12,
                RootCAs:    conf.serverCerts,
            },
        },
    }
    l.Printf("Created Kafka writer for %q using topic %q.", conf.broker, conf.topic)
    return w
}

// loadKafkaCerts loads the client certificate (and key) that we present to
// the broker, and the server certificates that we use to verify the broker's
// certificate chain. All file paths come from environment variables.
func loadKafkaCerts() (*tls.Certificate, *x509.CertPool, error) {
    clientCertPath, exists := os.LookupEnv(envKafkaClientCert)
    if !exists {
        return nil, nil, errEnvVarUnset
    }
    clientKeyPath, exists := os.LookupEnv(envKafkaClientKey)
    if !exists {
        return nil, nil, errEnvVarUnset
    }
    clientCert, err := loadKafkaClientCert(clientCertPath, clientKeyPath)
    if err != nil {
        return nil, nil, err
    }

    interCertPath, exists := os.LookupEnv(envKafkaInterCert)
    if !exists {
        return nil, nil, errEnvVarUnset
    }
    interChainPath, exists := os.LookupEnv(envKafkaInterChain)
    if !exists {
        return nil, nil, errEnvVarUnset
    }
    rootCertPath, exists := os.LookupEnv(envKafkaRootCert)
    if !exists {
        return nil, nil, errEnvVarUnset
    }
    serverCerts, err := loadKafkaServerCerts(
        []string{interCertPath, interChainPath, rootCertPath},
    )
    if err != nil {
        return nil, nil, err
    }

    return clientCert, serverCerts, nil
}

// loadKafkaClientCert loads the client certificate and key from the given
// file paths.
func loadKafkaClientCert(cert, key string) (*tls.Certificate, error) {
    clientCert, err := tls.LoadX509KeyPair(cert, key)
    if err != nil {
        return nil, err
    }
    return &clientCert, nil
}

// loadKafkaServerCerts builds the CA pool that we use to verify the broker's
// certificate chain: the system cert pool (if available), the PEM
// certificates at the given file paths, and Amazon's root CA certificate.
func loadKafkaServerCerts(paths []string) (*x509.CertPool, error) {
    ourRootCAs, err := x509.SystemCertPool()
    if err != nil {
        l.Printf("Failed to instantiate system cert pool: %v", err)
        ourRootCAs = x509.NewCertPool()
    }
    for _, path := range paths {
        c, err := os.ReadFile(path)
        if err != nil {
            return nil, err
        }
        if ok := ourRootCAs.AppendCertsFromPEM(c); !ok {
            return nil, fmt.Errorf("failed to parse PEM certificate in %s", path)
        }
    }
    if ok := ourRootCAs.AppendCertsFromPEM([]byte(amazonRootCACert)); !ok {
        return nil, errors.New("failed to parse Amazon's root CA certificate")
    }
    return ourRootCAs, nil
}

// loadKafkaConfig assembles the forwarder's Kafka configuration from
// environment variables.
func loadKafkaConfig() (*kafkaConfig, error) {
    clientCert, serverCerts, err := loadKafkaCerts()
    if err != nil {
        return nil, err
    }

    broker, exists := os.LookupEnv(envKafkaBroker)
    if !exists {
        return nil, errEnvVarUnset
    }
    // If we're dealing with a comma-separated list of brokers, simply select
    // the first one.
    broker = strings.Split(broker, ",")[0]

    topic, exists := os.LookupEnv(envKafkaTopic)
    if !exists {
        return nil, errEnvVarUnset
    }

    l.Println("Loaded Kafka config.")
    return &kafkaConfig{
        batchSize:   defaultBatchSize,
        batchPeriod: defaultBatchPeriod,
        clientCert:  clientCert,
        serverCerts: serverCerts,
        broker:      kafka.TCP(broker),
        topic:       topic,
    }, nil
}