diff --git a/aggregator/data.go b/aggregator/data.go
index 371298c..2406a21 100644
--- a/aggregator/data.go
+++ b/aggregator/data.go
@@ -236,6 +236,7 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{},
 	}
 
 	go a.clearSocketLines(ctx)
+	go a.updateSocketMap(ctx)
 
 	return a
 }
@@ -689,7 +690,7 @@ func (a *Aggregator) processHttp2Frames() {
 				}
 
 				req.Latency = d.WriteTimeNs - req.Latency
-				req.StartTime = d.EventReadTime
+				req.StartTime = int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6) // nano to milli
 				req.Completed = true
 				req.FromIP = skInfo.Saddr
 				req.ToIP = skInfo.Daddr
@@ -1019,11 +1020,22 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
 		log.Logger.Debug().Uint32("pid", d.Pid).
 			Uint64("fd", d.Fd).Uint64("writeTime", d.WriteTimeNs).
 			Str("protocol", d.Protocol).Any("payload", string(d.Payload[:d.PayloadSize])).Msg("socket not found")
-		return
+
+		// go check pid-fd for the socket
+		if !a.fetchSocketOnNotFound(ctx, d) {
+			return
+		}
+
+		// skInfo is dereferenced below, so look the socket up again and
+		// give up if it still cannot be resolved for this event
+		skInfo = a.findRelatedSocket(ctx, d)
+		if skInfo == nil {
+			return
+		}
 	}
 
 	reqDto := datastore.Request{
-		StartTime:  d.EventReadTime,
+		StartTime:  int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
 		Latency:    d.Duration,
 		FromIP:     skInfo.Saddr,
 		ToIP:       skInfo.Daddr,
@@ -1245,6 +1257,92 @@ func (a *Aggregator) fetchSocketMap(pid uint32) *SocketMap {
 	return sockMap
 }
 
+// updateSocketMap re-reads the connection info of every tracked socket of every
+// live process each time the ticker fires, until ctx is done.
+// This is a mitigation for the case a tcp event is missed
+func (a *Aggregator) updateSocketMap(ctx context.Context) {
+	ticker := time.NewTicker(3 * time.Minute)
+	defer ticker.Stop() // release the ticker when ctx is done
+
+	f := func() {
+		a.liveProcessesMu.RLock()
+		defer a.liveProcessesMu.RUnlock()
+		for pid := range a.liveProcesses {
+			sockMap := a.clusterInfo.SocketMaps[pid]
+			if sockMap.mu == nil {
+				continue
+			}
+
+			sockMap.mu.Lock()
+			for _, skLine := range sockMap.M {
+				// best effort: an error for one fd must not stop the sweep
+				_ = skLine.getConnectionInfo()
+			}
+			sockMap.mu.Unlock()
+		}
+	}
+
+	for {
+		select {
+		case <-ticker.C:
+			f()
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// fetchSocketOnNotFound is the fallback used when no socket is known for an l7 event.
+// It marks the pid as live, makes sure the pid's socket map has a mutex and retries
+// the lookup; if that still yields nothing it reads the connection from procfs.
+// It reports whether connection info for the event's pid-fd could be obtained.
+func (a *Aggregator) fetchSocketOnNotFound(ctx context.Context, d *l7_req.L7Event) bool {
+	a.liveProcessesMu.Lock()
+
+	a.liveProcesses[d.Pid] = struct{}{}
+
+	// in case of reference to mu is nil, pid exec event did not come yet
+	// create a new mutex for the pid
+	// to avoid race around the mutex, we need to lock the liveProcessesMu
+	if a.clusterInfo.SocketMaps[d.Pid].mu == nil {
+		log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Msg("fetchSocketOnNotFound: pid not found")
+
+		// take the slot from Add's result so both statements use the same index
+		idx := a.muIndex.Add(1) % uint64(len(a.muArray))
+		a.muArray[idx] = &sync.RWMutex{}
+		a.clusterInfo.SocketMaps[d.Pid].mu = a.muArray[idx]
+	}
+	a.liveProcessesMu.Unlock()
+
+	// creates sockMap.M
+	if skInfo := a.findRelatedSocket(ctx, d); skInfo != nil {
+		return true
+	}
+
+	// read the entry only now so that a mutex or map installed above is visible,
+	// and fetch the socket line under the lock instead of indexing M unguarded
+	sockMap := a.clusterInfo.SocketMaps[d.Pid]
+	if sockMap.mu == nil {
+		return false
+	}
+	sockMap.mu.RLock()
+	skLine := sockMap.M[d.Fd]
+	sockMap.mu.RUnlock()
+	if skLine == nil {
+		// calling getConnectionInfo on a missing entry would dereference nil
+		log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Msg("fetchSocketOnNotFound: socket line not found")
+		return false
+	}
+
+	// go try reading from kernel files
+	if err := skLine.getConnectionInfo(); err != nil {
+		log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Err(err).Msg("fetchSocketOnNotFound: failed to get connection info")
+		return false
+	}
+	log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Msg("fetchSocketOnNotFound: connection info found")
+	return true
+}
+
 func (a *Aggregator) findRelatedSocket(ctx context.Context, d *l7_req.L7Event) *SockInfo {
 	sockMap := a.clusterInfo.SocketMaps[d.Pid]
 	// acquire sockMap lock
@@ -1261,7 +1359,7 @@ func (a *Aggregator) findRelatedSocket(ctx context.Context, d *l7_req.L7Event) *
 
 	skLine, ok := sockMap.M[d.Fd]
 	if !ok {
-		log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Msg("error finding skLine, go look for it")
+		log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Msg("create skLine...")
 		// start new socket line, find already established connections
 		skLine = NewSocketLine(d.Pid, d.Fd)
 		sockMap.M[d.Fd] = skLine
@@ -1475,3 +1573,19 @@ func getPidMax() (int, error) {
 	}
 	return pidMax, nil
 }
+
+// convertKernelTimeToUserspaceTime maps a kernel timestamp (same clock as
+// l7_req.FirstKernelTime) onto the userspace clock of l7_req.FirstUserspaceTime.
+// The unsigned subtraction wraps around for writeTime > FirstKernelTime and the
+// outer subtraction wraps back, so the result equals
+// FirstUserspaceTime + (writeTime - FirstKernelTime).
+func convertKernelTimeToUserspaceTime(writeTime uint64) uint64 {
+	// get first timestamp from kernel and corresponding userspace time
+	return l7_req.FirstUserspaceTime - (l7_req.FirstKernelTime - writeTime)
+}
+
+// convertUserTimeToKernelTime is the inverse mapping: it converts a userspace
+// timestamp in nanoseconds to the kernel clock used by l7 events.
+func convertUserTimeToKernelTime(now uint64) uint64 {
+	return l7_req.FirstKernelTime - (l7_req.FirstUserspaceTime - now)
+}
diff --git a/aggregator/pg_test.go b/aggregator/pg_test.go
index af7b121..7fe110e 100644
--- a/aggregator/pg_test.go
+++ b/aggregator/pg_test.go
@@ -31,7 +31,6 @@ func TestPostgresParseWithKnownStmt(t *testing.T) {
 		WriteTimeNs:         0,
 		Tid:                 0,
 		Seq:                 0,
-		EventReadTime:       0,
 	})
 	if err != nil {
 		t.Fatalf("unexpected error: %v", err)
@@ -72,7 +71,6 @@ func TestPostgresParseWithKnownStmt(t *testing.T) {
 		WriteTimeNs:         0,
 		Tid:                 0,
 		Seq:                 0,
-		EventReadTime:       0,
 	})
 
 	if err != nil {
@@ -109,7 +107,6 @@ func TestPostgresParseWithUnknownStmt(t *testing.T) {
 		WriteTimeNs:         0,
 		Tid:                 0,
 		Seq:                 0,
-		EventReadTime:       0,
 	})
 
 	if err != nil {
diff --git a/aggregator/sock_num_line.go b/aggregator/sock_num_line.go
index abc495d..6e91250 100644
--- a/aggregator/sock_num_line.go
+++ b/aggregator/sock_num_line.go
@@ -8,7 +8,9 @@ import (
 	"fmt"
 	"net"
 	"os"
+	"regexp"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -45,6 +47,19 @@ func (nl *SocketLine) AddValue(timestamp uint64, sockInfo *SockInfo) {
 	nl.mu.Lock()
 	defer nl.mu.Unlock()
 
+	// ignore close events
+	if sockInfo == nil {
+		return
+	}
+
+	// if last element is equal to the current element, ignore
+	if len(nl.Values) > 0 {
+		last := nl.Values[len(nl.Values)-1].SockInfo
+		if last != nil && last.Saddr == sockInfo.Saddr && last.Sport == sockInfo.Sport && last.Daddr == sockInfo.Daddr && last.Dport == sockInfo.Dport {
+			return
+		}
+	}
+
 	nl.Values = insertIntoSortedSlice(nl.Values, TimestampedSocket{Timestamp: timestamp, SockInfo: sockInfo})
 }
 
@@ -100,7 +115,6 @@ func (nl *SocketLine) DeleteUnused() {
 	for i < len(nl.Values)-1 {
 		if nl.Values[i].SockInfo != nil && nl.Values[i+1].SockInfo != nil {
 			result = append(result, nl.Values[i+1])
-			log.Logger.Debug().Msgf("deleting socket line %v", nl.Values[i])
 			i = i + 2
 		} else {
 			result = append(result, nl.Values[i])
@@ -249,3 +263,144 @@ func insertIntoSortedSlice(sortedSlice []TimestampedSocket, newItem TimestampedS
 
 	return sortedSlice
 }
+
+// socketInodeRe extracts the inode from a /proc/<pid>/fd/<fd> link target such
+// as "socket:[12345]". It is compiled once instead of on every lookup.
+var socketInodeRe = regexp.MustCompile(`socket:\[(\d+)\]`)
+
+// reverse slice
+func reverseSlice(s []string) []string {
+	for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
+		s[i], s[j] = s[j], s[i]
+	}
+	return s
+}
+
+// convertHexToIP converts a hex string IP address to a human-readable IP address.
+// The bytes are emitted in reverse order of their position in hex; malformed
+// input (odd length or non-hex digits) yields "".
+// NOTE(review): the reversal presumably assumes a little-endian host - confirm.
+func convertHexToIP(hex string) string {
+	if len(hex) == 0 || len(hex)%2 != 0 {
+		return ""
+	}
+
+	ipParts := make([]string, 0, len(hex)/2)
+	for i := 0; i < len(hex); i += 2 {
+		part, err := strconv.ParseUint(hex[i:i+2], 16, 8)
+		if err != nil {
+			return ""
+		}
+		ipParts = append(ipParts, strconv.FormatUint(part, 10))
+	}
+	ipParts = reverseSlice(ipParts)
+	return strings.Join(ipParts, ".")
+}
+
+// convertHexToPort converts a hex string port to a human-readable port.
+// It returns 0 if hex is not a valid 16-bit hexadecimal number.
+func convertHexToPort(hex string) int {
+	port, err := strconv.ParseUint(hex, 16, 16)
+	if err != nil {
+		return 0
+	}
+	return int(port)
+}
+
+// getInodeFromFD resolves /proc/<pid>/fd/<fd> and returns the socket inode it
+// points to. It fails if the link cannot be read or does not name a socket.
+func getInodeFromFD(pid, fd string) (string, error) {
+	fdPath := fmt.Sprintf("/proc/%s/fd/%s", pid, fd)
+	link, err := os.Readlink(fdPath)
+	if err != nil {
+		return "", err
+	}
+
+	match := socketInodeRe.FindStringSubmatch(link)
+	if len(match) < 2 {
+		return "", fmt.Errorf("no inode found in link: %s", link)
+	}
+
+	return match[1], nil
+}
+
+// findTCPConnection returns the line of /proc/<pid>/net/tcp whose inode column
+// equals inode. Only /proc/<pid>/net/tcp is consulted.
+func findTCPConnection(inode string, pid string) (string, error) {
+	tcpFile, err := os.Open(fmt.Sprintf("/proc/%s/net/tcp", pid))
+	if err != nil {
+		return "", err
+	}
+	defer tcpFile.Close()
+
+	// column index of the inode in a /proc/net/tcp entry
+	const inodeField = 9
+
+	scanner := bufio.NewScanner(tcpFile)
+	for scanner.Scan() {
+		line := scanner.Text()
+		// compare the inode column only: a substring match on the whole line
+		// also hits addresses, counters and longer inodes containing the digits
+		fields := strings.Fields(line)
+		if len(fields) > inodeField && fields[inodeField] == inode {
+			return line, nil
+		}
+	}
+	if err := scanner.Err(); err != nil {
+		return "", err
+	}
+
+	return "", fmt.Errorf("no TCP connection found for inode %s", inode)
+}
+
+// parseTcpLine extracts the local and remote endpoints from one entry of
+// /proc/<pid>/net/tcp. Columns 1 and 2 hold "<hex ip>:<hex port>". Fields that
+// are missing or malformed are left at their zero value instead of panicking.
+func parseTcpLine(line string) (localIP string, localPort int, remoteIP string, remotePort int) {
+	fields := strings.Fields(line)
+	if len(fields) < 3 {
+		return
+	}
+
+	if ip, port, ok := strings.Cut(fields[1], ":"); ok {
+		localIP = convertHexToIP(ip)
+		localPort = convertHexToPort(port)
+	}
+	if ip, port, ok := strings.Cut(fields[2], ":"); ok {
+		remoteIP = convertHexToIP(ip)
+		remotePort = convertHexToPort(port)
+	}
+
+	return
+}
+
+// getConnectionInfo reads the socket behind nl's pid-fd from procfs and records
+// it on the socket line, stamped with the current time converted to kernel time.
+func (nl *SocketLine) getConnectionInfo() error {
+	inode, err := getInodeFromFD(fmt.Sprintf("%d", nl.pid), fmt.Sprintf("%d", nl.fd))
+	if err != nil {
+		return err
+	}
+
+	connectionInfo, err := findTCPConnection(inode, fmt.Sprintf("%d", nl.pid))
+	if err != nil {
+		return err
+	}
+
+	localIP, localPort, remoteIP, remotePort := parseTcpLine(connectionInfo)
+
+	skInfo := &SockInfo{
+		Pid:   nl.pid,
+		Fd:    nl.fd,
+		Saddr: localIP,
+		Sport: uint16(localPort),
+		Daddr: remoteIP,
+		Dport: uint16(remotePort),
+	}
+
+	// add to socket line
+	// convert to bpf time
+	log.Logger.Debug().Msgf("Adding socket line read from user space %v", skInfo)
+	nl.AddValue(convertUserTimeToKernelTime(uint64(time.Now().UnixNano())), skInfo)
+	return nil
+}
diff --git a/ebpf/c/bpf_bpfeb.o b/ebpf/c/bpf_bpfeb.o
index bd144ae..983b28c 100644
Binary files a/ebpf/c/bpf_bpfeb.o and b/ebpf/c/bpf_bpfeb.o differ
diff --git a/ebpf/c/bpf_bpfel.o b/ebpf/c/bpf_bpfel.o
index 34f78a0..42c345a 100644
Binary files a/ebpf/c/bpf_bpfel.o and b/ebpf/c/bpf_bpfel.o differ
diff --git a/ebpf/c/tcp.c b/ebpf/c/tcp.c
index 0d85ce5..25b7f0b 100644
--- a/ebpf/c/tcp.c
+++ b/ebpf/c/tcp.c
@@ -163,14 +163,15 @@ SEC("tracepoint/syscalls/sys_exit_connect")
 int sys_exit_connect(void *ctx)
 {
     __u64 id = bpf_get_current_pid_tgid();
-    __u32 pid = id >> 32;
-
-    __u8 *val = bpf_map_lookup_elem(&container_pids, &pid);
-    if (!val)
-    {
-        return 0; // not a container process, ignore
-    }
+    // __u32 pid = id >> 32;
 
     bpf_map_delete_elem(&fd_by_pid_tgid, &id);
+
+    // __u8 *val = bpf_map_lookup_elem(&container_pids, &pid);
+    // if (!val)
+    // {
+    //     return 0; // not a container process, ignore
+    // }
+
     return 0;
 }
diff --git a/ebpf/l7_req/l7.go b/ebpf/l7_req/l7.go
index 7e4a71c..753b27d 100644
--- a/ebpf/l7_req/l7.go
+++ b/ebpf/l7_req/l7.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"context"
 	"os"
+	"sync"
 	"time"
 	"unsafe"
 
@@ -247,6 +248,15 @@ func (e RedisMethodConversion) String() string {
 	}
 }
 
+// FirstKernelTime and FirstUserspaceTime hold the same instant on two clocks:
+// the WriteTimeNs of the first l7 event read (kernel time, nanoseconds since
+// boot) and the wall-clock UnixNano taken when that event was read. Both are
+// set once in Consume and stay zero until the first event arrives.
+var (
+	FirstKernelTime    uint64
+	FirstUserspaceTime uint64
+)
+
 // $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
 //
 // go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS bpf l7.c -- -I../headers
@@ -320,7 +330,6 @@ type L7Event struct {
 	WriteTimeNs         uint64 // start time of write syscall
 	Tid                 uint32
 	Seq                 uint32 // tcp seq num
-	EventReadTime       int64
 }
 
 const L7_EVENT = "l7_event"
@@ -567,6 +576,7 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) {
 		}
 	}()
 
+	readKernelTime := &sync.Once{}
 	go func() {
 		var record perf.Record
 		droppedCount := 0
@@ -588,6 +598,12 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) {
 
 				l7Event := (*bpfL7Event)(unsafe.Pointer(&record.RawSample[0]))
 
+				// runs once
+				readKernelTime.Do(func() {
+					FirstUserspaceTime = uint64(time.Now().UnixNano())
+					FirstKernelTime = l7Event.WriteTimeNs
+				})
+
 				protocol := L7ProtocolConversion(l7Event.Protocol).String()
 				var method string
 				switch protocol {
@@ -624,7 +640,6 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) {
 					WriteTimeNs:         l7Event.WriteTimeNs,
 					Tid:                 l7Event.Tid,
 					Seq:                 l7Event.Seq,
-					EventReadTime:       time.Now().UnixMilli(),
 				}
 
 				go func(l7Event *L7Event) {
diff --git a/main_benchmark_test.go b/main_benchmark_test.go
index ccbe46f..3857532 100644
--- a/main_benchmark_test.go
+++ b/main_benchmark_test.go
@@ -597,9 +597,8 @@ func (sim *Simulator) httpTraffic(ctx context.Context, t *Traffic) {
 			WriteTimeNs:         t.pod.OpenConnections[t.fd] + 10,
 
 			// tracing purposes
-			Tid:           0,
-			Seq:           0,
-			EventReadTime: 0,
+			Tid: 0,
+			Seq: 0,
 		}
 		// select {
 		// case