Merge pull request #98 from ddosify/develop

refactor socket map mutexes
fatihbaltaci authored Feb 21, 2024
2 parents 369d355 + 31a7dbe commit 8cb8674
Showing 1 changed file with 92 additions and 38 deletions.
130 changes: 92 additions & 38 deletions aggregator/data.go
@@ -77,6 +77,10 @@ type Aggregator struct {
// Used to rate limit and drop trace events based on pid
rateLimiters map[uint32]*rate.Limiter // pid -> rateLimiter
rateLimitMu sync.RWMutex
+
+ // Used to find the correct mutex for a pid; some pids can share the same mutex
+ muIndex int
+ muArray []*sync.RWMutex
}

// We need to keep track of the following
@@ -106,7 +110,7 @@ type http2Parser struct {

// type SocketMap
type SocketMap struct {
- mu sync.RWMutex
+ mu *sync.RWMutex
M map[uint64]*SocketLine `json:"fdToSockLine"` // fd -> SockLine
}
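
Changing mu from a value to a pointer is what makes the shared-mutex scheme below possible: a *sync.RWMutex can be aliased by many SocketMaps, while a value mutex is owned by exactly one struct. A standalone sketch of the aliasing (simplified field types, not code from this commit):

```go
package main

import (
	"fmt"
	"sync"
)

type SocketMap struct {
	mu *sync.RWMutex
	M  map[uint64]string // simplified stand-in for fd -> *SocketLine
}

func main() {
	shared := &sync.RWMutex{}
	// Two pids' SocketMaps alias one RWMutex through the pointer field.
	a := &SocketMap{mu: shared}
	b := &SocketMap{mu: shared}
	fmt.Println(a.mu == b.mu) // true: locking a.mu also serializes access to b.M
}
```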

@@ -166,22 +170,6 @@ func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{},
ServiceIPToServiceUid: map[string]types.UID{},
}

- maxPid, err := getPidMax()
- if err != nil {
- log.Logger.Fatal().Err(err).Msg("error getting max pid")
- }
- sockMaps := make([]*SocketMap, maxPid+1) // index=pid
-
- // initialize sockMaps
- for i := range sockMaps {
- sockMaps[i] = &SocketMap{
- M: nil, // initialized on demand later
- mu: sync.RWMutex{},
- }
- }
-
- clusterInfo.SocketMaps = sockMaps

a := &Aggregator{
ctx: ctx,
k8sChan: k8sChan,
@@ -197,14 +185,61 @@
liveProcesses: make(map[uint32]struct{}),
rateLimiters: make(map[uint32]*rate.Limiter),
pgStmts: make(map[string]string),
+ muIndex: 0,
+ muArray: nil,
}

- go a.clearSocketLines(ctx)
+ maxPid, err := getPidMax()
+ if err != nil {
+ log.Logger.Fatal().Err(err).Msg("error getting max pid")
+ }
+ sockMaps := make([]*SocketMap, maxPid+1) // index=pid
+ // initialize sockMaps
+ for i := range sockMaps {
+ sockMaps[i] = &SocketMap{
+ M: nil, // initialized on demand later
+ mu: nil,
+ }
+ }
+ clusterInfo.SocketMaps = sockMaps
+
+ a.getLiveProcesses()
+
+ a.liveProcessesMu.RLock()
+ countLiveProcesses := len(a.liveProcesses)
+ a.liveProcessesMu.RUnlock()
+
+ // normally, a mutex per pid is the straightforward solution
+ // on regular systems, maxPid is around 32768
+ // so we allocate 32768 mutexes, which is 32768 * 24 bytes = 786KB
+ // but on 64-bit systems, maxPid can be 4194304
+ // and we don't want to allocate 4194304 mutexes; that adds up to 4194304 * 24 bytes = 100MB
+ // so, some processes will have to share a mutex
+
+ // assume live processes can increase up to 100 times the current count
+ // if the process count exceeds the mutex count, processes will share mutexes
+ countMuArray := countLiveProcesses * 100
+ if countMuArray > maxPid {
+ countMuArray = maxPid
+ }
+ // for 2k processes, 200k mutexes => 200k * 24 bytes = 4.8MB
+ // in case maxPid is 32768, 32768 * 24 bytes = 786KB; pick the smaller one
+ a.muArray = make([]*sync.RWMutex, countMuArray)
+
+ // set a distinct mutex for every live process
+ a.muIndex = 0
+ for pid := range a.liveProcesses {
+ a.muArray[a.muIndex] = &sync.RWMutex{}
+ sockMaps[pid].mu = a.muArray[a.muIndex]
+ a.muIndex++
+ a.getAlreadyExistingSockets(pid)
+ }
+
+ go a.clearSocketLines(ctx)
return a
}
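
The sizing rule above reduces to min(100 × live processes, maxPid). A minimal sketch of that arithmetic; mutexPoolSize is a hypothetical helper, not part of this commit:

```go
// mutexPoolSize returns how many RWMutexes to allocate: headroom for the
// live-process count to grow 100x, capped at one mutex per possible pid.
func mutexPoolSize(liveProcesses, maxPid int) int {
	n := liveProcesses * 100
	if n > maxPid {
		n = maxPid
	}
	return n
}
```

At 24 bytes per sync.RWMutex, 2k live processes cost about 4.8MB (200k mutexes), while the 32768-pid cap costs about 786KB, matching the comments above.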

- func (a *Aggregator) Run() {
+ func (a *Aggregator) getLiveProcesses() {
// get all alive processes, populate liveProcesses
cmd := exec.Command("ps", "-e", "-o", "pid=")
output, err := cmd.Output()
@@ -225,11 +260,12 @@
continue
}
a.liveProcesses[uint32(pidInt)] = struct{}{}
- a.getAlreadyExistingSockets(uint32(pidInt))
}
}
}
}
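
The hunk truncates the middle of getLiveProcesses; the visible lines shell out to ps -e -o pid= and parse one pid per line, skipping anything unparsable. A self-contained sketch of the same approach (livePids is a hypothetical helper with simplified error handling):

```go
package main

import (
	"fmt"
	"os/exec"
	"strconv"
	"strings"
)

// livePids runs `ps -e -o pid=` and returns the parsed pids,
// mirroring how getLiveProcesses populates a.liveProcesses.
func livePids() (map[uint32]struct{}, error) {
	out, err := exec.Command("ps", "-e", "-o", "pid=").Output()
	if err != nil {
		return nil, err
	}
	pids := make(map[uint32]struct{})
	for _, line := range strings.Split(string(out), "\n") {
		line = strings.TrimSpace(line)
		if line == "" {
			continue
		}
		pid, err := strconv.ParseUint(line, 10, 32)
		if err != nil {
			continue // skip unparsable lines, as the original does
		}
		pids[uint32(pid)] = struct{}{}
	}
	return pids, nil
}

func main() {
	pids, err := livePids()
	if err != nil {
		panic(err)
	}
	fmt.Println(len(pids), "live processes")
}
```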

+ func (a *Aggregator) Run() {
go func() {
// every 2 minutes, check alive processes, and clear the ones left behind
// since we process events concurrently, some short-lived processes exit event can come before exec events
@@ -415,16 +451,21 @@ func (a *Aggregator) getRateLimiterForPid(pid uint32) *rate.Limiter {

func (a *Aggregator) processExec(d *proc.ProcEvent) {
a.liveProcessesMu.Lock()
- defer a.liveProcessesMu.Unlock()

a.liveProcesses[d.Pid] = struct{}{}
+ a.liveProcessesMu.Unlock()
+
+ // create lock on demand
+ a.muArray[a.muIndex%len(a.muArray)] = &sync.RWMutex{}
+ a.muIndex++
+ a.clusterInfo.SocketMaps[d.Pid].mu = a.muArray[a.muIndex%len(a.muArray)]
}
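
processExec hands each newly exec'd pid a slot from the fixed-size pool, so once muIndex passes len(muArray), unrelated pids start sharing a lock. One wrinkle in the lines above: muArray[muIndex%len] is read after the increment, so the pid receives the slot following the one just created. The sketch below shows the intended round-robin sharing, but assigns the freshly created mutex before advancing the index and reuses an existing mutex on wrap-around instead of overwriting it:

```go
package main

import (
	"fmt"
	"sync"
)

type pool struct {
	mu      []*sync.RWMutex
	muIndex int
}

// assign hands out the next slot, creating its mutex on demand;
// after len(p.mu) assignments, callers wrap around and share locks.
func (p *pool) assign() *sync.RWMutex {
	i := p.muIndex % len(p.mu)
	if p.mu[i] == nil {
		p.mu[i] = &sync.RWMutex{}
	}
	p.muIndex++
	return p.mu[i]
}

func main() {
	p := &pool{mu: make([]*sync.RWMutex, 2)}
	m1, m2, m3 := p.assign(), p.assign(), p.assign()
	fmt.Println(m1 == m2, m1 == m3) // false true: the third caller shares slot 0
}
```

Run it and the third assignment aliases the first, which is exactly the sharing the Aggregator comments describe.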

func (a *Aggregator) processExit(pid uint32) {
a.liveProcessesMu.Lock()
delete(a.liveProcesses, pid)
- a.liveProcessesMu.Unlock()

a.removeFromClusterInfo(pid)
+ a.liveProcessesMu.Unlock()

a.h2ParserMu.Lock()
pid_s := fmt.Sprint(pid)
@@ -471,6 +512,10 @@ func (a *Aggregator) processTcpConnect(d *tcp_state.TcpConnectEvent) {
sockMap = a.clusterInfo.SocketMaps[d.Pid]
var skLine *SocketLine

+ if sockMap.mu == nil {
+ 	return
+ }

sockMap.mu.Lock() // write lock: M may be initialized below
if sockMap.M == nil {
sockMap.M = make(map[uint64]*SocketLine)
@@ -509,6 +554,10 @@ func (a *Aggregator) processTcpConnect(d *tcp_state.TcpConnectEvent) {

var skLine *SocketLine

+ if sockMap.mu == nil {
+ 	return
+ }

sockMap.mu.Lock() // write lock: M may be initialized below
if sockMap.M == nil {
sockMap.M = make(map[uint64]*SocketLine)
@@ -1036,21 +1085,6 @@ func getHostnameFromIP(ipAddr string) (string, error) {
}
}

- func (a *Aggregator) fetchSkLine(sockMap *SocketMap, pid uint32, fd uint64) *SocketLine {
- sockMap.mu.Lock() // lock for reading
- skLine, ok := sockMap.M[fd]
-
- if !ok {
- log.Logger.Debug().Uint32("pid", pid).Uint64("fd", fd).Msg("error finding skLine, go look for it")
- // start new socket line, find already established connections
- skLine = NewSocketLine(pid, fd)
- sockMap.M[fd] = skLine
- }
- sockMap.mu.Unlock() // unlock for writing
-
- return skLine
- }

// get all tcp sockets for the pid
// iterate through all sockets
// create a new socket line for each socket
@@ -1121,6 +1155,10 @@ func (a *Aggregator) getAlreadyExistingSockets(pid uint32) {
skLine := NewSocketLine(pid, fd.Fd)
skLine.AddValue(0, sockInfo)

+ if sockMap.mu == nil {
+ 	return
+ }

sockMap.mu.Lock()
if sockMap.M == nil {
sockMap.M = make(map[uint64]*SocketLine)
@@ -1165,13 +1203,21 @@ func (a *Aggregator) fetchSkInfo(ctx context.Context, skLine *SocketLine, d *l7_

func (a *Aggregator) removeFromClusterInfo(pid uint32) {
sockMap := a.clusterInfo.SocketMaps[pid]
+ if sockMap.mu == nil {
+ 	return
+ }
sockMap.mu.Lock()
sockMap.M = nil
sockMap.mu.Unlock()
}
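
A nil mu means the pid never passed through processExec or the startup scan, so each of the call sites above bails out before touching M. The repeated guard could be folded into a helper; a sketch under that assumption (lockIfAssigned is hypothetical, not in this commit):

```go
// lockIfAssigned takes the write lock only when this SocketMap has been
// handed a mutex from the pool; callers skip pids that were never seen.
func (sm *SocketMap) lockIfAssigned() bool {
	if sm.mu == nil {
		return false
	}
	sm.mu.Lock()
	return true
}
```

removeFromClusterInfo would then read: if !sockMap.lockIfAssigned() { return }, clear M, unlock.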

func (a *Aggregator) fetchSocketMap(pid uint32) *SocketMap {
sockMap := a.clusterInfo.SocketMaps[pid]

+ if sockMap.mu == nil {
+ 	return nil
+ }

sockMap.mu.Lock()
if sockMap.M == nil {
sockMap.M = make(map[uint64]*SocketLine)
@@ -1184,6 +1230,11 @@ func (a *Aggregator) fetchSocketMap(pid uint32) *SocketMap {
func (a *Aggregator) findRelatedSocket(ctx context.Context, d *l7_req.L7Event) *SockInfo {
sockMap := a.clusterInfo.SocketMaps[d.Pid]
// acquire sockMap lock

+ if sockMap.mu == nil {
+ 	return nil
+ }

sockMap.mu.Lock()

if sockMap.M == nil {
@@ -1371,6 +1422,9 @@ func (a *Aggregator) clearSocketLines(ctx context.Context) {

for range ticker.C {
for _, sockMap := range a.clusterInfo.SocketMaps {
+ if sockMap.mu == nil {
+ 	continue
+ }
sockMap.mu.Lock()
if sockMap.M != nil {
for _, skLine := range sockMap.M {
