diff --git a/aggregator/data.go b/aggregator/data.go index 2d8bf4e..abb99ea 100644 --- a/aggregator/data.go +++ b/aggregator/data.go @@ -324,12 +324,13 @@ func (a *Aggregator) processEbpf(ctx context.Context) { ctxPid := context.WithValue(a.ctx, log.LOG_CONTEXT, fmt.Sprint(d.Pid)) go a.signalTlsAttachment(d.Pid) a.processL7(ctxPid, d) - case l7_req.TRACE_EVENT: - d := data.(*l7_req.TraceEvent) - rateLimiter := a.getRateLimiterForPid(d.Pid) - if rateLimiter.Allow() { - a.ds.PersistTraceEvent(d) - } + // dist tracing disabled by default temporarily + // case l7_req.TRACE_EVENT: + // d := data.(*l7_req.TraceEvent) + // rateLimiter := a.getRateLimiterForPid(d.Pid) + // if rateLimiter.Allow() { + // a.ds.PersistTraceEvent(d) + // } } } } @@ -1050,8 +1051,9 @@ func (a *Aggregator) processKafkaEvent(ctx context.Context, d *l7_req.L7Event) { Key: msg.Key, Value: msg.Value, Type: msg.Type, - Tid: d.Tid, - Seq: d.Seq, + // dist tracing disabled by default temporarily + // Tid: d.Tid, + // Seq: d.Seq, } err := a.setFromToV2(addrPair, d, event, "") @@ -1084,8 +1086,9 @@ func (a *Aggregator) processAmqpEvent(ctx context.Context, d *l7_req.L7Event) { FailReason: "", Method: d.Method, Path: "", - Tid: d.Tid, - Seq: d.Seq, + // dist tracing disabled by default temporarily + // Tid: d.Tid, + // Seq: d.Seq, } err := a.setFromToV2(addrPair, d, reqDto, "") @@ -1124,8 +1127,9 @@ func (a *Aggregator) processRedisEvent(ctx context.Context, d *l7_req.L7Event) { FailReason: "", Method: d.Method, Path: query, - Tid: d.Tid, - Seq: d.Seq, + // dist tracing disabled by default temporarily + // Tid: d.Tid, + // Seq: d.Seq, } err := a.setFromToV2(addrPair, d, reqDto, "") @@ -1216,8 +1220,9 @@ func (a *Aggregator) processHttpEvent(ctx context.Context, d *l7_req.L7Event) { FailReason: "", Method: d.Method, Path: path, - Tid: d.Tid, - Seq: d.Seq, + // dist tracing disabled by default temporarily + // Tid: d.Tid, + // Seq: d.Seq, } err := a.setFromToV2(addrPair, d, reqDto, reqHostHeader) @@ -1256,8 +1261,9 @@ func (a *Aggregator) processMySQLEvent(ctx context.Context, d *l7_req.L7Event) { FailReason: "", Method: d.Method, Path: query, - Tid: d.Tid, - Seq: d.Seq, + // dist tracing disabled by default temporarily + // Tid: d.Tid, + // Seq: d.Seq, } err = a.setFromToV2(addrPair, d, reqDto, "") @@ -1297,8 +1303,9 @@ func (a *Aggregator) processPostgresEvent(ctx context.Context, d *l7_req.L7Event FailReason: "", Method: d.Method, Path: query, - Tid: d.Tid, - Seq: d.Seq, + // dist tracing disabled by default temporarily + // Tid: d.Tid, + // Seq: d.Seq, } err = a.setFromToV2(addrPair, d, reqDto, "") diff --git a/datastore/backend.go b/datastore/backend.go index 11fdac4..dbc7928 100644 --- a/datastore/backend.go +++ b/datastore/backend.go @@ -2,7 +2,6 @@ package datastore import ( "bytes" - "container/list" "context" "encoding/json" "fmt" @@ -13,11 +12,9 @@ import ( "sort" "strconv" "strings" - "sync" "time" "github.com/ddosify/alaz/config" - "github.com/ddosify/alaz/ebpf/l7_req" "github.com/ddosify/alaz/gpu" "github.com/ddosify/alaz/log" @@ -148,10 +145,10 @@ type BackendDS struct { aliveConnPool *poolutil.Pool[*ConnInfo] kafkaEventInfoPool *poolutil.Pool[*KafkaEventInfo] - traceEventQueue *list.List - traceEventMu sync.RWMutex - - traceInfoPool *poolutil.Pool[*TraceInfo] + // dist tracing disabled by default temporarily + // traceEventQueue *list.List + // traceEventMu sync.RWMutex + // traceInfoPool *poolutil.Pool[*TraceInfo] metricsExport bool gpuMetricsExport bool @@ -172,21 +169,21 @@ type BackendDS struct { } const ( - podEndpoint = "/pod/" - svcEndpoint = "/svc/" - rsEndpoint = "/replicaset/" - depEndpoint = "/deployment/" - epEndpoint = "/endpoint/" - containerEndpoint = "/container/" - dsEndpoint = "/daemonset/" - ssEndpoint = "/statefulset/" - reqEndpoint = "/requests/" - connEndpoint = "/connections/" - kafkaEventEndpoint = "/events/kafka/" - - traceEventEndpoint = "/dist_tracing/traffic/" - + podEndpoint = "/pod/" + svcEndpoint = "/svc/" + rsEndpoint = "/replicaset/" + depEndpoint = "/deployment/" + epEndpoint = "/endpoint/" + containerEndpoint = "/container/" + dsEndpoint = "/daemonset/" + ssEndpoint = "/statefulset/" + reqEndpoint = "/requests/" + connEndpoint = "/connections/" + kafkaEventEndpoint = "/events/kafka/" healthCheckEndpoint = "/healthcheck/" + + // dist tracing disabled by default temporarily + // traceEventEndpoint = "/dist_tracing/traffic/" ) type LeveledLogger struct { @@ -296,7 +293,6 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe reqInfoPool: newReqInfoPool(func() *ReqInfo { return &ReqInfo{} }, func(r *ReqInfo) {}), aliveConnPool: newAliveConnPool(func() *ConnInfo { return &ConnInfo{} }, func(r *ConnInfo) {}), kafkaEventInfoPool: newKafkaEventPool(func() *KafkaEventInfo { return &KafkaEventInfo{} }, func(r *KafkaEventInfo) {}), - traceInfoPool: newTraceInfoPool(func() *TraceInfo { return &TraceInfo{} }, func(r *TraceInfo) {}), reqChanBuffer: make(chan *ReqInfo, conf.ReqBufferSize), connChanBuffer: make(chan *ConnInfo, conf.ConnBufferSize), kafkaChanBuffer: make(chan *KafkaEventInfo, conf.ReqBufferSize), @@ -308,10 +304,11 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendDSConfig) *Backe containerEventChan: make(chan interface{}, 5*resourceChanSize), dsEventChan: make(chan interface{}, resourceChanSize), ssEventChan: make(chan interface{}, resourceChanSize), - traceEventQueue: list.New(), metricsExport: conf.MetricsExport, gpuMetricsExport: conf.GpuMetricsExport, metricsExportInterval: conf.MetricsExportInterval, + // traceEventQueue: list.New(), + // traceInfoPool: newTraceInfoPool(func() *TraceInfo { return &TraceInfo{} }, func(r *TraceInfo) {}), } return ds @@ -321,7 +318,8 @@ func (ds *BackendDS) Start() { go ds.sendReqsInBatch(ds.batchSize) go ds.sendConnsInBatch(ds.batchSize / 2) go ds.sendKafkaEventsInBatch(ds.batchSize / 2) - go ds.sendTraceEventsInBatch(10 * ds.batchSize) + + // go ds.sendTraceEventsInBatch(10 * ds.batchSize) // events are resynced every 60 seconds on k8s informers // resourceBatchSize ~ burst size, if more than resourceBatchSize events are sent in a moment, blocking can occur @@ -402,32 +400,32 @@ func (ds *BackendDS) Start() { }() } -func (b *BackendDS) enqueueTraceInfo(traceInfo *TraceInfo) { - b.traceEventMu.Lock() - defer b.traceEventMu.Unlock() - b.traceEventQueue.PushBack(traceInfo) -} +// func (b *BackendDS) enqueueTraceInfo(traceInfo *TraceInfo) { +// b.traceEventMu.Lock() +// defer b.traceEventMu.Unlock() +// b.traceEventQueue.PushBack(traceInfo) +// } -func (b *BackendDS) dequeueTraceEvents(batchSize uint64) []*TraceInfo { - b.traceEventMu.Lock() - defer b.traceEventMu.Unlock() +// func (b *BackendDS) dequeueTraceEvents(batchSize uint64) []*TraceInfo { +// b.traceEventMu.Lock() +// defer b.traceEventMu.Unlock() - batch := make([]*TraceInfo, 0, batchSize) +// batch := make([]*TraceInfo, 0, batchSize) - for i := 0; i < int(batchSize); i++ { - if b.traceEventQueue.Len() == 0 { - return batch - } +// for i := 0; i < int(batchSize); i++ { +// if b.traceEventQueue.Len() == 0 { +// return batch +// } - elem := b.traceEventQueue.Front() - b.traceEventQueue.Remove(elem) - tInfo, _ := elem.Value.(*TraceInfo) +// elem := b.traceEventQueue.Front() +// b.traceEventQueue.Remove(elem) +// tInfo, _ := elem.Value.(*TraceInfo) - batch = append(batch, tInfo) - } +// batch = append(batch, tInfo) +// } - return batch -} +// return batch +// } func (b *BackendDS) DoRequest(req *http.Request) error { req.Header.Set("Content-Type", "application/json") @@ -489,17 +487,18 @@ func convertConnsToPayload(batch []*ConnInfo) ConnInfoPayload { } } -func convertTraceEventsToPayload(batch []*TraceInfo) TracePayload { - return TracePayload{ - Metadata: Metadata{ - MonitoringID: MonitoringID, - IdempotencyKey: string(uuid.NewUUID()), - NodeID: NodeID, - AlazVersion: tag, - }, - Traces: batch, - } -} +// dist tracing disabled +// func convertTraceEventsToPayload(batch []*TraceInfo) TracePayload { +// return TracePayload{ +// Metadata: Metadata{ +// MonitoringID: MonitoringID, +// IdempotencyKey: string(uuid.NewUUID()), +// NodeID: NodeID, +// AlazVersion: tag, +// }, +// Traces: batch, +// } +// } func (b *BackendDS) sendMetricsToBackend(r io.Reader) { req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/metrics/scrape/?instance=%s&monitoring_id=%s", b.host, NodeID, MonitoringID), r) @@ -556,37 +555,38 @@ func (b *BackendDS) sendToBackend(method string, payload interface{}, endpoint s } } -func (b *BackendDS) sendTraceEventsInBatch(batchSize uint64) { - t := time.NewTicker(1 * time.Second) - defer t.Stop() +// dist tracing disabled by default temporarily +// func (b *BackendDS) sendTraceEventsInBatch(batchSize uint64) { +// t := time.NewTicker(1 * time.Second) +// defer t.Stop() - send := func() { - batch := b.dequeueTraceEvents(batchSize) +// send := func() { +// batch := b.dequeueTraceEvents(batchSize) - if len(batch) == 0 { - return - } +// if len(batch) == 0 { +// return +// } - tracePayload := convertTraceEventsToPayload(batch) - go b.sendToBackend(http.MethodPost, tracePayload, traceEventEndpoint) +// tracePayload := convertTraceEventsToPayload(batch) +// go b.sendToBackend(http.MethodPost, tracePayload, traceEventEndpoint) - // return reqInfoss to the pool - for _, trace := range batch { - b.traceInfoPool.Put(trace) - } - } +// // return reqInfoss to the pool +// for _, trace := range batch { +// b.traceInfoPool.Put(trace) +// } +// } - for { - select { - case <-b.ctx.Done(): - log.Logger.Info().Msg("stopping sending trace events to backend") - return - case <-t.C: - send() - } - } +// for { +// select { +// case <-b.ctx.Done(): +// log.Logger.Info().Msg("stopping sending trace events to backend") +// return +// case <-t.C: +// send() +// } +// } -} +// } func (b *BackendDS) sendReqsInBatch(batchSize uint64) { t := time.NewTicker(5 * time.Second) @@ -780,13 +780,13 @@ func newAliveConnPool(factory func() *ConnInfo, close func(*ConnInfo)) *poolutil } } -func newTraceInfoPool(factory func() *TraceInfo, close func(*TraceInfo)) *poolutil.Pool[*TraceInfo] { - return &poolutil.Pool[*TraceInfo]{ - Items: make(chan *TraceInfo, 50000), - Factory: factory, - Close: close, - } -} +// func newTraceInfoPool(factory func() *TraceInfo, close func(*TraceInfo)) *poolutil.Pool[*TraceInfo] { +// return &poolutil.Pool[*TraceInfo]{ +// Items: make(chan *TraceInfo, 50000), +// Factory: factory, +// Close: close, +// } +// } func newKafkaEventPool(factory func() *KafkaEventInfo, close func(*KafkaEventInfo)) *poolutil.Pool[*KafkaEventInfo] { return &poolutil.Pool[*KafkaEventInfo]{ @@ -837,8 +837,9 @@ func (b *BackendDS) PersistRequest(request *Request) error { reqInfo[13] = request.Method reqInfo[14] = request.Path reqInfo[15] = request.Tls - reqInfo[16] = request.Seq - reqInfo[17] = request.Tid + // dist tracing disabled + // reqInfo[16] = request.Seq + // reqInfo[17] = request.Tid b.reqChanBuffer <- reqInfo @@ -866,35 +867,37 @@ func (b *BackendDS) PersistKafkaEvent(ke *KafkaEvent) error { kafkaInfo[13] = ke.Value kafkaInfo[14] = ke.Type kafkaInfo[15] = ke.Tls - kafkaInfo[16] = ke.Seq - kafkaInfo[17] = ke.Tid + // dist tracing disabled + // kafkaInfo[16] = ke.Seq + // kafkaInfo[17] = ke.Tid b.kafkaChanBuffer <- kafkaInfo return nil } -func (b *BackendDS) PersistTraceEvent(trace *l7_req.TraceEvent) error { - if trace == nil { - return fmt.Errorf("trace event is nil") - } +// dist tracing disabled by default temporarily +// func (b *BackendDS) PersistTraceEvent(trace *l7_req.TraceEvent) error { +// if trace == nil { +// return fmt.Errorf("trace event is nil") +// } - t := b.traceInfoPool.Get() +// t := b.traceInfoPool.Get() - t[0] = trace.Tx - t[1] = trace.Seq - t[2] = trace.Tid +// t[0] = trace.Tx +// t[1] = trace.Seq +// t[2] = trace.Tid - ingress := false // EGRESS - if trace.Type_ == 0 { // INGRESS - ingress = true - } +// ingress := false // EGRESS +// if trace.Type_ == 0 { // INGRESS +// ingress = true +// } - t[3] = ingress +// t[3] = ingress - b.enqueueTraceInfo(t) - return nil -} +// b.enqueueTraceInfo(t) +// return nil +// } func (b *BackendDS) PersistPod(pod Pod, eventType string) error { podEvent := convertPodToPodEvent(pod, eventType) diff --git a/datastore/datastore.go b/datastore/datastore.go index 5de1829..9881ab8 100644 --- a/datastore/datastore.go +++ b/datastore/datastore.go @@ -1,9 +1,5 @@ package datastore -import ( - "github.com/ddosify/alaz/ebpf/l7_req" -) - type DataStore interface { PersistPod(pod Pod, eventType string) error PersistService(service Service, eventType string) error @@ -18,7 +14,7 @@ type DataStore interface { PersistKafkaEvent(request *KafkaEvent) error - PersistTraceEvent(trace *l7_req.TraceEvent) error + // PersistTraceEvent(trace *l7_req.TraceEvent) error PersistAliveConnection(trace *AliveConnection) error } diff --git a/datastore/dto.go b/datastore/dto.go index d03a1a7..75dd8c7 100644 --- a/datastore/dto.go +++ b/datastore/dto.go @@ -136,8 +136,9 @@ type KafkaEvent struct { Value string Type string // PUBLISH or CONSUME Tls bool - Tid uint32 - Seq uint32 + // dist tracing disabled by default temporarily + // Tid uint32 + // Seq uint32 } func (ke *KafkaEvent) SetFromUID(uid string) { @@ -191,8 +192,9 @@ type Request struct { FailReason string Method string Path string - Tid uint32 - Seq uint32 + // dist tracing disabled by default temporarily + // Tid uint32 + // Seq uint32 } func (r *Request) SetFromUID(uid string) { diff --git a/datastore/payload.go b/datastore/payload.go index 85c351f..9c37cd8 100644 --- a/datastore/payload.go +++ b/datastore/payload.go @@ -122,9 +122,11 @@ type ContainerEvent struct { // 13) Method // 14) Path // 15) Encrypted (bool) +type ReqInfo [16]interface{} + +// dist tracing disabled // 16) Seq // 17) Tid -type ReqInfo [18]interface{} type RequestsPayload struct { Metadata Metadata `json:"metadata"` @@ -174,9 +176,11 @@ type TracePayload struct { // 13) Value // 14) Type // 15) Encrypted (bool) +type KafkaEventInfo [16]interface{} + +// dist tracing disabled // 16) Seq // 17) Tid -type KafkaEventInfo [18]interface{} type KafkaEventInfoPayload struct { Metadata Metadata `json:"metadata"` diff --git a/ebpf/c/bpf.c b/ebpf/c/bpf.c index 5973e7b..6d4de53 100644 --- a/ebpf/c/bpf.c +++ b/ebpf/c/bpf.c @@ -16,6 +16,7 @@ #endif #define FILTER_OUT_NON_CONTAINER +// #define DIST_TRACING_ENABLED // disabled by default #include #include "../headers/pt_regs.h" diff --git a/ebpf/c/bpf_bpfeb.go b/ebpf/c/bpf_bpfeb.go index 312dc70..73789a8 100644 --- a/ebpf/c/bpf_bpfeb.go +++ b/ebpf/c/bpf_bpfeb.go @@ -48,8 +48,6 @@ type bpfL7Event struct { Failed uint8 IsTls uint8 _ [1]byte - Seq uint32 - Tid uint32 KafkaApiVersion int16 _ [2]byte PrepStatementId uint32 @@ -71,8 +69,6 @@ type bpfL7Request struct { PayloadReadComplete uint8 RequestType uint8 _ [2]byte - Seq uint32 - Tid uint32 CorrelationId int32 ApiKey int16 ApiVersion int16 diff --git a/ebpf/c/bpf_bpfeb.o b/ebpf/c/bpf_bpfeb.o index 7689dc9..3269dc4 100644 Binary files a/ebpf/c/bpf_bpfeb.o and b/ebpf/c/bpf_bpfeb.o differ diff --git a/ebpf/c/bpf_bpfel.go b/ebpf/c/bpf_bpfel.go index 33f49fd..3bde8c6 100644 --- a/ebpf/c/bpf_bpfel.go +++ b/ebpf/c/bpf_bpfel.go @@ -48,8 +48,6 @@ type bpfL7Event struct { Failed uint8 IsTls uint8 _ [1]byte - Seq uint32 - Tid uint32 KafkaApiVersion int16 _ [2]byte PrepStatementId uint32 @@ -71,8 +69,6 @@ type bpfL7Request struct { PayloadReadComplete uint8 RequestType uint8 _ [2]byte - Seq uint32 - Tid uint32 CorrelationId int32 ApiKey int16 ApiVersion int16 diff --git a/ebpf/c/bpf_bpfel.o b/ebpf/c/bpf_bpfel.o index a10e643..edb47f6 100644 Binary files a/ebpf/c/bpf_bpfel.o and b/ebpf/c/bpf_bpfel.o differ diff --git a/ebpf/c/l7.c b/ebpf/c/l7.c index 3fdbabe..2097045 100644 --- a/ebpf/c/l7.c +++ b/ebpf/c/l7.c @@ -30,8 +30,10 @@ struct l7_event { __u8 failed; __u8 is_tls; + #ifdef DIST_TRACING_ENABLED __u32 seq; // tcp sequence number __u32 tid; // thread id + #endif __s16 kafka_api_version; // used only for kafka __u32 prep_statement_id; // used only for mysql @@ -51,8 +53,12 @@ struct l7_request { __u32 payload_size; __u8 payload_read_complete; __u8 request_type; + + #ifdef DIST_TRACING_ENABLED __u32 seq; __u32 tid; + #endif + __s32 correlation_id; // used only for kafka __s16 api_key; // used only for kafka __s16 api_version; // used only for kafka @@ -389,12 +395,12 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha req->payload_read_complete = 1; } + #ifdef DIST_TRACING_ENABLED __u32 tid = id & 0xFFFFFFFF; __u32 seq = process_for_dist_trace_write(ctx,fd); - - // for distributed tracing req->seq = seq; req->tid = tid; + #endif struct sock* sk = get_sock(fd); @@ -452,7 +458,9 @@ int process_enter_of_syscalls_read_recvfrom(void *ctx, struct read_enter_args * // } // for distributed tracing + #ifdef DIST_TRACING_ENABLED process_for_dist_trace_read(ctx,params->fd); + #endif struct read_args args = {}; @@ -541,8 +549,10 @@ int process_exit_of_syscalls_write_sendto(void* ctx, __s64 ret){ bpf_map_delete_elem(&active_writes, &id); // for distributed tracing + #ifdef DIST_TRACING_ENABLED e->seq = active_req->seq; e->tid = active_req->tid; + #endif e->saddr = active_req->saddr; @@ -641,9 +651,10 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64 e->fd = k.fd; e->pid = k.pid; - // for distributed tracing + #ifdef DIST_TRACING_ENABLED e->seq = 0; // default value e->tid = bpf_get_current_pid_tgid() & 0xFFFFFFFF; + #endif // reset payload for (int i = 0; i < MAX_PAYLOAD_SIZE; i++) { @@ -737,9 +748,10 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64 e->fd = k.fd; e->pid = k.pid; - // for distributed tracing + #ifdef DIST_TRACING_ENABLED e->seq = 0; // default value e->tid = bpf_get_current_pid_tgid() & 0xFFFFFFFF; + #endif struct sock* sk = get_sock(read_info->fd); @@ -783,10 +795,10 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64 e->failed = 0; // success - // for distributed tracing + #ifdef DIST_TRACING_ENABLED e->seq = active_req->seq; e->tid = active_req->tid; - + #endif e->saddr = active_req->saddr; e->sport = active_req->sport; @@ -1314,8 +1326,9 @@ int process_enter_of_go_conn_write(void *ctx, __u32 pid, __u32 fd, char *buf_ptr log_to_userspace(ctx, WARN, func_name, log_msg, r, e->fd, e->payload_size); } - // TODO: we will add tracing for http2 requests + #ifdef DIST_TRACING_ENABLED process_for_dist_trace_write(ctx,fd); + #endif return 0; }else{ req->protocol = PROTOCOL_UNKNOWN; @@ -1336,9 +1349,10 @@ int process_enter_of_go_conn_write(void *ctx, __u32 pid, __u32 fd, char *buf_ptr } + #ifdef DIST_TRACING_ENABLED req->seq = process_for_dist_trace_write(ctx,fd); - __u32 tid = bpf_get_current_pid_tgid() & 0xFFFFFFFF; - req->tid = tid; + req->tid = bpf_get_current_pid_tgid() & 0xFFFFFFFF; + #endif struct sock* sk = get_sock(fd); @@ -1419,8 +1433,9 @@ int BPF_UPROBE(go_tls_conn_read_enter) { return 1; } - // for distributed tracing + #ifdef DIST_TRACING_ENABLED process_for_dist_trace_read(ctx,fd); + #endif // X1(arm64) register contains the byte ptr, pointing to first byte of the slice char *buf_ptr = (char*)GO_PARAM2(ctx); diff --git a/ebpf/l7_req/l7.go b/ebpf/l7_req/l7.go index 7e0ba90..c1b66a0 100644 --- a/ebpf/l7_req/l7.go +++ b/ebpf/l7_req/l7.go @@ -353,8 +353,6 @@ type bpfL7Event struct { Failed uint8 IsTls uint8 _ [1]byte - Seq uint32 - Tid uint32 KafkaApiVersion int16 _ [2]byte PrepStatementId uint32 // for mysql @@ -366,28 +364,29 @@ type bpfL7Event struct { _ [6]byte } -type bpfTraceEvent struct { - Pid uint32 - Tid uint32 - Tx uint64 - Type_ uint8 - _ [3]byte - Seq uint32 -} - -type TraceEvent struct { - Pid uint32 - Tid uint32 - Tx int64 - Type_ uint8 - Seq uint32 -} - -const TRACE_EVENT = "trace_event" - -func (e TraceEvent) Type() string { - return TRACE_EVENT -} +// dist tracing disabled by default temporarily +// type bpfTraceEvent struct { +// Pid uint32 +// Tid uint32 +// Tx uint64 +// Type_ uint8 +// _ [3]byte +// Seq uint32 +// } + +// type TraceEvent struct { +// Pid uint32 +// Tid uint32 +// Tx int64 +// Type_ uint8 +// Seq uint32 +// } + +// const TRACE_EVENT = "trace_event" + +// func (e TraceEvent) Type() string { +// return TRACE_EVENT +// } // for user space type L7Event struct { @@ -439,13 +438,15 @@ type L7Prog struct { // links represent a program attached to a hook links map[string]link.Link // key : hook name - l7Events *perf.Reader - logs *perf.Reader - traffic *perf.Reader // ingress-egress calls - + l7Events *perf.Reader l7EventsMapSize uint32 - trafficMapSize uint32 - logsMapsSize uint32 + + logs *perf.Reader + logsMapsSize uint32 + + // dist tracing disabled by default temporarily + // traffic *perf.Reader // ingress-egress calls + // trafficMapSize uint32 } func InitL7Prog(conf *L7ProgConfig) *L7Prog { @@ -456,8 +457,8 @@ func InitL7Prog(conf *L7ProgConfig) *L7Prog { return &L7Prog{ links: map[string]link.Link{}, l7EventsMapSize: conf.L7EventsBpfMapSize, - trafficMapSize: conf.TrafficBpfMapSize, - logsMapsSize: conf.LogsBpfMapSize, + // trafficMapSize: conf.TrafficBpfMapSize, + logsMapsSize: conf.LogsBpfMapSize, } } @@ -548,14 +549,15 @@ func (l7p *L7Prog) InitMaps() { log.Logger.Fatal().Err(err).Msg("error creating perf event array reader") } - l7p.traffic, err = perf.NewReaderWithOptions(c.BpfObjs.IngressEgressCalls, int(l7p.trafficMapSize)*os.Getpagesize(), - perf.ReaderOptions{ - Watermark: 4 * os.Getpagesize(), - Overwritable: false, - }) - if err != nil { - log.Logger.Fatal().Err(err).Msg("error creating perf reader") - } + // dist tracing disabled by default temporarily + // l7p.traffic, err = perf.NewReaderWithOptions(c.BpfObjs.IngressEgressCalls, int(l7p.trafficMapSize)*os.Getpagesize(), + // perf.ReaderOptions{ + // Watermark: 4 * os.Getpagesize(), + // Overwritable: false, + // }) + // if err != nil { + // log.Logger.Fatal().Err(err).Msg("error creating perf reader") + // } } // returns when program is detached @@ -742,14 +744,15 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) { PayloadReadComplete: uint8ToBool(l7Event.PayloadReadComplete), Failed: uint8ToBool(l7Event.Failed), WriteTimeNs: l7Event.WriteTimeNs, - Tid: l7Event.Tid, - Seq: l7Event.Seq, KafkaApiVersion: l7Event.KafkaApiVersion, MySqlPrepStmtId: l7Event.PrepStatementId, Saddr: l7Event.Saddr, Sport: l7Event.Sport, Daddr: l7Event.Daddr, Dport: l7Event.Dport, + // dist tracing disabled by default temporarily + // Tid: l7Event.Tid, + // Seq: l7Event.Seq, } go func(l7Event *L7Event) { @@ -770,60 +773,61 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) { } }() - go func() { - var record perf.Record - droppedCount := 0 - - go func() { - t := time.NewTicker(1 * time.Minute) - for range t.C { - log.Logger.Debug().Int("count", droppedCount).Msg("dropped trace events") - } - }() - - read := func() { - err := l7p.traffic.ReadInto(&record) - if err != nil { - log.Logger.Warn().Err(err).Msg("error reading from dist-trace calls") - } - - if record.LostSamples != 0 { - log.Logger.Warn().Msgf("lost samples dist-trace %d", record.LostSamples) - } - - if record.RawSample == nil || len(record.RawSample) == 0 { - log.Logger.Warn().Msgf("read sample dist-trace nil or empty") - return - } - - bpfTraceEvent := (*bpfTraceEvent)(unsafe.Pointer(&record.RawSample[0])) - - traceEvent := &TraceEvent{ - Pid: bpfTraceEvent.Pid, - Tid: bpfTraceEvent.Tid, - Tx: time.Now().UnixMilli(), - Type_: bpfTraceEvent.Type_, - Seq: bpfTraceEvent.Seq, - } - - go func(traceEvent *TraceEvent) { - select { - case ch <- traceEvent: - default: - droppedCount++ - } - }(traceEvent) - - } - for { - select { - case <-stop: - return - default: - read() - } - } - }() + // dist tracing disabled by default temporarily + // go func() { + // var record perf.Record + // droppedCount := 0 + + // go func() { + // t := time.NewTicker(1 * time.Minute) + // for range t.C { + // log.Logger.Debug().Int("count", droppedCount).Msg("dropped trace events") + // } + // }() + + // read := func() { + // err := l7p.traffic.ReadInto(&record) + // if err != nil { + // log.Logger.Warn().Err(err).Msg("error reading from dist-trace calls") + // } + + // if record.LostSamples != 0 { + // log.Logger.Warn().Msgf("lost samples dist-trace %d", record.LostSamples) + // } + + // if record.RawSample == nil || len(record.RawSample) == 0 { + // log.Logger.Warn().Msgf("read sample dist-trace nil or empty") + // return + // } + + // bpfTraceEvent := (*bpfTraceEvent)(unsafe.Pointer(&record.RawSample[0])) + + // traceEvent := &TraceEvent{ + // Pid: bpfTraceEvent.Pid, + // Tid: bpfTraceEvent.Tid, + // Tx: time.Now().UnixMilli(), + // Type_: bpfTraceEvent.Type_, + // Seq: bpfTraceEvent.Seq, + // } + + // go func(traceEvent *TraceEvent) { + // select { + // case ch <- traceEvent: + // default: + // droppedCount++ + // } + // }(traceEvent) + + // } + // for { + // select { + // case <-stop: + // return + // default: + // read() + // } + // } + // }() <-ctx.Done() // wait for context to be cancelled close(stop) diff --git a/main.go b/main.go index 61358d1..dc318fb 100644 --- a/main.go +++ b/main.go @@ -70,13 +70,6 @@ func main() { } tracingEnabled, err := strconv.ParseBool(os.Getenv("TRACING_ENABLED")) - if err != nil { - // for backwards compatibility - ebpfEnabled, _ := strconv.ParseBool(os.Getenv("SERVICE_MAP_ENABLED")) - distTracingEnabled, _ := strconv.ParseBool(os.Getenv("DIST_TRACING_ENABLED")) - tracingEnabled = ebpfEnabled || distTracingEnabled - } - metricsEnabled, _ := strconv.ParseBool(os.Getenv("METRICS_ENABLED")) logsEnabled, _ := strconv.ParseBool(os.Getenv("LOGS_ENABLED")) @@ -105,7 +98,7 @@ func main() { a := aggregator.NewAggregator(ctx, ct, kubeEvents, ec.EbpfEvents(), ec.EbpfProcEvents(), ec.EbpfTcpEvents(), ec.TlsAttachQueue(), dsBackend) a.Run() - a.AdvertiseDebugData() + // a.AdvertiseDebugData() ec.Init() go ec.ListenEvents()