From a913155935a0c6caabe1d212b0ad745c68e005f2 Mon Sep 17 00:00:00 2001
From: kenanfarukcakir
Date: Wed, 21 Feb 2024 15:55:40 +0300
Subject: [PATCH 1/2] refactor socket map mutexes

---
 aggregator/data.go | 131 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 93 insertions(+), 38 deletions(-)

diff --git a/aggregator/data.go b/aggregator/data.go
index 09a8b21..54b54cc 100644
--- a/aggregator/data.go
+++ b/aggregator/data.go
@@ -77,6 +77,10 @@ type Aggregator struct {
 // Used to rate limit and drop trace events based on pid
 rateLimiters map[uint32]*rate.Limiter // pid -> rateLimiter
 rateLimitMu sync.RWMutex
+
+ // Used to find the correct mutex for a pid; some pids may share the same mutex
+ muIndex int
+ muArray []*sync.RWMutex
 }

 // We need to keep track of the following
@@ -106,7 +110,7 @@ type http2Parser struct {

 // type SocketMap
 type SocketMap struct {
- mu sync.RWMutex
+ mu *sync.RWMutex
 M map[uint64]*SocketLine `json:"fdToSockLine"` // fd -> SockLine
 }

@@ -166,22 +170,6 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{},
 ServiceIPToServiceUid: map[string]types.UID{},
 }

- maxPid, err := getPidMax()
- if err != nil {
- log.Logger.Fatal().Err(err).Msg("error getting max pid")
- }
- sockMaps := make([]*SocketMap, maxPid+1) // index=pid
-
- // initialize sockMaps
- for i := range sockMaps {
- sockMaps[i] = &SocketMap{
- M: nil, // initialized on demand later
- mu: sync.RWMutex{},
- }
- }
-
- clusterInfo.SocketMaps = sockMaps
-
 a := &Aggregator{
 ctx: ctx,
 k8sChan: k8sChan,
@@ -197,14 +185,62 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{},
 liveProcesses: make(map[uint32]struct{}),
 rateLimiters: make(map[uint32]*rate.Limiter),
 pgStmts: make(map[string]string),
+ muIndex: 0,
+ muArray: nil,
 }

- go a.clearSocketLines(ctx)
+ maxPid, err := getPidMax()
+ if err != nil {
+ log.Logger.Fatal().Err(err).Msg("error getting max pid")
+ }
+ sockMaps := make([]*SocketMap, maxPid+1) // index=pid
+ // initialize sockMaps
+ for i := range sockMaps {
+ sockMaps[i] = &SocketMap{
+ M: nil, // initialized on demand later
+ mu: nil,
+ }
+ }
+ clusterInfo.SocketMaps = sockMaps
+
+ a.getLiveProcesses()
+
+ a.liveProcessesMu.RLock()
+ countLiveProcesses := len(a.liveProcesses)
+ a.liveProcessesMu.RUnlock()
+
+ // normally, a mutex per pid would be the straightforward solution
+ // on regular systems, maxPid is around 32768
+ // so we would allocate 32768 mutexes, which is 32768 * 24 bytes = 786KB
+ // but on 64-bit systems, maxPid can be 4194304
+ // and we don't want to allocate 4194304 mutexes; that adds up to 4194304 * 24 bytes = 100MB
+ // so some processes will have to share a mutex
+ // one mutex per 100 pids is a good trade-off
+ // 4194304 / 100 = 41943
+ // 41943 * 24 bytes = 1MB
+
+ countMuArray := countLiveProcesses * 100
+ if countMuArray > maxPid {
+ countMuArray = maxPid
+ }
+ // for 2k processes, 200k mutexes => 200k * 24 bytes = 4.8MB
+ // if maxPid is 32768, 32768 * 24 bytes = 786KB; pick the smaller one
+ a.muArray = make([]*sync.RWMutex, countMuArray)
+
+ // set a distinct mutex for every live process
+ a.muIndex = 0
+ for pid := range a.liveProcesses {
+ a.muArray[a.muIndex] = &sync.RWMutex{}
+ sockMaps[pid].mu = a.muArray[a.muIndex]
+ a.muIndex++
+ a.getAlreadyExistingSockets(pid)
+ }

+ go a.clearSocketLines(ctx)
 return a
 }

-func (a *Aggregator) Run() {
+func (a *Aggregator) getLiveProcesses() {
 // get all alive processes, populate liveProcesses
 cmd := exec.Command("ps", "-e", "-o", "pid=")
 output, err := cmd.Output()
@@ -225,11 +261,12 @@ func (a *Aggregator) Run() {
 continue
 }
 a.liveProcesses[uint32(pidInt)] = struct{}{}
- a.getAlreadyExistingSockets(uint32(pidInt))
 }
 }
 }
+}

+func (a *Aggregator) Run() {
 go func() {
 // every 2 minutes, check alive processes, and clear the ones left behind
 // since we process events concurrently, some short-lived processes exit event can come before exec events
@@ -415,16 +452,21 @@ func (a *Aggregator) getRateLimiterForPid(pid uint32) *rate.Limiter {

 func (a *Aggregator) processExec(d *proc.ProcEvent) {
 a.liveProcessesMu.Lock()
+ defer a.liveProcessesMu.Unlock()
+
 a.liveProcesses[d.Pid] = struct{}{}
- a.liveProcessesMu.Unlock()
+
+ // create lock on demand; assign the newly created mutex before advancing the index
+ a.muArray[a.muIndex%len(a.muArray)] = &sync.RWMutex{}
+ a.clusterInfo.SocketMaps[d.Pid].mu = a.muArray[a.muIndex%len(a.muArray)]
+ a.muIndex++
 }

 func (a *Aggregator) processExit(pid uint32) {
 a.liveProcessesMu.Lock()
 delete(a.liveProcesses, pid)
- a.liveProcessesMu.Unlock()

- a.removeFromClusterInfo(pid)
+ a.liveProcessesMu.Unlock()

 a.h2ParserMu.Lock()
 pid_s := fmt.Sprint(pid)
@@ -471,6 +513,10 @@ func (a *Aggregator) processTcpConnect(d *tcp_state.TcpConnectEvent) {
 sockMap = a.clusterInfo.SocketMaps[d.Pid]
 var skLine *SocketLine

+ if sockMap.mu == nil {
+ return
+ }
+
 sockMap.mu.Lock() // lock for reading
 if sockMap.M == nil {
 sockMap.M = make(map[uint64]*SocketLine)
@@ -509,6 +555,10 @@ func (a *Aggregator) processTcpConnect(d *tcp_state.TcpConnectEvent) {

 var skLine *SocketLine

+ if sockMap.mu == nil {
+ return
+ }
+
 sockMap.mu.Lock() // lock for reading
 if sockMap.M == nil {
 sockMap.M = make(map[uint64]*SocketLine)
@@ -1036,21 +1086,6 @@ func getHostnameFromIP(ipAddr string) (string, error) {
 }
 }

-func (a *Aggregator) fetchSkLine(sockMap *SocketMap, pid uint32, fd uint64) *SocketLine {
- sockMap.mu.Lock() // lock for reading
- skLine, ok := sockMap.M[fd]
-
- if !ok {
- log.Logger.Debug().Uint32("pid", pid).Uint64("fd", fd).Msg("error finding skLine, go look for it")
- // start new socket line, find already established connections
- skLine = NewSocketLine(pid, fd)
- sockMap.M[fd] = skLine
- }
- sockMap.mu.Unlock() // unlock for writing
-
- return skLine
-}
-
 // get all tcp sockets for the pid
 // iterate through all sockets
 // create a new socket line for each socket
@@ -1121,6 +1156,10 @@ func (a *Aggregator) getAlreadyExistingSockets(pid uint32) {
 skLine := NewSocketLine(pid, fd.Fd)
 skLine.AddValue(0, sockInfo)

+ if sockMap.mu == nil {
+ return
+ }
+
 sockMap.mu.Lock()
 if sockMap.M == nil {
 sockMap.M = make(map[uint64]*SocketLine)
@@ -1165,6 +1204,9 @@ func (a *Aggregator) fetchSkInfo(ctx context.Context, skLine *SocketLine, d *l7_

 func (a *Aggregator) removeFromClusterInfo(pid uint32) {
 sockMap := a.clusterInfo.SocketMaps[pid]
+ if sockMap.mu == nil {
+ return
+ }
 sockMap.mu.Lock()
 sockMap.M = nil
 sockMap.mu.Unlock()
@@ -1172,6 +1214,11 @@ func (a *Aggregator) removeFromClusterInfo(pid uint32) {

 func (a *Aggregator) fetchSocketMap(pid uint32) *SocketMap {
 sockMap := a.clusterInfo.SocketMaps[pid]
+
+ if sockMap.mu == nil {
+ return nil
+ }
+
 sockMap.mu.Lock()
 if sockMap.M == nil {
 sockMap.M = make(map[uint64]*SocketLine)
@@ -1184,6 +1231,11 @@ func (a *Aggregator) fetchSocketMap(pid uint32) *SocketMap {

 func (a *Aggregator) findRelatedSocket(ctx context.Context, d *l7_req.L7Event) *SockInfo {
 sockMap := a.clusterInfo.SocketMaps[d.Pid]
 // acquire sockMap lock
+
+ if sockMap.mu == nil {
+ return nil
+ }
+
 sockMap.mu.Lock()

 if sockMap.M == nil {
@@ -1371,6 +1423,9 @@ func (a *Aggregator) clearSocketLines(ctx context.Context) {
 for range ticker.C {
 for _, sockMap := range a.clusterInfo.SocketMaps {
+ if sockMap.mu == nil {
+ continue
+ }
 sockMap.mu.Lock()
 if sockMap.M != nil {
 for _, skLine := range sockMap.M {

From f22b956f90c6b59076dde5aa889c4c6b021878d9 Mon Sep 17 00:00:00 2001
From: kenanfarukcakir
Date: Wed, 21 Feb 2024 15:58:43 +0300
Subject: [PATCH 2/2] clarify mutex allocation comments

---
 aggregator/data.go | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/aggregator/data.go b/aggregator/data.go
index 54b54cc..097230d 100644
--- a/aggregator/data.go
+++ b/aggregator/data.go
@@ -215,10 +215,9 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{},
 // but on 64-bit systems, maxPid can be 4194304
 // and we don't want to allocate 4194304 mutexes; that adds up to 4194304 * 24 bytes = 100MB
 // so some processes will have to share a mutex
- // one mutex per 100 pids is a good trade-off
- // 4194304 / 100 = 41943
- // 41943 * 24 bytes = 1MB
+ // assume the number of live processes can grow up to 100x the current count
+ // if the process count exceeds the number of mutexes, processes will share mutexes
 countMuArray := countLiveProcesses * 100
 if countMuArray > maxPid {
 countMuArray = maxPid
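
The pid-to-mutex sharing scheme above can be tried in isolation. The following is a minimal standalone sketch, not code from this patch: the names muPool, newMuPool, and assign are invented for illustration, and unlike processExec above (which overwrites a slot with a fresh mutex on every exec), the sketch reuses a slot's existing mutex once the index wraps, which is the sharing behavior the comments describe.

package main

import (
	"fmt"
	"sync"
)

// muPool hands out *sync.RWMutex values from a fixed-size slice so the
// number of allocated mutexes stays bounded no matter how many pids appear.
type muPool struct {
	muArray []*sync.RWMutex
	muIndex int
}

func newMuPool(size int) *muPool {
	return &muPool{muArray: make([]*sync.RWMutex, size)}
}

// assign picks the next slot round-robin, creating its mutex on first use.
// Once muIndex wraps past len(muArray), earlier mutexes are handed out
// again, so distinct pids start sharing a lock.
func (p *muPool) assign() *sync.RWMutex {
	i := p.muIndex % len(p.muArray)
	if p.muArray[i] == nil {
		p.muArray[i] = &sync.RWMutex{}
	}
	p.muIndex++
	return p.muArray[i]
}

func main() {
	pool := newMuPool(4) // tiny pool so the sharing is visible
	for pid := uint32(101); pid <= 105; pid++ {
		fmt.Printf("pid %d -> %p\n", pid, pool.assign())
	}
	// pid 105 prints the same address as pid 101: they share a mutex
}

With a pool of 4, every fifth process shares a lock with an earlier one, so a long operation under one pid's lock can delay an unrelated pid in the same slot; sizing countMuArray at 100x the live process count (capped at maxPid) keeps such collisions rare while keeping the allocation bounded.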
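The 24-bytes-per-mutex figure used in the comments' arithmetic can be checked directly; this standalone snippet (again, not part of the patch) prints the size of sync.RWMutex on the current platform and recomputes the two totals quoted above. Note the comments count only the mutex values themselves; the []*sync.RWMutex slice adds another 8 bytes per pointer slot on 64-bit platforms.

package main

import (
	"fmt"
	"sync"
	"unsafe"
)

func main() {
	// 24 on typical 64-bit Go builds, matching the arithmetic in the comments
	fmt.Println(unsafe.Sizeof(sync.RWMutex{}))

	fmt.Println(4194304 * 24)       // one mutex per pid: 100663296 bytes ≈ 100MB
	fmt.Println(4194304 / 100 * 24) // one mutex per 100 pids: 1006632 bytes ≈ 1MB
}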