-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ingest from Publisher side to track drop rate #2
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
package main | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"fmt" | ||
"log" | ||
"net/http" | ||
"os" | ||
"strconv" | ||
"sync" | ||
) | ||
|
||
type RawMsgs struct { | ||
Records []PubsubMsg `json:"records,omitempty"` | ||
} | ||
|
||
type KustoIngestionClient struct { | ||
mutex sync.Mutex | ||
requestUri string | ||
pubSubMsgs []PubsubMsg | ||
counter int | ||
batchSize int | ||
} | ||
|
||
func NewKustoIngestionClient() KustoIngestionClient { | ||
targetUri := os.Getenv("TARGETURI") | ||
if targetUri == "" { | ||
log.Fatalf("Failed to set target uri env var") | ||
} | ||
|
||
functionKey := os.Getenv("FUNCTIONKEY") | ||
if functionKey == "" { | ||
log.Fatalf("Failed to set function key env var") | ||
} | ||
|
||
INGESTIONSERVICEBATCHSIZE := os.Getenv("INGESTIONSERVICEBATCHSIZE") | ||
batchSize, err := strconv.Atoi(INGESTIONSERVICEBATCHSIZE) | ||
if INGESTIONSERVICEBATCHSIZE == "" || err != nil { | ||
log.Fatalf("Failed to set INGESTIONSERVICEBATCHSIZE %v", err) | ||
} | ||
|
||
requestUri := targetUri + "?code=" + functionKey; | ||
|
||
pubSubMsgs := make([]PubsubMsg, 0, batchSize) | ||
|
||
return KustoIngestionClient { | ||
requestUri:requestUri, | ||
pubSubMsgs: pubSubMsgs, | ||
batchSize: batchSize, | ||
} | ||
} | ||
|
||
func (client *KustoIngestionClient) SendAsync(msg PubsubMsg) error { | ||
client.mutex.Lock() | ||
defer client.mutex.Unlock() | ||
|
||
client.pubSubMsgs = append(client.pubSubMsgs, msg); | ||
client.counter += 1 | ||
|
||
if client.counter == client.batchSize { | ||
msgs := make([]PubsubMsg, client.batchSize, client.batchSize) | ||
copy(msgs, client.pubSubMsgs) | ||
|
||
client.counter = 0 | ||
client.pubSubMsgs = make([]PubsubMsg, 0, client.batchSize) | ||
return client.SendAsyncBatch(msgs) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (client *KustoIngestionClient) SendAsyncBatch(msgs []PubsubMsg) error { | ||
rawMsgs := RawMsgs{Records: msgs} | ||
|
||
buf, err := json.Marshal(rawMsgs) | ||
if err != nil { | ||
log.Fatalf(err.Error()) | ||
} | ||
|
||
var resp *http.Response | ||
for i := 0; i < 3; i++ { | ||
resp, err = http.Post(client.requestUri, "application/json", bytes.NewBuffer(buf)) | ||
if err == nil { | ||
break | ||
} | ||
fmt.Printf("Error publishing count %d with error: %v \n", i, err) | ||
} | ||
|
||
if err != nil { | ||
return err | ||
} | ||
|
||
fmt.Println("Response Status:", resp.Status) | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
package main | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/google/uuid" | ||
) | ||
|
||
func getObj(i string, batchCorrelationId string) PubsubMsg { | ||
now := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") | ||
return PubsubMsg{ | ||
Key: i, | ||
ID: now, | ||
Details: map[string]interface{}{"missing_labels": []interface{}{"test"}}, | ||
EventType: "violation_audited", | ||
Group: "constraints.gatekeeper.sh", | ||
Version: "v1beta1", | ||
Kind: "K8sRequiredLabels", | ||
Name: "pod-must-have-test", | ||
Namespace: "", | ||
Message: "you must provide labels: {\"test\"}", | ||
EnforcementAction: "deny", | ||
ConstraintAnnotations: map[string]string(nil), | ||
ResourceGroup: "", | ||
ResourceAPIVersion: "v1", | ||
ResourceKind: "Pod", | ||
ResourceNamespace: "nginx", | ||
ResourceName: "dywuperf-deployment-10kpods-69bd64c867-h2wdx", | ||
ResourceLabels: map[string]string{"app": "dywuperf-app-100kpods", "pod-template-hash": "69bd64c867"}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here as well |
||
Timestamp: now, | ||
BrokerName: BROKERNAME, | ||
UserAgent: "Publisher", | ||
CorrelationId: uuid.New().String(), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just for my info - are guid not supported in Go? Looks like uuid is more universal with 128-bit but wondering if that is needed. Could maybe save us from importing a package .... |
||
BatchCorrelationId: batchCorrelationId} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,15 +4,16 @@ package main | |
import ( | ||
"context" | ||
"encoding/json" | ||
"fmt" | ||
"log" | ||
"os" | ||
"strconv" | ||
"time" | ||
|
||
"github.com/dapr/go-sdk/client" | ||
"github.com/google/uuid" | ||
) | ||
|
||
// code | ||
var ( | ||
PUBSUB_NAME = "pubsub" | ||
TOPIC_NAME = "audit" | ||
|
@@ -51,6 +52,11 @@ type PubsubMsg struct { | |
// Additional Metadata for benchmarking | ||
BrokerName string `json:"brokerName,omitempty"` | ||
Timestamp string `json:"timestamp,omitempty"` | ||
UserAgent string `json:"userAgent,omitempty"` | ||
CorrelationId string `json:"correlationId,omitempty"` | ||
PublishedTimestamp string `json:"publishedTimestamp,omitempty"` | ||
ReceivedTimestamp string `json:"receivedTimestamp,omitempty"` | ||
BatchCorrelationId string `json:"batchCorrelationId,omitempty"` | ||
} | ||
|
||
func main() { | ||
|
@@ -69,58 +75,57 @@ func main() { | |
time.Sleep(300 * time.Hour) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. configurable? In case we need to adjust intervals in the future There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch will add a forever loop |
||
} | ||
|
||
func getObj(i string) interface{} { | ||
now := time.Now().UTC().Format("2006-01-02T15:04:05.000Z") | ||
return PubsubMsg{ | ||
Key: i, | ||
ID: now, | ||
Details: map[string]interface{}{"missing_labels": []interface{}{"test"}}, | ||
EventType: "violation_audited", | ||
Group: "constraints.gatekeeper.sh", | ||
Version: "v1beta1", | ||
Kind: "K8sRequiredLabels", | ||
Name: "pod-must-have-test", | ||
Namespace: "", | ||
Message: "you must provide labels: {\"test\"}", | ||
EnforcementAction: "deny", | ||
ConstraintAnnotations: map[string]string(nil), | ||
ResourceGroup: "", | ||
ResourceAPIVersion: "v1", | ||
ResourceKind: "Pod", | ||
ResourceNamespace: "nginx", | ||
ResourceName: "dywuperf-deployment-10kpods-69bd64c867-h2wdx", | ||
ResourceLabels: map[string]string{"app": "dywuperf-app-100kpods", "pod-template-hash": "69bd64c867"}, | ||
Timestamp: now, | ||
BrokerName: BROKERNAME} | ||
} | ||
|
||
func (r *Dapr) Send() { | ||
var kustoIngestionClient = NewKustoIngestionClient() | ||
|
||
value := os.Getenv("NUMBER") | ||
log.Printf("sending %s messages", value) | ||
total, err := strconv.Atoi(value) | ||
if err != nil { | ||
log.Fatalf("error getting number: %v", err) | ||
total = 100000 | ||
total = 1000000 | ||
} | ||
ctx := context.Background() | ||
start_time := time.Now() | ||
log.Println("starting publish") | ||
|
||
for i := 0; i < total; i++ { | ||
test := getObj(strconv.Itoa(i)) | ||
jsonData, err := json.Marshal(test) | ||
if err != nil { | ||
log.Fatalf("error marshaling data: %v", err) | ||
} | ||
|
||
for _, c := range r.client { | ||
//Using Dapr SDK to publish a topic | ||
// log.Println("Published data: " + string(jsonData)) | ||
if err := c.client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, jsonData); err != nil { | ||
panic(err) | ||
// Always run | ||
for true { | ||
log.Println("starting publish") | ||
start_time := time.Now() | ||
|
||
// Each batch has a separate correlation id | ||
batchCorrelationId := uuid.New().String(); | ||
|
||
for i := 0; i < total; i ++ { | ||
msg := getObj(strconv.Itoa(i), batchCorrelationId) | ||
|
||
for _, c := range r.client { | ||
|
||
// Send to ingestion service to track drop rate | ||
err = kustoIngestionClient.SendAsync(msg) | ||
if err != nil { | ||
// Continue if ingestion error, to do track count of these | ||
log.Printf("Ingestion error for batchCorrelationId: %s, %v", batchCorrelationId, err) | ||
} | ||
|
||
// Set the published timestamp | ||
msg.PublishedTimestamp = time.Now().UTC().Format("2006-01-02T15:04:05.000Z") | ||
jsonData, err := json.Marshal(msg) | ||
if err != nil { | ||
log.Fatalf("error marshaling data: %v", err) | ||
} | ||
|
||
// Using Dapr SDK to publish a topic and send to Subscriber | ||
// What is the publisher's streaming rate? Does the batching above significantly impact this rate | ||
if err := c.client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, jsonData); err != nil { | ||
panic(err) | ||
} | ||
} | ||
} | ||
|
||
end_time := time.Now() | ||
log.Printf("total time it took %v", end_time.Sub(start_time)) | ||
|
||
fmt.Println("Sleep starting now: ", time.Now()) | ||
time.Sleep(15*time.Minute); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Configurable? Not sure if this 15 minute or the 5 minute inside main() actually toggles the interval simulating GK audit. |
||
} | ||
end_time := time.Now() | ||
log.Printf("total time it took %v", end_time.Sub(start_time)) | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we parameterize this? We can also suffix with violation count 1 to n being the total number of pods in the test.