Skip to content

Commit

Permalink
Merge pull request #152 from getanteon/fix/socket-match
Browse files Browse the repository at this point in the history
refactor socket matching
  • Loading branch information
fatihbaltaci authored Jun 11, 2024
2 parents 30ddfd3 + 898bf3a commit 0b50fc1
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 20 deletions.
87 changes: 83 additions & 4 deletions aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{},
}

go a.clearSocketLines(ctx)
go a.updateSocketMap(ctx)
return a
}

Expand Down Expand Up @@ -689,7 +690,7 @@ func (a *Aggregator) processHttp2Frames() {
}

req.Latency = d.WriteTimeNs - req.Latency
req.StartTime = d.EventReadTime
req.StartTime = int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6) // nano to milli
req.Completed = true
req.FromIP = skInfo.Saddr
req.ToIP = skInfo.Daddr
Expand Down Expand Up @@ -1019,11 +1020,13 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
log.Logger.Debug().Uint32("pid", d.Pid).
Uint64("fd", d.Fd).Uint64("writeTime", d.WriteTimeNs).
Str("protocol", d.Protocol).Any("payload", string(d.Payload[:d.PayloadSize])).Msg("socket not found")
return

// go check pid-fd for the socket
a.fetchSocketOnNotFound(ctx, d)
}

reqDto := datastore.Request{
StartTime: d.EventReadTime,
StartTime: int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
Latency: d.Duration,
FromIP: skInfo.Saddr,
ToIP: skInfo.Daddr,
Expand Down Expand Up @@ -1245,6 +1248,73 @@ func (a *Aggregator) fetchSocketMap(pid uint32) *SocketMap {
return sockMap
}

// This is a mitigation for the case a tcp event is missed
func (a *Aggregator) updateSocketMap(ctx context.Context) {
ticker := time.NewTicker(3 * time.Minute)

f := func() {
a.liveProcessesMu.RLock()
defer a.liveProcessesMu.RUnlock()
for pid := range a.liveProcesses {
sockMap := a.clusterInfo.SocketMaps[pid]
if sockMap.mu == nil {
continue
}

sockMap.mu.Lock()
for _, skLine := range sockMap.M {
skLine.getConnectionInfo()
}
sockMap.mu.Unlock()
}
}

for {
select {
case <-ticker.C:
f()
case <-ctx.Done():
return
}
}
}

func (a *Aggregator) fetchSocketOnNotFound(ctx context.Context, d *l7_req.L7Event) bool {
a.liveProcessesMu.Lock()

a.liveProcesses[d.Pid] = struct{}{}
sockMap := a.clusterInfo.SocketMaps[d.Pid]
// pid does not exists
// acquire sockMap lock

// in case of reference to mu is nil, pid exec event did not come yet
// create a new mutex for the pid
// to avoid race around the mutex, we need to lock the liveProcessesMu
if sockMap.mu == nil {
log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Msg("fetchSocketOnNotFound: pid not found")

a.muIndex.Add(1)
a.muArray[(a.muIndex.Load())%uint64(len(a.muArray))] = &sync.RWMutex{}
a.clusterInfo.SocketMaps[d.Pid].mu = a.muArray[(a.muIndex.Load())%uint64(len(a.muArray))]
}
a.liveProcessesMu.Unlock()

// creates sockMap.M
skInfo := a.findRelatedSocket(ctx, d)
if skInfo == nil {
// go try reading from kernel files
err := sockMap.M[d.Fd].getConnectionInfo()
if err != nil {
log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Err(err).Msg("fetchSocketOnNotFound: failed to get connection info")
return false
} else {
log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Msg("fetchSocketOnNotFound: connection info found")
return true
}
}
return true
}

func (a *Aggregator) findRelatedSocket(ctx context.Context, d *l7_req.L7Event) *SockInfo {
sockMap := a.clusterInfo.SocketMaps[d.Pid]
// acquire sockMap lock
Expand All @@ -1261,7 +1331,7 @@ func (a *Aggregator) findRelatedSocket(ctx context.Context, d *l7_req.L7Event) *

skLine, ok := sockMap.M[d.Fd]
if !ok {
log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Msg("error finding skLine, go look for it")
log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Msg("create skLine...")
// start new socket line, find already established connections
skLine = NewSocketLine(d.Pid, d.Fd)
sockMap.M[d.Fd] = skLine
Expand Down Expand Up @@ -1475,3 +1545,12 @@ func getPidMax() (int, error) {
}
return pidMax, nil
}

func convertKernelTimeToUserspaceTime(writeTime uint64) uint64 {
// get first timestamp from kernel and corresponding userspace time
return l7_req.FirstUserspaceTime - (l7_req.FirstKernelTime - writeTime)
}

func convertUserTimeToKernelTime(now uint64) uint64 {
return l7_req.FirstKernelTime - (l7_req.FirstUserspaceTime - now)
}
3 changes: 0 additions & 3 deletions aggregator/pg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func TestPostgresParseWithKnownStmt(t *testing.T) {
WriteTimeNs: 0,
Tid: 0,
Seq: 0,
EventReadTime: 0,
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
Expand Down Expand Up @@ -72,7 +71,6 @@ func TestPostgresParseWithKnownStmt(t *testing.T) {
WriteTimeNs: 0,
Tid: 0,
Seq: 0,
EventReadTime: 0,
})

if err != nil {
Expand Down Expand Up @@ -109,7 +107,6 @@ func TestPostgresParseWithUnknownStmt(t *testing.T) {
WriteTimeNs: 0,
Tid: 0,
Seq: 0,
EventReadTime: 0,
})

if err != nil {
Expand Down
120 changes: 119 additions & 1 deletion aggregator/sock_num_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"fmt"
"net"
"os"
"regexp"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -45,6 +47,19 @@ func (nl *SocketLine) AddValue(timestamp uint64, sockInfo *SockInfo) {
nl.mu.Lock()
defer nl.mu.Unlock()

// ignore close events
if sockInfo == nil {
return
}

// if last element is equal to the current element, ignore
if len(nl.Values) > 0 {
last := nl.Values[len(nl.Values)-1].SockInfo
if last != nil && last.Saddr == sockInfo.Saddr && last.Sport == sockInfo.Sport && last.Daddr == sockInfo.Daddr && last.Dport == sockInfo.Dport {
return
}
}

nl.Values = insertIntoSortedSlice(nl.Values, TimestampedSocket{Timestamp: timestamp, SockInfo: sockInfo})
}

Expand Down Expand Up @@ -100,7 +115,6 @@ func (nl *SocketLine) DeleteUnused() {
for i < len(nl.Values)-1 {
if nl.Values[i].SockInfo != nil && nl.Values[i+1].SockInfo != nil {
result = append(result, nl.Values[i+1])
log.Logger.Debug().Msgf("deleting socket line %v", nl.Values[i])
i = i + 2
} else {
result = append(result, nl.Values[i])
Expand Down Expand Up @@ -249,3 +263,107 @@ func insertIntoSortedSlice(sortedSlice []TimestampedSocket, newItem TimestampedS

return sortedSlice
}

// reverse slice
func reverseSlice(s []string) []string {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {
s[i], s[j] = s[j], s[i]
}
return s
}

// convertHexToIP converts a hex string IP address to a human-readable IP address.
func convertHexToIP(hex string) string {
var ipParts []string
for i := 0; i < len(hex); i += 2 {
part, _ := strconv.ParseInt(hex[i:i+2], 16, 64)
ipParts = append(ipParts, fmt.Sprintf("%d", part))
}
ipParts = reverseSlice(ipParts)
return strings.Join(ipParts, ".")
}

// convertHexToPort converts a hex string port to a human-readable port.
func convertHexToPort(hex string) int {
port, _ := strconv.ParseInt(hex, 16, 64)
if port < 0 || port > 65535 {
return 0
}
return int(port)
}

func getInodeFromFD(pid, fd string) (string, error) {
fdPath := fmt.Sprintf("/proc/%s/fd/%s", pid, fd)
link, err := os.Readlink(fdPath)
if err != nil {
return "", err
}

re := regexp.MustCompile(`socket:\[(\d+)\]`)
match := re.FindStringSubmatch(link)
if len(match) < 2 {
return "", fmt.Errorf("no inode found in link: %s", link)
}

return match[1], nil
}

func findTCPConnection(inode string, pid string) (string, error) {
tcpFile, err := os.Open(fmt.Sprintf("/proc/%s/net/tcp", pid))
if err != nil {
return "", err
}
defer tcpFile.Close()

scanner := bufio.NewScanner(tcpFile)
for scanner.Scan() {
line := scanner.Text()
if strings.Contains(line, inode) {
return line, nil
}
}

return "", fmt.Errorf("no TCP connection found for inode %s", inode)
}

func parseTcpLine(line string) (localIP string, localPort int, remoteIP string, remotePort int) {
fields := strings.Fields(line)
localAddress := fields[1]
remoteAddress := fields[2]

localIP = convertHexToIP(localAddress[:8])
localPort = convertHexToPort(localAddress[9:])
remoteIP = convertHexToIP(remoteAddress[:8])
remotePort = convertHexToPort(remoteAddress[9:])

return
}

func (nl *SocketLine) getConnectionInfo() error {
inode, err := getInodeFromFD(fmt.Sprintf("%d", nl.pid), fmt.Sprintf("%d", nl.fd))
if err != nil {
return err
}

connectionInfo, err := findTCPConnection(inode, fmt.Sprintf("%d", nl.pid))
if err != nil {
return err
}

localIP, localPort, remoteIP, remotePort := parseTcpLine(connectionInfo)

skInfo := &SockInfo{
Pid: nl.pid,
Fd: nl.fd,
Saddr: localIP,
Sport: uint16(localPort),
Daddr: remoteIP,
Dport: uint16(remotePort),
}

// add to socket line
// convert to bpf time
log.Logger.Debug().Msgf("Adding socket line read from user space %v", skInfo)
nl.AddValue(convertUserTimeToKernelTime(uint64(time.Now().UnixNano())), skInfo)
return nil
}
Binary file modified ebpf/c/bpf_bpfeb.o
Binary file not shown.
Binary file modified ebpf/c/bpf_bpfel.o
Binary file not shown.
15 changes: 8 additions & 7 deletions ebpf/c/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,15 @@ SEC("tracepoint/syscalls/sys_exit_connect")
int sys_exit_connect(void *ctx)
{
__u64 id = bpf_get_current_pid_tgid();
__u32 pid = id >> 32;

__u8 *val = bpf_map_lookup_elem(&container_pids, &pid);
if (!val)
{
return 0; // not a container process, ignore
}
// __u32 pid = id >> 32;

bpf_map_delete_elem(&fd_by_pid_tgid, &id);

// __u8 *val = bpf_map_lookup_elem(&container_pids, &pid);
// if (!val)
// {
// return 0; // not a container process, ignore
// }

return 0;
}
13 changes: 11 additions & 2 deletions ebpf/l7_req/l7.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"os"
"sync"
"time"
"unsafe"

Expand Down Expand Up @@ -247,6 +248,9 @@ func (e RedisMethodConversion) String() string {
}
}

var FirstKernelTime uint64 = 0 // nanoseconds since boot
var FirstUserspaceTime uint64 = 0

// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
// // go:generate go run github.com/cilium/ebpf/cmd/bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS bpf l7.c -- -I../headers

Expand Down Expand Up @@ -320,7 +324,6 @@ type L7Event struct {
WriteTimeNs uint64 // start time of write syscall
Tid uint32
Seq uint32 // tcp seq num
EventReadTime int64
}

const L7_EVENT = "l7_event"
Expand Down Expand Up @@ -567,6 +570,7 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) {
}
}()

readKernelTime := &sync.Once{}
go func() {
var record perf.Record
droppedCount := 0
Expand All @@ -588,6 +592,12 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) {

l7Event := (*bpfL7Event)(unsafe.Pointer(&record.RawSample[0]))

// runs once
readKernelTime.Do(func() {
FirstUserspaceTime = uint64(time.Now().UnixNano())
FirstKernelTime = l7Event.WriteTimeNs
})

protocol := L7ProtocolConversion(l7Event.Protocol).String()
var method string
switch protocol {
Expand Down Expand Up @@ -624,7 +634,6 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) {
WriteTimeNs: l7Event.WriteTimeNs,
Tid: l7Event.Tid,
Seq: l7Event.Seq,
EventReadTime: time.Now().UnixMilli(),
}

go func(l7Event *L7Event) {
Expand Down
5 changes: 2 additions & 3 deletions main_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,9 +597,8 @@ func (sim *Simulator) httpTraffic(ctx context.Context, t *Traffic) {
WriteTimeNs: t.pod.OpenConnections[t.fd] + 10,

// tracing purposes
Tid: 0,
Seq: 0,
EventReadTime: 0,
Tid: 0,
Seq: 0,
}
// select {
// case
Expand Down

0 comments on commit 0b50fc1

Please sign in to comment.