Skip to content

Commit

Permalink
Merge pull request #179 from getanteon/feat/mongo-compression
Browse files Browse the repository at this point in the history
send compressed mongo events too
  • Loading branch information
fatihbaltaci authored Aug 8, 2024
2 parents ad36601 + ec84e1f commit d43951e
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 31 deletions.
73 changes: 42 additions & 31 deletions aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,6 @@ func (a *Aggregator) processHttpEvent(ctx context.Context, d *l7_req.L7Event) {
func (a *Aggregator) processMongoEvent(ctx context.Context, d *l7_req.L7Event) {
query, err := a.parseMongoEvent(d)
if err != nil {
log.Logger.Error().AnErr("err", err)
return
}
addrPair := extractAddressPair(d)
Expand All @@ -1278,6 +1277,7 @@ func (a *Aggregator) processMongoEvent(ctx context.Context, d *l7_req.L7Event) {
return
}

log.Logger.Debug().Str("path", reqDto.Path).Msg("processmongoEvent persisting")
err = a.ds.PersistRequest(reqDto)
if err != nil {
log.Logger.Error().Err(err).Msg("error persisting request")
Expand Down Expand Up @@ -1555,6 +1555,9 @@ func (a *Aggregator) parsePostgresCommand(d *l7_req.L7Event) (string, error) {
return sqlCommand, nil
}

var MongoOpCompressed uint32 = 2012
var MongoOpMsg uint32 = 2013

func (a *Aggregator) parseMongoEvent(d *l7_req.L7Event) (string, error) {
defer func() {
if r := recover(); r != nil {
Expand All @@ -1565,41 +1568,49 @@ func (a *Aggregator) parseMongoEvent(d *l7_req.L7Event) (string, error) {

payload := d.Payload[:d.PayloadSize]

// cut mongo header, 4 bytes MessageLength, 4 bytes RequestID, 4 bytes ResponseTo, 4 bytes Opcode, 4 bytes MessageFlags
payload = payload[20:]

kind := payload[0]
payload = payload[1:] // cut kind
if kind == 0 { // body
docLenBytes := payload[:4] // document length
docLen := binary.LittleEndian.Uint32(docLenBytes)
payload = payload[4:docLen] // cut docLen
// parse Element
type_ := payload[0] // 2 means string
if type_ != 2 {
return "", fmt.Errorf("document element not a string")
}
payload = payload[1:] // cut type
// cut mongo header, 4 bytes MessageLength, 4 bytes RequestID, 4 bytes ResponseTo
payload = payload[12:]
// cut 4 bytes Opcode, 4 bytes MessageFlags
opcode := payload[:4]
payload = payload[8:]

opcodeInt := binary.LittleEndian.Uint32(opcode)

if opcodeInt == MongoOpCompressed {
return "compressed mongo event", nil
} else if opcodeInt == MongoOpMsg {
kind := payload[0]
payload = payload[1:] // cut kind
if kind == 0 { // body
docLenBytes := payload[:4] // document length
docLen := binary.LittleEndian.Uint32(docLenBytes)
payload = payload[4:docLen] // cut docLen
// parse Element
type_ := payload[0] // 2 means string
if type_ != 2 {
return "", fmt.Errorf("document element not a string")
}
payload = payload[1:] // cut type

// read until NULL
element := []uint8{}
for _, p := range payload {
if p == 0 {
break
// read until NULL
element := []uint8{}
for _, p := range payload {
if p == 0 {
break
}
element = append(element, p)
}
element = append(element, p)
}

// 1 byte NULL, 4 bytes len
elementLenBytes := payload[len(element)+1 : len(element)+1+4]
elementLength := binary.LittleEndian.Uint32(elementLenBytes)
// 1 byte NULL, 4 bytes len
elementLenBytes := payload[len(element)+1 : len(element)+1+4]
elementLength := binary.LittleEndian.Uint32(elementLenBytes)

payload = payload[len(element)+5:] // cut element + null + len
elementValue := payload[:elementLength-1] // myCollection, last byte is null
payload = payload[len(element)+5:] // cut element + null + len
elementValue := payload[:elementLength-1] // myCollection, last byte is null

result := fmt.Sprintf("%s %s", string(element), string(elementValue))
log.Logger.Debug().Str("result", result).Msg("mongo-elem-result")
return result, nil
result := fmt.Sprintf("%s %s", string(element), string(elementValue))
return result, nil
}
}

return "", fmt.Errorf("could not parse mongo event")
Expand Down
5 changes: 5 additions & 0 deletions ebpf/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,18 +243,23 @@ func (e *EbpfCollector) close() {
for pid := range e.sslWriteUprobes {
e.sslWriteUprobes[pid].Close()
}
log.Logger.Info().Msg("closed sslWriteUprobes")
for pid := range e.sslReadEnterUprobes {
e.sslReadEnterUprobes[pid].Close()
}
log.Logger.Info().Msg("closed sslReadEnterUprobes")
for pid := range e.sslReadURetprobes {
e.sslReadURetprobes[pid].Close()
}
log.Logger.Info().Msg("closed sslReadURetprobes")
for pid := range e.goTlsWriteUprobes {
e.goTlsWriteUprobes[pid].Close()
}
log.Logger.Info().Msg("closed goTlsWriteUprobes")
for pid := range e.goTlsReadUprobes {
e.goTlsReadUprobes[pid].Close()
}
log.Logger.Info().Msg("closed goTlsReadUprobes")
for pid := range e.goTlsReadUretprobes {
for _, l := range e.goTlsReadUretprobes[pid] {
l.Close()
Expand Down

0 comments on commit d43951e

Please sign in to comment.