Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingest from Publisher side to track drop rate #2

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions clients.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"strconv"
"sync"
)

type RawMsgs struct {
Records []PubsubMsg `json:"records,omitempty"`
}

type KustoIngestionClient struct {
mutex sync.Mutex
requestUri string
pubSubMsgs []PubsubMsg
counter int
batchSize int
}

func NewKustoIngestionClient() KustoIngestionClient {
targetUri := os.Getenv("TARGETURI")
if targetUri == "" {
log.Fatalf("Failed to set target uri env var")
}

functionKey := os.Getenv("FUNCTIONKEY")
if functionKey == "" {
log.Fatalf("Failed to set function key env var")
}

INGESTIONSERVICEBATCHSIZE := os.Getenv("INGESTIONSERVICEBATCHSIZE")
batchSize, err := strconv.Atoi(INGESTIONSERVICEBATCHSIZE)
if INGESTIONSERVICEBATCHSIZE == "" || err != nil {
log.Fatalf("Failed to set INGESTIONSERVICEBATCHSIZE %v", err)
}

requestUri := targetUri + "?code=" + functionKey;

pubSubMsgs := make([]PubsubMsg, 0, batchSize)

return KustoIngestionClient {
requestUri:requestUri,
pubSubMsgs: pubSubMsgs,
batchSize: batchSize,
}
}

func (client *KustoIngestionClient) SendAsync(msg PubsubMsg) error {
client.mutex.Lock()
defer client.mutex.Unlock()

client.pubSubMsgs = append(client.pubSubMsgs, msg);
client.counter += 1

if client.counter == client.batchSize {
msgs := make([]PubsubMsg, client.batchSize, client.batchSize)
copy(msgs, client.pubSubMsgs)

client.counter = 0
client.pubSubMsgs = make([]PubsubMsg, 0, client.batchSize)
return client.SendAsyncBatch(msgs)
}

return nil
}

func (client *KustoIngestionClient) SendAsyncBatch(msgs []PubsubMsg) error {
rawMsgs := RawMsgs{Records: msgs}

buf, err := json.Marshal(rawMsgs)
if err != nil {
log.Fatalf(err.Error())
}

var resp *http.Response
for i := 0; i < 3; i++ {
resp, err = http.Post(client.requestUri, "application/json", bytes.NewBuffer(buf))
if err == nil {
break
}
fmt.Printf("Error publishing count %d with error: %v \n", i, err)
}

if err != nil {
return err
}

fmt.Println("Response Status:", resp.Status)

return nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require github.com/dapr/go-sdk v1.6.0

require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
golang.org/x/net v0.0.0-20220621193019-9d032be2e588 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
Expand Down
35 changes: 35 additions & 0 deletions helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package main

import (
"time"

"github.com/google/uuid"
)

func getObj(i string, batchCorrelationId string) PubsubMsg {
now := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
return PubsubMsg{
Key: i,
ID: now,
Details: map[string]interface{}{"missing_labels": []interface{}{"test"}},
EventType: "violation_audited",
Group: "constraints.gatekeeper.sh",
Version: "v1beta1",
Kind: "K8sRequiredLabels",
Name: "pod-must-have-test",
Namespace: "",
Message: "you must provide labels: {\"test\"}",
EnforcementAction: "deny",
ConstraintAnnotations: map[string]string(nil),
ResourceGroup: "",
ResourceAPIVersion: "v1",
ResourceKind: "Pod",
ResourceNamespace: "nginx",
ResourceName: "dywuperf-deployment-10kpods-69bd64c867-h2wdx",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we parameterize this? We can also suffix with violation count 1 to n being the total number of pods in the test.

ResourceLabels: map[string]string{"app": "dywuperf-app-100kpods", "pod-template-hash": "69bd64c867"},
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here as well

Timestamp: now,
BrokerName: BROKERNAME,
UserAgent: "Publisher",
CorrelationId: uuid.New().String(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my info - are guid not supported in Go? Looks like uuid is more universal with 128-bit but wondering if that is needed. Could maybe save us from importing a package ....

BatchCorrelationId: batchCorrelationId}
}
91 changes: 48 additions & 43 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"time"

"github.com/dapr/go-sdk/client"
"github.com/google/uuid"
)

// code
var (
PUBSUB_NAME = "pubsub"
TOPIC_NAME = "audit"
Expand Down Expand Up @@ -51,6 +52,11 @@ type PubsubMsg struct {
// Additional Metadata for benchmarking
BrokerName string `json:"brokerName,omitempty"`
Timestamp string `json:"timestamp,omitempty"`
UserAgent string `json:"userAgent,omitempty"`
CorrelationId string `json:"correlationId,omitempty"`
PublishedTimestamp string `json:"publishedTimestamp,omitempty"`
ReceivedTimestamp string `json:"receivedTimestamp,omitempty"`
BatchCorrelationId string `json:"batchCorrelationId,omitempty"`
}

func main() {
Expand All @@ -69,58 +75,57 @@ func main() {
time.Sleep(300 * time.Hour)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

configurable? In case we need to adjust intervals in the future

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch will add a forever loop

}

func getObj(i string) interface{} {
now := time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
return PubsubMsg{
Key: i,
ID: now,
Details: map[string]interface{}{"missing_labels": []interface{}{"test"}},
EventType: "violation_audited",
Group: "constraints.gatekeeper.sh",
Version: "v1beta1",
Kind: "K8sRequiredLabels",
Name: "pod-must-have-test",
Namespace: "",
Message: "you must provide labels: {\"test\"}",
EnforcementAction: "deny",
ConstraintAnnotations: map[string]string(nil),
ResourceGroup: "",
ResourceAPIVersion: "v1",
ResourceKind: "Pod",
ResourceNamespace: "nginx",
ResourceName: "dywuperf-deployment-10kpods-69bd64c867-h2wdx",
ResourceLabels: map[string]string{"app": "dywuperf-app-100kpods", "pod-template-hash": "69bd64c867"},
Timestamp: now,
BrokerName: BROKERNAME}
}

func (r *Dapr) Send() {
var kustoIngestionClient = NewKustoIngestionClient()

value := os.Getenv("NUMBER")
log.Printf("sending %s messages", value)
total, err := strconv.Atoi(value)
if err != nil {
log.Fatalf("error getting number: %v", err)
total = 100000
total = 1000000
}
ctx := context.Background()
start_time := time.Now()
log.Println("starting publish")

for i := 0; i < total; i++ {
test := getObj(strconv.Itoa(i))
jsonData, err := json.Marshal(test)
if err != nil {
log.Fatalf("error marshaling data: %v", err)
}

for _, c := range r.client {
//Using Dapr SDK to publish a topic
// log.Println("Published data: " + string(jsonData))
if err := c.client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, jsonData); err != nil {
panic(err)
// Always run
for true {
log.Println("starting publish")
start_time := time.Now()

// Each batch has a separate correlation id
batchCorrelationId := uuid.New().String();

for i := 0; i < total; i ++ {
msg := getObj(strconv.Itoa(i), batchCorrelationId)

for _, c := range r.client {

// Send to ingestion service to track drop rate
err = kustoIngestionClient.SendAsync(msg)
if err != nil {
// Continue if ingestion error, to do track count of these
log.Printf("Ingestion error for batchCorrelationId: %s, %v", batchCorrelationId, err)
}

// Set the published timestamp
msg.PublishedTimestamp = time.Now().UTC().Format("2006-01-02T15:04:05.000Z")
jsonData, err := json.Marshal(msg)
if err != nil {
log.Fatalf("error marshaling data: %v", err)
}

// Using Dapr SDK to publish a topic and send to Subscriber
// What is the publisher's streaming rate? Does the batching above significantly impact this rate
if err := c.client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, jsonData); err != nil {
panic(err)
}
}
}

end_time := time.Now()
log.Printf("total time it took %v", end_time.Sub(start_time))

fmt.Println("Sleep starting now: ", time.Now())
time.Sleep(15*time.Minute);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Configurable? Not sure if this 15 minute or the 5 minute inside main() actually toggles the interval simulating GK audit.

}
end_time := time.Now()
log.Printf("total time it took %v", end_time.Sub(start_time))
}
20 changes: 13 additions & 7 deletions pub.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
---
# ---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
Expand All @@ -21,9 +21,9 @@ spec:
# name: pubsub
# namespace: gatekeeper-system
# spec:
# type: pubsub.in-memory
# version: v1
# metadata: []
# type: pubsub.in-memory
# version: v1
# metadata: []
---
apiVersion: apps/v1
kind: Deployment
Expand All @@ -49,10 +49,16 @@ spec:
spec:
containers:
- name: go-pub
image: docker.io/jaydipgabani/publisher:latest
image: docker.io/noreisch/test-publisher:0.0.13
imagePullPolicy: Always
env:
- name: NUMBER
value: "1000"
value: "20000"
- name: BROKERNAME
value: "Redis"
value: "Redis"
- name: TARGETURI
value: ""
- name: FUNCTIONKEY
value: ""
- name: INGESTIONSERVICEBATCHSIZE
value: "1000"
9 changes: 9 additions & 0 deletions vendor/github.com/google/uuid/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions vendor/github.com/google/uuid/CONTRIBUTING.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions vendor/github.com/google/uuid/CONTRIBUTORS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions vendor/github.com/google/uuid/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading