Merge pull request #113 from ddosify/develop
consume k8s events faster
fatihbaltaci authored Mar 19, 2024
2 parents d5a1da6 + 66fc825 commit c69c6c2
Showing 3 changed files with 24 additions and 18 deletions.
36 changes: 22 additions & 14 deletions datastore/backend.go
@@ -44,6 +44,10 @@ var tag string
 var kernelVersion string
 var cloudProvider CloudProvider
 
+var resourceBatchSize int64 = 1000 // maximum batch size for resources, it must be bigger or at least equal to chan sizes in order to avoid blocking
+var innerMetricsPort int = 8182
+var innerGpuMetricsPort int = 8183
+
 func init() {
 
 	TestMode := os.Getenv("TEST_MODE")
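
The new comment pins down an invariant worth spelling out: resourceBatchSize must be at least as large as every event channel's capacity, so that a single flush can always empty a full buffer and the informer callbacks feeding the channels never block. A minimal sketch of that drain step (flushUpTo is a hypothetical helper, not the actual ddosify code):

// flushUpTo drains at most batchSize events from ch without blocking.
// Because batchSize >= cap(ch), one call can always empty a full buffer,
// which is the invariant the resourceBatchSize comment above relies on.
func flushUpTo(ch chan interface{}, batchSize int64) []interface{} {
	batch := make([]interface{}, 0, batchSize)
	for int64(len(batch)) < batchSize {
		select {
		case ev := <-ch:
			batch = append(batch, ev)
		default: // channel drained, stop early
			return batch
		}
	}
	return batch
}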
@@ -128,10 +132,6 @@ func getCloudProvider() CloudProvider {
 	return CloudProviderUnknown
 }
 
-var resourceBatchSize int64 = 50
-var innerMetricsPort int = 8182
-var innerGpuMetricsPort int = 8183
-
 // BackendDS is a backend datastore
 type BackendDS struct {
 	ctx context.Context
@@ -206,7 +206,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *BackendDS {
 	retryClient.Backoff = retryablehttp.DefaultBackoff
 	retryClient.RetryWaitMin = 1 * time.Second
 	retryClient.RetryWaitMax = 5 * time.Second
-	retryClient.RetryMax = 4
+	retryClient.RetryMax = 2
 
 	retryClient.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
 		var shouldRetry bool
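
For context, retryClient here is hashicorp/go-retryablehttp. With DefaultBackoff the wait doubles from RetryWaitMin and is capped at RetryWaitMax, so cutting RetryMax from 4 to 2 shrinks the worst-case time spent retrying an unreachable backend from roughly 1+2+4+5 = 12s to 1+2 = 3s per request. A sketch of the configuration (the custom CheckRetry body is truncated in this diff, so the library's default policy stands in for it here):

import (
	"context"
	"net/http"
	"time"

	"github.com/hashicorp/go-retryablehttp"
)

// newRetryClient mirrors the settings shown in NewBackendDS.
func newRetryClient() *retryablehttp.Client {
	rc := retryablehttp.NewClient()
	rc.Backoff = retryablehttp.DefaultBackoff
	rc.RetryWaitMin = 1 * time.Second
	rc.RetryWaitMax = 5 * time.Second
	rc.RetryMax = 2 // was 4: fail faster instead of stalling senders for ~12s

	rc.CheckRetry = func(ctx context.Context, resp *http.Response, err error) (bool, error) {
		// stand-in for the truncated custom policy: the default retries
		// on transport errors and 5xx responses
		return retryablehttp.DefaultRetryPolicy(ctx, resp, err)
	}
	return rc
}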
@@ -278,6 +278,8 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *BackendDS {
 		bs = defaultBatchSize
 	}
 
+	resourceChanSize := 200
+
 	ds := &BackendDS{
 		ctx:  ctx,
 		host: conf.Host,
@@ -288,21 +290,27 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *BackendDS {
 		traceInfoPool:  newTraceInfoPool(func() *TraceInfo { return &TraceInfo{} }, func(r *TraceInfo) {}),
 		reqChanBuffer:  make(chan *ReqInfo, conf.ReqBufferSize),
 		connChanBuffer: make(chan *ConnInfo, conf.ConnBufferSize),
-		podEventChan:       make(chan interface{}, 100),
-		svcEventChan:       make(chan interface{}, 100),
-		rsEventChan:        make(chan interface{}, 100),
-		depEventChan:       make(chan interface{}, 50),
-		epEventChan:        make(chan interface{}, 100),
-		containerEventChan: make(chan interface{}, 100),
-		dsEventChan:        make(chan interface{}, 20),
+		podEventChan:       make(chan interface{}, 5*resourceChanSize),
+		svcEventChan:       make(chan interface{}, 2*resourceChanSize),
+		rsEventChan:        make(chan interface{}, 2*resourceChanSize),
+		depEventChan:       make(chan interface{}, 2*resourceChanSize),
+		epEventChan:        make(chan interface{}, resourceChanSize),
+		containerEventChan: make(chan interface{}, 5*resourceChanSize),
+		dsEventChan:        make(chan interface{}, resourceChanSize),
 		traceEventQueue:    list.New(),
 	}
 
 	go ds.sendReqsInBatch(bs)
 	go ds.sendConnsInBatch(bs)
 	go ds.sendTraceEventsInBatch(10 * bs)
 
-	eventsInterval := 10 * time.Second
+	// events are resynced every 60 seconds on k8s informers
+	// resourceBatchSize ~ burst size, if more than resourceBatchSize events are sent in a moment, blocking can occur
+	// resync period / event interval = 60 / 5 = 12
+	// 12 * resourceBatchSize = 12 * 1000 = 12000
+	// it can send up to 12k events in 60 seconds
+	// seems safe enough, if not, we can increase the buffer size
+	eventsInterval := 5 * time.Second
 	go ds.sendEventsInBatch(ds.podEventChan, podEndpoint, eventsInterval)
 	go ds.sendEventsInBatch(ds.svcEventChan, svcEndpoint, eventsInterval)
 	go ds.sendEventsInBatch(ds.rsEventChan, rsEndpoint, eventsInterval)
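
The sizing only makes sense next to how these channels are fed: each Kubernetes informer re-delivers every tracked object on resync, so right after a resync the producers can push a burst of roughly one event per object. A hypothetical client-go wiring sketch (registerPodHandler and its names are illustrative, not the actual ddosify code):

import (
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
)

// registerPodHandler forwards informer callbacks into a buffered channel.
// On a resync every cached Pod arrives again via UpdateFunc, which is the
// burst the 5*resourceChanSize buffer above is sized to absorb.
func registerPodHandler(factory informers.SharedInformerFactory, podEventChan chan<- interface{}) {
	factory.Core().V1().Pods().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    func(obj interface{}) { podEventChan <- obj },
		UpdateFunc: func(oldObj, newObj interface{}) { podEventChan <- newObj },
		DeleteFunc: func(obj interface{}) { podEventChan <- obj },
	})
}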
@@ -641,7 +649,7 @@ func (b *BackendDS) send(ch <-chan interface{}, endpoint string) {
 		select {
 		case ev := <-ch:
 			batch = append(batch, ev)
-		case <-time.After(1 * time.Second):
+		case <-time.After(100 * time.Millisecond):
 			loop = false
 		}
 	}
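
This flush loop is the other half of "consume k8s events faster": the select stops collecting once the channel has been idle for the timeout, so shrinking it from 1s to 100ms lets a batch ship almost as soon as the event stream pauses instead of idling a full second. A simplified, runnable sketch of the pattern (drainBatch is a stand-in; the real send also serializes and POSTs the batch to the endpoint):

import "time"

// drainBatch collects events until ch stays quiet for 100ms, then returns
// whatever has accumulated so the caller can send it as one batch.
func drainBatch(ch <-chan interface{}, max int) []interface{} {
	batch := make([]interface{}, 0, max)
	loop := true
	for loop && len(batch) < max {
		select {
		case ev := <-ch:
			batch = append(batch, ev)
		case <-time.After(100 * time.Millisecond):
			loop = false // channel idle: flush what we have
		}
	}
	return batch
}

Note that time.After allocates a fresh timer on every iteration; that is fine at these event rates, but a hotter path would favor a single time.NewTimer that gets Reset between reads.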
2 changes: 0 additions & 2 deletions ebpf/l7_req/l7.c
@@ -242,8 +242,6 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha
 			args.write_start_ns = timestamp;
 			bpf_map_update_elem(&active_writes, &id, &args, BPF_ANY);
 		}
-		unsigned char log_msg[] = "parse_client_postgres_data -- count||";
-		log_to_userspace(ctx, DEBUG, func_name, log_msg, count, 0, 0);
 		req->protocol = PROTOCOL_POSTGRES;
 	}else if (is_rabbitmq_publish(buf,count)){
 		req->protocol = PROTOCOL_AMQP;
4 changes: 2 additions & 2 deletions k8s/informer.go
@@ -43,7 +43,7 @@ const (
 )
 
 var k8sVersion string
-var resynPeriod time.Duration = 60 * time.Second
+var resyncPeriod time.Duration = 120 * time.Second
 
 type K8sCollector struct {
 	ctx context.Context
@@ -189,7 +189,7 @@ func NewK8sCollector(parentCtx context.Context) (*K8sCollector, error) {
 
 	k8sVersion = version.String()
 
-	factory := informers.NewSharedInformerFactory(clientset, resynPeriod)
+	factory := informers.NewSharedInformerFactory(clientset, resyncPeriod)
 
 	collector := &K8sCollector{
 		ctx: ctx,
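For reference, the renamed resyncPeriod feeds straight into client-go's shared informer factory: every informer built from it re-lists its cached objects at that cadence, which is the periodic burst the channel buffers in backend.go must absorb. A trimmed-down sketch (newPodWatcher is hypothetical; the real NewK8sCollector wires up informers for pods, services, deployments, and more):

import (
	"context"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

var resyncPeriod = 120 * time.Second

// newPodWatcher builds a factory whose informers re-deliver every cached
// object each resyncPeriod through their UpdateFunc handlers.
func newPodWatcher(ctx context.Context, cfg *rest.Config) (informers.SharedInformerFactory, error) {
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}
	factory := informers.NewSharedInformerFactory(clientset, resyncPeriod)
	factory.Core().V1().Pods().Informer() // register the pod informer before starting
	factory.Start(ctx.Done())             // runs until ctx is cancelled
	return factory, nil
}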
