-
Notifications
You must be signed in to change notification settings - Fork 10
/
kafka_consumer.go
115 lines (97 loc) · 2.93 KB
/
kafka_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"strings"
"sync"
"github.com/Shopify/sarama"
"github.com/golang/protobuf/proto"
"github.com/Juniper/jtimon/dialout"
)
// createKafkaConsumerGroup connects to the brokers listed in *kafkaBroker
// (a comma-separated list), joins the consumer group called name and starts a
// background goroutine that consumes topics with a gnmiConsumerGroupHandler.
//
// It returns a non-nil error, wrapping the sarama error, when the client or
// the consumer group cannot be created; in that case no goroutine is started.
// On success it returns nil right away while consumption continues in the
// background.
func createKafkaConsumerGroup(name string, topics []string) error {
	kafkaCfg := sarama.NewConfig()
	kafkaCfg.Version = sarama.MaxVersion

	kafkaClient, err := sarama.NewClient(strings.Split(*kafkaBroker, ","), kafkaCfg)
	if err != nil {
		log.Printf("Not able to connect to Kafka broker at %v: %v", *kafkaBroker, err)
		return fmt.Errorf("connecting to Kafka broker at %v: %w", *kafkaBroker, err)
	}

	kafkaConsumerGroup, err := sarama.NewConsumerGroupFromClient(name, kafkaClient)
	if err != nil {
		log.Printf("Not able to create consumer: %v", err)
		// The client is of no use without a consumer group; release its
		// broker connections instead of leaking them.
		if closeErr := kafkaClient.Close(); closeErr != nil {
			log.Printf("Not able to close Kafka client: %v", closeErr)
		}
		return fmt.Errorf("creating consumer group %q: %w", name, err)
	}

	// NOTE(review): the context is never cancelled, so the goroutine below runs
	// until the process exits. Switch to context.WithCancel if callers need to
	// stop the consumer.
	ctx := context.Background()
	handler := &gnmiConsumerGroupHandler{}

	go func() {
		for {
			// `Consume` should be called inside an infinite loop, when a
			// server-side rebalance happens, the consumer session will need to be
			// recreated to get the new claims
			log.Printf("Inside loop")
			// NOTE(review): a Consume call that fails immediately is retried
			// without any delay; consider a short back-off here.
			if err := kafkaConsumerGroup.Consume(ctx, topics, handler); err != nil {
				log.Printf("Error from consumer: %v", err)
			}
			// check if context was cancelled, signaling that the consumer should stop
			if ctx.Err() != nil {
				log.Printf("Err: %v", ctx.Err())
				return
			}
		}
	}()

	return nil
}
// gnmiConsumerGroupHandler is the handler value that createKafkaConsumerGroup
// passes to the sarama consumer group's Consume call. Its Setup, Cleanup and
// ConsumeClaim methods are defined below.
type gnmiConsumerGroupHandler struct {
// m is locked by ConsumeClaim around lookups and inserts on dbHandles.
m sync.RWMutex
// dbHandles maps an Influx database name (cfg.Influx.Dbname) to the *JCtx
// created for it. Setup replaces it with a fresh empty map.
dbHandles map[string]*JCtx
}
// Setup logs that it was called and replaces handler.dbHandles with a new,
// empty map. It always returns nil.
func (handler *gnmiConsumerGroupHandler) Setup(session sarama.ConsumerGroupSession) error {
	log.Printf("Setup..")
	handler.dbHandles = make(map[string]*JCtx)
	return nil
}
// Cleanup only logs that it was called; it touches no handler state and always
// returns nil.
func (*gnmiConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {
	log.Printf("Cleanup..")
	return nil
}
// ConsumeClaim decodes every message delivered on claim and hands the
// responses it carries to gnmiHandleResponse.
//
// Each message value is unmarshalled as a protobuf dialout.DialOutResponse,
// and its DialOutContext is unmarshalled as a JSON Config. One *JCtx is kept
// per cfg.Influx.Dbname in handler.dbHandles: it is created and passed to
// influxInit the first time a database name is seen, and its config is
// replaced by the newest one on later messages.
//
// A message that cannot be decoded is logged and skipped rather than being
// processed with a zero-valued Config. Errors from gnmiHandleResponse are
// logged when they occur and never stop the loop. When the message channel is
// closed the method returns the most recent error that did not contain
// gGnmiJtimonIgnoreErrorSubstr, or nil if there was none.
func (handler *gnmiConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	// TODO: Vivek Use metadata to pass the complete context? what kafka version it is in jcloud?
	// TODO: Vivek change cn
	cn := *myListeningIP
	log.Printf("Consuming..")

	// claimErr remembers the latest non-ignorable handler failure so that an
	// ignorable error or a later success cannot mask it in the return value.
	var claimErr error

	// NOTE(review): messages are never marked on the session (MarkMessage), so
	// no offsets are committed for this group - confirm this is intentional.
	for msg := range claim.Messages() {
		var dialOutRsp dialout.DialOutResponse
		if err := proto.Unmarshal(msg.Value, &dialOutRsp); err != nil {
			log.Printf("Host: %v, not able to decode DialOutResponse: %v", cn, err)
			continue
		}

		var cfg Config
		if err := json.Unmarshal(dialOutRsp.DialOutContext, &cfg); err != nil {
			log.Printf("Host: %v, not able to decode DialOutContext: %v", cn, err)
			continue
		}

		// Look up or create the per-database handle. The config refresh is done
		// while the lock is held because the handle is shared through dbHandles.
		handler.m.Lock()
		jctx, known := handler.dbHandles[cfg.Influx.Dbname]
		if known {
			jctx.config = cfg
		} else {
			jctx = &JCtx{config: cfg}
			influxInit(jctx)
			handler.dbHandles[cfg.Influx.Dbname] = jctx
		}
		handler.m.Unlock()
		if !known {
			log.Printf("Host: %v, dbHandles: %v", cn, cfg.Influx.Dbname)
		}

		for _, rsp := range dialOutRsp.Response {
			err := gnmiHandleResponse(jctx, rsp)
			if err == nil {
				continue
			}
			log.Printf("Host: %v, gnmiHandleResponse failed: %v", cn, err)
			if !strings.Contains(err.Error(), gGnmiJtimonIgnoreErrorSubstr) {
				claimErr = err
			}
		}
	}

	return claimErr
}