Skip to content

Commit

Permalink
Merge pull request #13 from imriz/support_tag_filtering
Browse files Browse the repository at this point in the history
Support filtering by (multiple) tags
  • Loading branch information
joshm91 authored Dec 5, 2021
2 parents d429063 + d4d60c2 commit 368a4c1
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 21 deletions.
71 changes: 55 additions & 16 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ const (
nodeExporterPort = 11002
)

type tags map[string]string

var (
outFile = flag.String("output", "msk_file_sd.yml", "path of the file to write MSK discovery information to")
interval = flag.Duration("scrape-interval", 5*time.Minute, "interval at which to scrape the AWS API for MSK cluster information")
jobPrefix = flag.String("job-prefix", "msk", "string with which to prefix each job label")
clusterFilter = flag.String("filter", "", "a regex pattern to filter cluster names from the results")
awsRegion = flag.String("region", "", "the aws region in which to scan for MSK clusters")
outFile = flag.String("output", "msk_file_sd.yml", "path of the file to write MSK discovery information to")
interval = flag.Duration("scrape-interval", 5*time.Minute, "interval at which to scrape the AWS API for MSK cluster information")
jobPrefix = flag.String("job-prefix", "msk", "string with which to prefix each job label")
clusterFilter = flag.String("filter", "", "a regex pattern to filter cluster names from the results")
awsRegion = flag.String("region", "", "the aws region in which to scan for MSK clusters")
)

type kafkaClient interface {
Expand Down Expand Up @@ -56,6 +58,21 @@ type clusterDetails struct {
NodeExporter bool
}

type Filter struct {
NameFilter regexp.Regexp
TagFilter tags
}

func (i *tags) String() string {
return fmt.Sprint(*i)
}
func (i *tags) Set(value string) error {
split := strings.Split(value, "=")

(*i)[split[0]] = split[1]
return nil
}

// (ClusterDetails).StaticConfig generates a PrometheusStaticConfig based on the cluster's details
func (c clusterDetails) StaticConfig() PrometheusStaticConfig {
ret := PrometheusStaticConfig{}
Expand Down Expand Up @@ -132,11 +149,22 @@ func buildClusterDetails(svc kafkaClient, c types.ClusterInfo) (clusterDetails,
return cluster, nil
}

func filterClusters(clusters kafka.ListClustersOutput, filter regexp.Regexp) *kafka.ListClustersOutput {
func filterClusters(clusters kafka.ListClustersOutput, filter Filter) *kafka.ListClustersOutput {
var filteredClusters []types.ClusterInfo

var tagMatch bool
for _, cluster := range clusters.ClusterInfoList {
if filter.MatchString(*cluster.ClusterName) {
if len(filter.TagFilter) == 0 {
tagMatch = true
} else {
tagMatch = false
}
for tagKey, tagValue := range filter.TagFilter {
if cluster.Tags[tagKey] == tagValue {
tagMatch = true
break
}
}
if filter.NameFilter.MatchString(*cluster.ClusterName) && tagMatch {
filteredClusters = append(filteredClusters, cluster)
}
}
Expand All @@ -145,19 +173,23 @@ func filterClusters(clusters kafka.ListClustersOutput, filter regexp.Regexp) *ka
}

// GetStaticConfigs pulls a list of MSK clusters and brokers and returns a slice of PrometheusStaticConfigs
func GetStaticConfigs(svc kafkaClient, opt_filter ...regexp.Regexp) ([]PrometheusStaticConfig, error) {
filter, _ := regexp.Compile(``)
if len(opt_filter) > 0 {
filter = &opt_filter[0]
}

func GetStaticConfigs(svc kafkaClient, opt_filter ...Filter) ([]PrometheusStaticConfig, error) {
clusters, err := getClusters(svc)
if err != nil {
return []PrometheusStaticConfig{}, err
}
staticConfigs := []PrometheusStaticConfig{}

clusters = filterClusters(*clusters, *filter)
// Assign a default Filter, if none is passed.
defaultNameRegex, _ := regexp.Compile(``)
filter := Filter{
NameFilter: *defaultNameRegex,
}
if len(opt_filter) > 0 {
filter = opt_filter[0]
}

clusters = filterClusters(*clusters, filter)

for _, cluster := range clusters.ClusterInfoList {
clusterDetails, err := buildClusterDetails(svc, cluster)
Expand All @@ -174,6 +206,8 @@ func GetStaticConfigs(svc kafkaClient, opt_filter ...regexp.Regexp) ([]Prometheu
}

func main() {
var tagFilters tags = make(tags)
flag.Var(&tagFilters, "tag", "A key=value for filtering by tags. Flag can be specified multiple times, resulting OR expression.")
flag.Parse()

cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(*awsRegion), config.WithEC2IMDSRegion())
Expand All @@ -191,7 +225,12 @@ func main() {
return
}

staticConfigs, err := GetStaticConfigs(client, *regexpFilter)
filter := Filter{
NameFilter: *regexpFilter,
TagFilter: tagFilters,
}

staticConfigs, err := GetStaticConfigs(client, filter)
if err != nil {
fmt.Println(err)
return
Expand Down
67 changes: 62 additions & 5 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,23 @@ func strPtr(str string) *string {
func Test_filterClusters(t *testing.T) {
type args struct {
clusters kafka.ListClustersOutput
filter regexp.Regexp
filter Filter
}
defaultFilter := Filter{
NameFilter: *(regexp.MustCompile(``)),
}

testClusterFilter := Filter{
NameFilter: *(regexp.MustCompile(`test`)),
}

defaultFilter, _ := regexp.Compile(``)
testClusterFilter, _ := regexp.Compile(`test`)
tagFilter := Filter{
NameFilter: *(regexp.MustCompile(``)),
TagFilter: map[string]string{
"Enviroment": "test",
"SomeOther": "tag",
},
}

tests := []struct {
name string
Expand All @@ -190,7 +202,7 @@ func Test_filterClusters(t *testing.T) {
},
},
},
filter: *defaultFilter,
filter: defaultFilter,
},
want: &kafka.ListClustersOutput{
ClusterInfoList: []types.ClusterInfo{
Expand All @@ -213,12 +225,57 @@ func Test_filterClusters(t *testing.T) {
},
},
},
filter: *testClusterFilter,
filter: testClusterFilter,
},
want: &kafka.ListClustersOutput{
ClusterInfoList: []types.ClusterInfo{
{
ClusterName: strPtr("test-cluster"),
},
},
},
},
{
name: "test-tag-filter",
args: args{
clusters: kafka.ListClustersOutput{
ClusterInfoList: []types.ClusterInfo{
{
ClusterName: strPtr("test-cluster"),
Tags: map[string]string{
"Enviroment": "test",
"SomeOther": "DifferentTag",
},
},
{
ClusterName: strPtr("second-test-cluster"),
Tags: map[string]string{
"Enviroment": "staging",
"SomeOther": "tag",
},
},
{
ClusterName: strPtr("filtered-cluster"),
},
},
},
filter: tagFilter,
},
want: &kafka.ListClustersOutput{
ClusterInfoList: []types.ClusterInfo{
{
ClusterName: strPtr("test-cluster"),
Tags: map[string]string{
"Enviroment": "test",
"SomeOther": "DifferentTag",
},
},
{
ClusterName: strPtr("second-test-cluster"),
Tags: map[string]string{
"Enviroment": "staging",
"SomeOther": "tag",
},
},
},
},
Expand Down

0 comments on commit 368a4c1

Please sign in to comment.