Skip to content

Commit

Permalink
Merge pull request #96 from ddosify/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
fatihbaltaci authored Feb 21, 2024
2 parents b589bfb + c63835d commit 369d355
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 27 deletions.
54 changes: 33 additions & 21 deletions aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ type ClusterInfo struct {

var (
// default exponential backoff (*2)
// when retryLimit is increased, we are blocking the events that we wait it to be processed more
retryInterval = 50 * time.Millisecond
retryLimit = 2
// 400 + 800 + 1600 + 3200 + 6400 = 12400 ms
// when attemptLimit is increased, we are blocking the events that we wait it to be processed more
retryInterval = 20 * time.Millisecond
attemptLimit = 3 // total attempt
// 1st try - 20ms - 2nd try - 40ms - 3rd try

defaultExpiration = 5 * time.Minute
purgeTime = 10 * time.Minute
Expand Down Expand Up @@ -368,7 +368,6 @@ func (a *Aggregator) processEbpfTcp(ctx context.Context) {
a.processTcpConnect(d)
}
}

}
}

Expand Down Expand Up @@ -460,6 +459,7 @@ func (a *Aggregator) signalTlsAttachment(pid uint32) {
func (a *Aggregator) processTcpConnect(d *tcp_state.TcpConnectEvent) {
go a.signalTlsAttachment(d.Pid)
if d.Type_ == tcp_state.EVENT_TCP_ESTABLISHED {

// filter out localhost connections
if d.SAddr == "127.0.0.1" || d.DAddr == "127.0.0.1" {
return
Expand Down Expand Up @@ -500,6 +500,11 @@ func (a *Aggregator) processTcpConnect(d *tcp_state.TcpConnectEvent) {
var sockMap *SocketMap
var ok bool

// filter out localhost connections
if d.SAddr == "127.0.0.1" || d.DAddr == "127.0.0.1" {
return
}

sockMap = a.clusterInfo.SocketMaps[d.Pid]

var skLine *SocketLine
Expand Down Expand Up @@ -934,14 +939,27 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
return
}

var path string
if d.Protocol == l7_req.L7_PROTOCOL_POSTGRES {
// parse sql command from payload
// path = sql command
// method = sql message type
var err error
path, err = a.parseSqlCommand(d)
if err != nil {
log.Logger.Error().AnErr("err", err)
return
}
}

skInfo := a.findRelatedSocket(ctx, d)
if skInfo == nil {
log.Logger.Debug().Uint32("pid", d.Pid).
Uint64("fd", d.Fd).Uint64("writeTime", d.WriteTimeNs).
Str("protocol", d.Protocol).Any("payload", string(d.Payload[:])).Msg("socket not found")
return
}

// Since we process events concurrently
// TCP events and L7 events can be processed out of order

reqDto := datastore.Request{
StartTime: d.EventReadTime,
Latency: d.Duration,
Expand All @@ -957,28 +975,21 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
Seq: d.Seq,
}

if d.Protocol == l7_req.L7_PROTOCOL_POSTGRES {
// parse sql command from payload
// path = sql command
// method = sql message type
var err error
reqDto.Path, err = a.parseSqlCommand(d)
if err != nil {
log.Logger.Error().AnErr("err", err)
return
}
}
// Since we process events concurrently
// TCP events and L7 events can be processed out of order

var reqHostHeader string
// parse http payload, extract path, query params, headers
if d.Protocol == l7_req.L7_PROTOCOL_HTTP {
_, reqDto.Path, _, reqHostHeader = parseHttpPayload(string(d.Payload[0:d.PayloadSize]))
_, path, _, reqHostHeader = parseHttpPayload(string(d.Payload[0:d.PayloadSize]))
}

err := a.setFromTo(skInfo, d, &reqDto, reqHostHeader)
if err != nil {
return
}

reqDto.Path = path
reqDto.Completed = !d.Failed

// In AMQP-DELIVER event, we are capturing from read syscall,
Expand Down Expand Up @@ -1122,7 +1133,7 @@ func (a *Aggregator) getAlreadyExistingSockets(pid uint32) {
}

func (a *Aggregator) fetchSkInfo(ctx context.Context, skLine *SocketLine, d *l7_req.L7Event) *SockInfo {
rc := retryLimit
rc := attemptLimit
rt := retryInterval
var skInfo *SockInfo
var err error
Expand All @@ -1132,6 +1143,7 @@ func (a *Aggregator) fetchSkInfo(ctx context.Context, skLine *SocketLine, d *l7_
if err == nil && skInfo != nil {
break
}
log.Logger.Debug().Err(err).Uint32("pid", d.Pid).Uint64("fd", d.Fd).Uint64("writeTime", d.WriteTimeNs).Msg("retry to get skInfo...")
rc--
if rc == 0 {
break
Expand Down
16 changes: 10 additions & 6 deletions ebpf/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"io/fs"
"os"
"strings"
"sync"
"time"
"unsafe"
Expand Down Expand Up @@ -186,10 +187,10 @@ func (e *EbpfCollector) AttachUprobesForEncrypted() {
e.tlsPidMap[pid] = struct{}{}
e.mu.Unlock()

go func() {
go func(pid uint32) {
// attach to libssl uprobes if process is using libssl
errs := e.AttachSslUprobesOnProcess("/proc", pid)
if errs != nil && len(errs) > 0 {
if len(errs) > 0 {
for _, err := range errs {
if errorspkg.Is(err, fs.ErrNotExist) {
// no such file or directory error
Expand All @@ -203,20 +204,20 @@ func (e *EbpfCollector) AttachUprobesForEncrypted() {
}

go_errs := e.AttachGoTlsUprobesOnProcess("/proc", pid)
if go_errs != nil && len(go_errs) > 0 {
if len(go_errs) > 0 {
for _, err := range go_errs {
if errorspkg.Is(err, fs.ErrNotExist) {
// no such file or directory error
// executable is not found,
// it's probably a kernel thread, or a very short lived process
continue
}
log.Logger.Error().Err(err).Uint32("pid", pid).
log.Logger.Error().Err(err).
Msgf("error attaching go tls for pid: %d", pid)
}
}

}()
}(pid)

}
}
Expand Down Expand Up @@ -261,7 +262,10 @@ func (e *EbpfCollector) AttachGoTlsUprobesOnProcess(procfs string, pid uint32) [
// read build info of a go executable
bi, err := buildinfo.ReadFile(path)
if err != nil {
// TODO: check if error is "not a Go executable"
if strings.HasSuffix(err.Error(), "not a Go executable") || strings.Contains(err.Error(), "no such file or directory") {
log.Logger.Debug().Str("reason", "gotls").Uint32("pid", pid).Msg("not a Go executable")
return errors
}
log.Logger.Debug().Err(err).Uint32("pid", pid).Msg("error reading build info")
errors = append(errors, err)
return errors
Expand Down

0 comments on commit 369d355

Please sign in to comment.