From 98b7863af1a1dbfbfb3188e9d5e67369f04d6ebd Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 20 Feb 2024 21:30:57 +0300 Subject: [PATCH 1/2] filter localhost tcp close events --- aggregator/data.go | 54 ++++++++++++++++++++++++++++------------------ 1 file changed, 33 insertions(+), 21 deletions(-) diff --git a/aggregator/data.go b/aggregator/data.go index b52c31e..09a8b21 100644 --- a/aggregator/data.go +++ b/aggregator/data.go @@ -133,10 +133,10 @@ type ClusterInfo struct { var ( // default exponential backoff (*2) - // when retryLimit is increased, we are blocking the events that we wait it to be processed more - retryInterval = 50 * time.Millisecond - retryLimit = 2 - // 400 + 800 + 1600 + 3200 + 6400 = 12400 ms + // when attemptLimit is increased, we are blocking the events that we wait it to be processed more + retryInterval = 20 * time.Millisecond + attemptLimit = 3 // total attempt + // 1st try - 20ms - 2nd try - 40ms - 3rd try defaultExpiration = 5 * time.Minute purgeTime = 10 * time.Minute @@ -368,7 +368,6 @@ func (a *Aggregator) processEbpfTcp(ctx context.Context) { a.processTcpConnect(d) } } - } } @@ -460,6 +459,7 @@ func (a *Aggregator) signalTlsAttachment(pid uint32) { func (a *Aggregator) processTcpConnect(d *tcp_state.TcpConnectEvent) { go a.signalTlsAttachment(d.Pid) if d.Type_ == tcp_state.EVENT_TCP_ESTABLISHED { + // filter out localhost connections if d.SAddr == "127.0.0.1" || d.DAddr == "127.0.0.1" { return @@ -500,6 +500,11 @@ func (a *Aggregator) processTcpConnect(d *tcp_state.TcpConnectEvent) { var sockMap *SocketMap var ok bool + // filter out localhost connections + if d.SAddr == "127.0.0.1" || d.DAddr == "127.0.0.1" { + return + } + sockMap = a.clusterInfo.SocketMaps[d.Pid] var skLine *SocketLine @@ -934,14 +939,27 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) { return } + var path string + if d.Protocol == l7_req.L7_PROTOCOL_POSTGRES { + // parse sql command from payload + // path = sql command + // method = sql message type + var err error + path, err = a.parseSqlCommand(d) + if err != nil { + log.Logger.Error().AnErr("err", err) + return + } + } + skInfo := a.findRelatedSocket(ctx, d) if skInfo == nil { + log.Logger.Debug().Uint32("pid", d.Pid). + Uint64("fd", d.Fd).Uint64("writeTime", d.WriteTimeNs). + Str("protocol", d.Protocol).Any("payload", string(d.Payload[:])).Msg("socket not found") return } - // Since we process events concurrently - // TCP events and L7 events can be processed out of order - reqDto := datastore.Request{ StartTime: d.EventReadTime, Latency: d.Duration, @@ -957,21 +975,13 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) { Seq: d.Seq, } - if d.Protocol == l7_req.L7_PROTOCOL_POSTGRES { - // parse sql command from payload - // path = sql command - // method = sql message type - var err error - reqDto.Path, err = a.parseSqlCommand(d) - if err != nil { - log.Logger.Error().AnErr("err", err) - return - } - } + // Since we process events concurrently + // TCP events and L7 events can be processed out of order + var reqHostHeader string // parse http payload, extract path, query params, headers if d.Protocol == l7_req.L7_PROTOCOL_HTTP { - _, reqDto.Path, _, reqHostHeader = parseHttpPayload(string(d.Payload[0:d.PayloadSize])) + _, path, _, reqHostHeader = parseHttpPayload(string(d.Payload[0:d.PayloadSize])) } err := a.setFromTo(skInfo, d, &reqDto, reqHostHeader) @@ -979,6 +989,7 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) { return } + reqDto.Path = path reqDto.Completed = !d.Failed // In AMQP-DELIVER event, we are capturing from read syscall, @@ -1122,7 +1133,7 @@ func (a *Aggregator) getAlreadyExistingSockets(pid uint32) { } func (a *Aggregator) fetchSkInfo(ctx context.Context, skLine *SocketLine, d *l7_req.L7Event) *SockInfo { - rc := retryLimit + rc := attemptLimit rt := retryInterval var skInfo *SockInfo var err error @@ -1132,6 +1143,7 @@ func (a *Aggregator) fetchSkInfo(ctx context.Context, skLine *SocketLine, d *l7_ if err == nil && skInfo != nil { break } + log.Logger.Debug().Err(err).Uint32("pid", d.Pid).Uint64("fd", d.Fd).Uint64("writeTime", d.WriteTimeNs).Msg("retry to get skInfo...") rc-- if rc == 0 { break From d3dd7a4c8d06f3d93fa3dc43a8bc1b21f35f289a Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Tue, 20 Feb 2024 21:31:18 +0300 Subject: [PATCH 2/2] change log level --- ebpf/collector.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/ebpf/collector.go b/ebpf/collector.go index 3f8c7d5..b9223bf 100644 --- a/ebpf/collector.go +++ b/ebpf/collector.go @@ -9,6 +9,7 @@ import ( "io" "io/fs" "os" + "strings" "sync" "time" "unsafe" @@ -186,10 +187,10 @@ func (e *EbpfCollector) AttachUprobesForEncrypted() { e.tlsPidMap[pid] = struct{}{} e.mu.Unlock() - go func() { + go func(pid uint32) { // attach to libssl uprobes if process is using libssl errs := e.AttachSslUprobesOnProcess("/proc", pid) - if errs != nil && len(errs) > 0 { + if len(errs) > 0 { for _, err := range errs { if errorspkg.Is(err, fs.ErrNotExist) { // no such file or directory error @@ -203,7 +204,7 @@ func (e *EbpfCollector) AttachUprobesForEncrypted() { } go_errs := e.AttachGoTlsUprobesOnProcess("/proc", pid) - if go_errs != nil && len(go_errs) > 0 { + if len(go_errs) > 0 { for _, err := range go_errs { if errorspkg.Is(err, fs.ErrNotExist) { // no such file or directory error @@ -211,12 +212,12 @@ func (e *EbpfCollector) AttachUprobesForEncrypted() { // it's probably a kernel thread, or a very short lived process continue } - log.Logger.Error().Err(err).Uint32("pid", pid). + log.Logger.Error().Err(err). Msgf("error attaching go tls for pid: %d", pid) } } - }() + }(pid) } } @@ -261,7 +262,10 @@ func (e *EbpfCollector) AttachGoTlsUprobesOnProcess(procfs string, pid uint32) [ // read build info of a go executable bi, err := buildinfo.ReadFile(path) if err != nil { - // TODO: check if error is "not a Go executable" + if strings.HasSuffix(err.Error(), "not a Go executable") || strings.Contains(err.Error(), "no such file or directory") { + log.Logger.Debug().Str("reason", "gotls").Uint32("pid", pid).Msg("not a Go executable") + return errors + } log.Logger.Debug().Err(err).Uint32("pid", pid).Msg("error reading build info") errors = append(errors, err) return errors