Skip to content

Commit

Permalink
Adding association between a Kafka cluster and a notifier
Browse files Browse the repository at this point in the history
Signed-off-by: muicoder <[email protected]>
linkedin#611
Adding WeCom/DingTalk template
  • Loading branch information
muicoder committed Jul 7, 2023
1 parent e593345 commit 1339403
Show file tree
Hide file tree
Showing 28 changed files with 752 additions and 25 deletions.
50 changes: 50 additions & 0 deletions .github/workflows/action.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
env:
BASE64manifest: IyEvYmluL3NoCgpDTUQ9JChpZiBidWlsZGFoID4vZGV2L251bGw7IHRoZW4gZWNobyBidWlsZGFoOyBlbGlmIHNlYWxvcyA+L2Rldi9udWxsOyB0aGVuIGVjaG8gc2VhbG9zOyBmaSkKTUY9Im1mOiQoZGF0ZSArJUYpIgoKUkVQTz0iJHsxOi1kb2NrZXIuaW8vYml0bmFtaS9tZXRyaWNzLXNlcnZlcjpkb2NrZXIuaW8vbXVpY29kZXIvbWV0cmljcy1zZXJ2ZXJ9IgpUQUdTPSIkezI6LTAuNi4zfSIKVEFHPSIkezM6LSRUQUdTfSIKCmlmIFsgIiR7UkVQTyU6Kn0iICE9ICIkUkVQTyIgXTsgdGhlbgogIGlmIFsgIiR7VEFHUyUsKn0iICE9ICIkVEFHUyIgXTsgdGhlbgogICAgZWNobyAiJFRBR1MiIHwgc2VkICJzfix+XG5+ZyIgfCB3aGlsZSByZWFkIC1yIHRhZzsgZG8KICAgICAgZWNobyAiJHtSRVBPJToqfTokdGFnIgogICAgZG9uZSB8ICRDTUQgcHVsbAogIGVsc2UKICAgICRDTUQgcHVsbCAtLXBvbGljeT1hbHdheXMgLS1wbGF0Zm9ybT1saW51eC9hbWQ2NCAiJHtSRVBPJToqfTokVEFHUyIKICAgICRDTUQgdGFnICIke1JFUE8lOip9OiRUQUdTIiAiJHtSRVBPIyo6fTokVEFHUy1hbWQ2NCIKICAgICRDTUQgcHVsbCAtLXBvbGljeT1hbHdheXMgLS1wbGF0Zm9ybT1saW51eC9hcm02NCAiJHtSRVBPJToqfTokVEFHUyIKICAgICRDTUQgdGFnICIke1JFUE8lOip9OiRUQUdTIiAiJHtSRVBPIyo6fTokVEFHUy1hcm02NCIKICAgIFRBR1M9IiRUQUdTLWFtZDY0LCRUQUdTLWFybTY0IgogIGZpCmZpCgplY2hvICIkVEFHUyIgfCBzZWQgInN+LH5cbn5nIiB8IHdoaWxlIHJlYWQgLXIgdGFnOyBkbwogIGVjaG8gIiR7UkVQTyMqOn06JHRhZyIKZG9uZSB8IHhhcmdzICRDTUQgbWFuaWZlc3QgY3JlYXRlIC0tYWxsICIkTUYiCiRDTUQgbWFuaWZlc3QgcHVzaCAtLWFsbCAiJE1GIiAiZG9ja2VyOi8vJHtSRVBPIyo6fTokVEFHIgokQ01EIG1hbmlmZXN0IHJtICIkTUYiIHx8IHRydWUK
# Workflow with two jobs: "build" produces one image per architecture, and
# "aio-manifest" runs afterwards (it lists "build" under needs).
jobs:
# aio-manifest: logs in to Docker Hub, then decodes env.BASE64manifest with
# `base64 -d` and pipes the result into `sh -s` with three positional arguments:
# the target repository, the comma-separated per-arch tags, and the tag "action".
# NOTE(review): the script is only present base64-encoded; presumably it combines
# the two per-arch tags into a single multi-arch manifest — confirm by decoding it.
aio-manifest:
needs:
- build
runs-on: ubuntu-latest
steps:
- name: Login to DockerHub
uses: docker/login-action@v2
with:
password: ${{ secrets.DOCKERHUB_PASSWORD }}
username: ${{ secrets.DOCKERHUB_USERNAME }}
- name: manifest
run: echo ${{ env.BASE64manifest }} | base64 -d | sh -s docker.io/${{ secrets.DOCKERHUB_USERNAME }}/burrow action-amd64,action-arm64 action
# build: checks out the repository, sets up QEMU and Buildx, logs in to Docker Hub,
# then builds Dockerfile.<arch> for <os>/<arch> and pushes it as burrow:action-<arch>.
build:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Login to DockerHub
uses: docker/login-action@v2
with:
password: ${{ secrets.DOCKERHUB_PASSWORD }}
username: ${{ secrets.DOCKERHUB_USERNAME }}
- name: Build and push
uses: docker/build-push-action@v3
with:
context: .
file: Dockerfile.${{ matrix.arch}}
platforms: ${{ matrix.os}}/${{ matrix.arch}}
# Provenance and SBOM attestations are switched off for the pushed image.
provenance: false
pull: true
push: true
sbom: false
tags: ${{ secrets.DOCKERHUB_USERNAME }}/burrow:action-${{ matrix.arch}}
# The matrix expands to one run per architecture (arm64, amd64) on linux.
strategy:
matrix:
arch:
- arm64
- amd64
os:
- linux
name: build
# workflow_dispatch is the only trigger listed, so the workflow is started manually.
on:
workflow_dispatch:
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: CI

on: [push, pull_request]
on: [pull_request]

jobs:
test:
Expand Down
27 changes: 27 additions & 0 deletions Dockerfile.amd64
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Multi-stage build of the amd64 Burrow image.
# The first three stages exist only as sources of prebuilt binaries (etcd, zetcd, kcat)
# that the "cached" stage copies out further down.
FROM quay.io/coreos/etcd:v3.3.27 as etcd
FROM quay.io/coreos/zetcd:v0.0.5 as zetcd
FROM edenhill/kcat:1.7.1 as kcat

# builder: downloads the tarball of branch $git_branch from github.com/$git_user/$git_repo,
# builds the burrow binary into $GOPATH/bin with cgo disabled, and installs the zetcd
# helper commands zkboom and zkctl next to it.
# NOTE(review): `go get -u all` upgrades dependencies at build time and both zetcd
# tools are installed at @latest, so two builds of the same branch can differ.
FROM golang:1.20-alpine as builder
ARG git_user=muicoder
ARG git_repo=Burrow
ARG git_branch=action
ENV CGO_ENABLED=0
RUN set -ex && \
wget -qO- https://github.com/$git_user/$git_repo/archive/refs/heads/$git_branch.tar.gz | tar -xz && \
cd $git_repo-$git_branch && \
go get -u all && go mod verify && go mod tidy && \
go build -trimpath -ldflags '-s -w -extldflags "-static"' -o $GOPATH/bin/burrow && \
go install -ldflags '-s -w -extldflags "-static"' github.com/etcd-io/zetcd/cmd/zkboom@latest && \
go install -ldflags '-s -w -extldflags "-static"' github.com/etcd-io/zetcd/cmd/zkctl@latest && \
ls -lh $GOPATH/bin

# cached: gathers the binaries from every earlier stage into /cached so the final
# image can pick them all up with a single COPY.
FROM alpine:edge as cached
COPY --from=etcd /usr/local/bin/etcd* /cached/
COPY --from=zetcd /usr/local/bin/zetcd* /cached/
COPY --from=builder /go/bin/* /cached/
COPY --from=kcat /usr/bin/kcat /cached/
# kcat on libcrypto1.1+libssl1.1
# Final runtime image: installs the runtime packages and places the collected
# binaries in /usr/local/bin.
# NOTE(review): alpine:3.16 is presumably pinned because of the kcat library
# requirement noted above — confirm before bumping the base image.
FROM alpine:3.16
RUN apk add --no-cache curl jq wget tzdata libcurl lz4-libs zstd-libs ca-certificates openssl
COPY --from=cached /cached /usr/local/bin/
25 changes: 25 additions & 0 deletions Dockerfile.arm64
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Multi-stage build of the arm64 Burrow image.
# The first two stages exist only as sources of prebuilt arm64 binaries (etcd, zetcd)
# that the "cached" stage copies out further down.
FROM quay.io/coreos/etcd:v3.3.27-arm64 as etcd
FROM kbzjung359/zetcd:v0.0.5-alpine-arm64 as zetcd

# builder: downloads the tarball of branch $git_branch from github.com/$git_user/$git_repo,
# builds the burrow binary into $GOPATH/bin with cgo disabled, and installs the zetcd
# helper commands zkboom and zkctl next to it.
# NOTE(review): `go get -u all` upgrades dependencies at build time and both zetcd
# tools are installed at @latest, so two builds of the same branch can differ.
FROM golang:1.20-alpine as builder
ARG git_user=muicoder
ARG git_repo=Burrow
ARG git_branch=action
ENV CGO_ENABLED=0
RUN set -ex && \
wget -qO- https://github.com/$git_user/$git_repo/archive/refs/heads/$git_branch.tar.gz | tar -xz && \
cd $git_repo-$git_branch && \
go get -u all && go mod verify && go mod tidy && \
go build -trimpath -ldflags '-s -w -extldflags "-static"' -o $GOPATH/bin/burrow && \
go install -ldflags '-s -w -extldflags "-static"' github.com/etcd-io/zetcd/cmd/zkboom@latest && \
go install -ldflags '-s -w -extldflags "-static"' github.com/etcd-io/zetcd/cmd/zkctl@latest && \
ls -lh $GOPATH/bin

# cached: gathers the binaries from every earlier stage into /cached so the final
# image can pick them all up with a single COPY.
FROM alpine:edge as cached
COPY --from=etcd /usr/local/bin/etcd* /cached/
COPY --from=zetcd /usr/local/bin/zetcd* /cached/
COPY --from=builder /go/bin/* /cached/
# kcat on libcrypto1.1+libssl1.1
# NOTE(review): no kcat stage or kcat COPY exists in this arm64 file, so the kcat
# remark above does not describe anything built here — confirm whether kcat was
# meant to be included.
# Final runtime image: installs the runtime packages and places the collected
# binaries in /usr/local/bin.
FROM alpine:3.16
RUN apk add --no-cache curl jq wget tzdata libcurl lz4-libs zstd-libs ca-certificates openssl
COPY --from=cached /cached /usr/local/bin/
1 change: 1 addition & 0 deletions config/burrow.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ min-distance=1

[notifier.default]
class-name="http"
cluster="local"
url-open="http://someservice.example.com:1467/v1/event"
interval=60
timeout=5
Expand Down
52 changes: 52 additions & 0 deletions config/default-dingtalk-post.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
{"msgtype": "markdown","markdown": {"title":"Kafka LagChecker", "text": "
{{- $StatusURL := "https://pkg.go.dev/github.com/linkedin/Burrow/core/protocol#StatusConstant"}}
{{- $FormatString := "2006-01-02 15:04:05"}}
# Kafka: {{.Cluster}}
ConsumerGroup: {{.Group}}{{- with .Result.Status}}
{{- if eq . 0}}NotFound{{end}}
{{- if eq . 1}}normal{{end}}
{{- if eq . 2}}lagging{{end}}
{{- if eq . 3}}abnormal{{end}}
{{- end}}
**Status:** Total(Partitions={{.Result.TotalPartitions}},Lag={{.Result.TotalLag}})[{{- with .Result.Status}}
{{- if eq . 0}}NotFound{{end}}
{{- if eq . 1}}{{.}}{{end}}
{{- if eq . 2}}{{.}}{{end}}
{{- if eq . 3}}{{.}}{{end}}
{{- end}}]({{$StatusURL}}){{printf "%.2f" .Result.Complete}}
{{- if .Result.Maxlag|maxlag}}
**MaxLagDetails:**
{{- with .Result.Maxlag}}
{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
\tCurrentLag={{.CurrentLag}}
\tPartition={{.Partition}}
{{- end}}
{{- end}}
{{- $TotalErrors := len .Result.Partitions}}
{{- if $TotalErrors}}
### {{$TotalErrors}} partitions have problems
>**CountPartitions:**
{{- range $k,$v := .Result.Partitions|partitioncounts}}
{{- if ne $v 0}}\n\t{{$k}}={{$v}}{{end}}
{{- end}}
**TopicsByStatus:**
{{- range $k,$v := .Result.Partitions|topicsbystatus}}
\t{{$k}}={{$v}}
{{- end}}
**PartitionDetails:**
{{- range .Result.Partitions}}
{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
\tCurrentLag={{.CurrentLag}}
\tPartition={{.Partition}}
\tStart={{formattimestamp .Start.Timestamp $FormatString}}
\t\tOffset={{.Start.Offset}}\tLag={{.Start.Lag.Value}}
\tEnd={{formattimestamp .End.Timestamp $FormatString}}
\t\tOffset={{.End.Offset}}\tLag={{.End.Lag.Value}}
{{- end}}
{{- end}}
"
}}
52 changes: 52 additions & 0 deletions config/default-wecom-post.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
{"msgtype": "markdown","markdown": {"content": "
{{- $StatusURL := "https://pkg.go.dev/github.com/linkedin/Burrow/core/protocol#StatusConstant"}}
{{- $FormatString := "2006-01-02 15:04:05"}}
# Kafka: {{.Cluster}}
ConsumerGroup: {{.Group}}{{- with .Result.Status}}
{{- if eq . 0}}NotFound{{end}}
{{- if eq . 1}}<font color=\"info\">normal</font>{{end}}
{{- if eq . 2}}<font color=\"warning\">lagging</font>{{end}}
{{- if eq . 3}}<font color=\"comment\">abnormal</font>{{end}}
{{- end}}
**Status:** Total(Partitions={{.Result.TotalPartitions}},Lag={{.Result.TotalLag}})[{{- with .Result.Status}}
{{- if eq . 0}}NotFound{{end}}
{{- if eq . 1}}<font color=\"info\">{{.}}</font>{{end}}
{{- if eq . 2}}<font color=\"warning\">{{.}}</font>{{end}}
{{- if eq . 3}}<font color=\"comment\">{{.}}</font>{{end}}
{{- end}}]({{$StatusURL}}){{printf "%.2f" .Result.Complete}}
{{- if .Result.Maxlag|maxlag}}
**MaxLagDetails:**
{{- with .Result.Maxlag}}
{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
\tCurrentLag={{.CurrentLag}}
\tPartition={{.Partition}}
{{- end}}
{{- end}}
{{- $TotalErrors := len .Result.Partitions}}
{{- if $TotalErrors}}
### <font color=\"comment\">{{$TotalErrors}} partitions have problems</font>
>**CountPartitions:**
{{- range $k,$v := .Result.Partitions|partitioncounts}}
{{- if ne $v 0}}\n\t{{$k}}={{$v}}{{end}}
{{- end}}
**TopicsByStatus:**
{{- range $k,$v := .Result.Partitions|topicsbystatus}}
\t{{$k}}={{$v}}
{{- end}}
**PartitionDetails:**
{{- range .Result.Partitions}}
{{.Topic}}[{{.Status.String}}](){{printf "%.2f" .Complete}}
{{- if .Owner}}\n\tConsumerHost={{.Owner}}{{end}}
{{- if .ClientID}}\n\tConsumerClientID={{.ClientID}}{{end}}
\tCurrentLag={{.CurrentLag}}
\tPartition={{.Partition}}
\tStart={{formattimestamp .Start.Timestamp $FormatString}}
\t\tOffset={{.Start.Offset}}\tLag={{.Start.Lag.Value}}
\tEnd={{formattimestamp .End.Timestamp $FormatString}}
\t\tOffset={{.End.Offset}}\tLag={{.End.Lag.Value}}
{{- end}}
{{- end}}
"
}}
6 changes: 6 additions & 0 deletions core/internal/helpers/coordinators.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ func (m *MockModule) GetName() string {
return args.String(0)
}

// GetCluster is the mock counterpart of the notifier.Module GetCluster func. It
// registers the call on the mock and returns the value at index 0 of the resulting
// arguments as a string.
func (m *MockModule) GetCluster() string {
	return m.Called().String(0)
}

// GetGroupAllowlist mocks the notifier.Module GetGroupAllowlist func
func (m *MockModule) GetGroupAllowlist() *regexp.Regexp {
args := m.Called()
Expand Down
2 changes: 2 additions & 0 deletions core/internal/httpserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func (hc *Coordinator) configNotifierHTTP(w http.ResponseWriter, r *http.Request
SendClose: viper.GetBool(configRoot + ".send-close"),
ExtraCa: viper.GetString(configRoot + ".extra-ca"),
NoVerify: viper.GetString(configRoot + ".noverify"),
Cluster: viper.GetString(configRoot + ".cluster"),
},
Request: requestInfo,
})
Expand Down Expand Up @@ -265,6 +266,7 @@ func (hc *Coordinator) configNotifierEmail(w http.ResponseWriter, r *http.Reques
To: viper.GetString(configRoot + ".to"),
ExtraCa: viper.GetString(configRoot + ".extra-ca"),
NoVerify: viper.GetString(configRoot + ".noverify"),
Cluster: viper.GetString(configRoot + ".cluster"),
},
Request: requestInfo,
})
Expand Down
2 changes: 2 additions & 0 deletions core/internal/httpserver/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ type httpResponseConfigModuleNotifierHTTP struct {
SendClose bool `json:"send-close"`
ExtraCa string `json:"extra-ca"`
NoVerify string `json:"noverify"`
Cluster string `json:"cluster"`
}

type httpResponseConfigModuleNotifierSlack struct {
Expand Down Expand Up @@ -238,6 +239,7 @@ type httpResponseConfigModuleNotifierEmail struct {
To string `json:"to"`
ExtraCa string `json:"extra-ca"`
NoVerify string `json:"noverify"`
Cluster string `json:"cluster"`
}

type httpResponseConfigModuleNotifierNull struct {
Expand Down
13 changes: 11 additions & 2 deletions core/internal/notifier/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
type Module interface {
protocol.Module
GetName() string
GetCluster() string
GetGroupAllowlist() *regexp.Regexp
GetGroupDenylist() *regexp.Regexp
GetLogger() *zap.Logger
Expand Down Expand Up @@ -95,7 +96,7 @@ type Coordinator struct {

// getModuleForClass returns the correct module based on the passed className. As part of the Configure steps, if there
// is any error, it will panic with an appropriate message describing the problem.
func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupAllowlist, groupDenylist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template) protocol.Module {
func getModuleForClass(app *protocol.ApplicationContext, moduleName, className string, groupAllowlist, groupDenylist *regexp.Regexp, extras map[string]string, templateOpen, templateClose *template.Template, cluster string) protocol.Module {
logger := app.Logger.With(
zap.String("type", "module"),
zap.String("coordinator", "notifier"),
Expand All @@ -113,6 +114,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
cluster: cluster,
}
case "email":
return &EmailNotifier{
Expand All @@ -123,6 +125,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
cluster: cluster,
}
case "null":
return &NullNotifier{
Expand All @@ -133,6 +136,7 @@ func getModuleForClass(app *protocol.ApplicationContext, moduleName, className s
extras: extras,
templateOpen: templateOpen,
templateClose: templateClose,
cluster: cluster,
}
default:
panic("Unknown notifier className provided: " + className)
Expand Down Expand Up @@ -194,6 +198,8 @@ func (nc *Coordinator) Configure() {
groupAllowlist = re
}

cluster := viper.GetString(configRoot + ".cluster")

// Compile the denylist for the consumer groups to not notify for
var groupDenylist *regexp.Regexp
denylist := viper.GetString(configRoot + ".group-denylist")
Expand Down Expand Up @@ -227,7 +233,7 @@ func (nc *Coordinator) Configure() {
templateClose = tmpl.Templates()[0]
}

module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupAllowlist, groupDenylist, extras, templateOpen, templateClose)
module := getModuleForClass(nc.App, name, viper.GetString(configRoot+".class-name"), groupAllowlist, groupDenylist, extras, templateOpen, templateClose, cluster)
module.Configure(name, configRoot)
nc.modules[name] = module
interval := viper.GetInt64(configRoot + ".interval")
Expand Down Expand Up @@ -436,6 +442,9 @@ func (nc *Coordinator) checkAndSendResponseToModules(response *protocol.Consumer
for _, genericModule := range nc.modules {
module := genericModule.(Module)

if module.GetCluster() != "" && response.Cluster != module.GetCluster() {
continue
}
// No allowlist means everything passes
groupAllowlist := module.GetGroupAllowlist()
groupDenylist := module.GetGroupDenylist()
Expand Down
Loading

0 comments on commit 1339403

Please sign in to comment.