Skip to content

Commit

Permalink
Merge pull request #178 from getanteon/develop
Browse files Browse the repository at this point in the history
MongoDB protocol support
  • Loading branch information
fatihbaltaci authored Aug 7, 2024
2 parents cf52349 + 4311fb6 commit eb23447
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 1 deletion.
96 changes: 95 additions & 1 deletion aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,13 @@ func (a *Aggregator) decodeKafkaPayload(d *l7_req.L7Event) ([]*KafkaMessage, err
// var message protocol.Message
// var err error

defer func() {
if r := recover(); r != nil {
log.Logger.Debug().Any("r", r).
Msg("recovered from kafka event,probably slice out of bounds") // since we read 1024 bytes at most from ebpf, slice out of bounds can occur
}
}()

result := make([]*KafkaMessage, 0)

if d.Method == l7_req.KAFKA_PRODUCE_REQUEST {
Expand Down Expand Up @@ -1241,6 +1248,42 @@ func (a *Aggregator) processHttpEvent(ctx context.Context, d *l7_req.L7Event) {

}

func (a *Aggregator) processMongoEvent(ctx context.Context, d *l7_req.L7Event) {
query, err := a.parseMongoEvent(d)
if err != nil {
log.Logger.Error().AnErr("err", err)
return
}
addrPair := extractAddressPair(d)

reqDto := &datastore.Request{
StartTime: int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
Latency: d.Duration,
FromIP: addrPair.Saddr,
ToIP: addrPair.Daddr,
Protocol: d.Protocol,
Tls: d.Tls,
Completed: true,
StatusCode: d.Status,
FailReason: "",
Method: d.Method,
Path: query,
// dist tracing disabled by default temporarily
// Tid: d.Tid,
// Seq: d.Seq,
}

err = a.setFromToV2(addrPair, d, reqDto, "")
if err != nil {
return
}

err = a.ds.PersistRequest(reqDto)
if err != nil {
log.Logger.Error().Err(err).Msg("error persisting request")
}
}

func (a *Aggregator) processMySQLEvent(ctx context.Context, d *l7_req.L7Event) {
query, err := a.parseMySQLCommand(d)
if err != nil {
Expand Down Expand Up @@ -1271,7 +1314,6 @@ func (a *Aggregator) processMySQLEvent(ctx context.Context, d *l7_req.L7Event) {
return
}

log.Logger.Debug().Any("event", reqDto).Msg("persisting mysql-event")
err = a.ds.PersistRequest(reqDto)
if err != nil {
log.Logger.Error().Err(err).Msg("error persisting request")
Expand Down Expand Up @@ -1335,6 +1377,8 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
a.processKafkaEvent(ctx, d)
case l7_req.L7_PROTOCOL_MYSQL:
a.processMySQLEvent(ctx, d)
case l7_req.L7_PROTOCOL_MONGO:
a.processMongoEvent(ctx, d)
}
}

Expand Down Expand Up @@ -1511,6 +1555,56 @@ func (a *Aggregator) parsePostgresCommand(d *l7_req.L7Event) (string, error) {
return sqlCommand, nil
}

func (a *Aggregator) parseMongoEvent(d *l7_req.L7Event) (string, error) {
defer func() {
if r := recover(); r != nil {
log.Logger.Debug().Any("r", r).
Msg("recovered from mongo event,probably slice out of bounds")
}
}()

payload := d.Payload[:d.PayloadSize]

// cut mongo header, 4 bytes MessageLength, 4 bytes RequestID, 4 bytes ResponseTo, 4 bytes Opcode, 4 bytes MessageFlags
payload = payload[20:]

kind := payload[0]
payload = payload[1:] // cut kind
if kind == 0 { // body
docLenBytes := payload[:4] // document length
docLen := binary.LittleEndian.Uint32(docLenBytes)
payload = payload[4:docLen] // cut docLen
// parse Element
type_ := payload[0] // 2 means string
if type_ != 2 {
return "", fmt.Errorf("document element not a string")
}
payload = payload[1:] // cut type

// read until NULL
element := []uint8{}
for _, p := range payload {
if p == 0 {
break
}
element = append(element, p)
}

// 1 byte NULL, 4 bytes len
elementLenBytes := payload[len(element)+1 : len(element)+1+4]
elementLength := binary.LittleEndian.Uint32(elementLenBytes)

payload = payload[len(element)+5:] // cut element + null + len
elementValue := payload[:elementLength-1] // myCollection, last byte is null

result := fmt.Sprintf("%s %s", string(element), string(elementValue))
log.Logger.Debug().Str("result", result).Msg("mongo-elem-result")
return result, nil
}

return "", fmt.Errorf("could not parse mongo event")
}

func (a *Aggregator) getPgStmtKey(pid uint32, fd uint64, stmtName string) string {
return fmt.Sprintf("%s-%s", a.getConnKey(pid, fd), stmtName)
}
Expand Down
1 change: 1 addition & 0 deletions ebpf/c/bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "redis.c"
#include "kafka.c"
#include "mysql.c"
#include "mongo.c"
#include "openssl.c"
#include "http2.c"
#include "tcp_sock.c"
Expand Down
Binary file modified ebpf/c/bpf_bpfeb.o
Binary file not shown.
Binary file modified ebpf/c/bpf_bpfel.o
Binary file not shown.
9 changes: 9 additions & 0 deletions ebpf/c/l7.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#define PROTOCOL_REDIS 5
#define PROTOCOL_KAFKA 6
#define PROTOCOL_MYSQL 7
#define PROTOCOL_MONGO 8



Expand Down Expand Up @@ -329,6 +330,8 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha
return 0;
}
req->protocol = PROTOCOL_MYSQL;
}else if(is_mongo_request(buf,count)){
req->protocol = PROTOCOL_MONGO;
}else if (is_http2_frame(buf, count)){
struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
if (!e) {
Expand Down Expand Up @@ -875,11 +878,17 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64
}else if(active_req->request_type == MYSQL_COM_QUERY){
e->method = METHOD_MYSQL_TEXT_QUERY;
}
}else if (e->protocol == PROTOCOL_MONGO){
e->status = is_mongo_reply(read_info->buf, ret);
}
}else{
bpf_map_delete_elem(&active_reads, &id);
return 0;
}

if (e->status == 0){
return 0;
}

bpf_map_delete_elem(&active_reads, &id);
bpf_map_delete_elem(&active_l7_requests, &k);
Expand Down
93 changes: 93 additions & 0 deletions ebpf/c/mongo.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//go:build ignore
// https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/
// Mongo Request Query
// 4 bytes message len
// 4 bytes request id
// 4 bytes response to
// 4 bytes opcode (2004 for Query)
// 4 bytes query flags
// fullCollectionName : ?
// 4 bytes number to skip
// 4 bytes number to return
// 4 bytes Document Length
// Elements

// Extensible Message Format
// 4 bytes len
// 4 bytes request id
// 4 bytes response to
// 4 bytes opcode (2013 for extensible message format)
// 4 bytes message flags
// Section
// 1 byte Kind (0 for body)
// BodyDocument
// 4 bytes document length
// Elements
// Section
// Kind : Document Sequence (1)
// SeqId: "documents"
// DocumentSequence
// Document
// 4 bytes doc len

// For response:
// same with above

#define MONGO_OP_COMPRESSED 2012 // Wraps other opcodes using compression
#define MONGO_OP_MSG 2013 // Send a message using the standard format. Used for both client requests and database replies.

// https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/#standard-message-header
struct mongo_header {
__s32 length; // total message size, including this
__s32 request_id; // identifier for this message
__s32 response_to; // requestID from the original request (used in responses from the database)
__s32 opcode; // message type
};

struct mongo_header_wout_len {
// __s32 length; // total message size, including this
__s32 request_id; // identifier for this message
__s32 response_to; // requestID from the original request (used in responses from the database)
__s32 opcode; // message type
};

static __always_inline
int is_mongo_request(char *buf, __u64 buf_size) {
struct mongo_header h = {};
if (bpf_probe_read(&h, sizeof(h), (void *)((char *)buf)) < 0) {
return 0;
}
if (h.response_to == 0 && (h.opcode == MONGO_OP_MSG || h.opcode == MONGO_OP_COMPRESSED)) {
bpf_printk("this is a mongo_request\n");
return 1;
}
return 0;
}

// mongo replies read in 2 parts
// [pid 286873] read(7, "\x2d\x00\x00\x00", 4) = 4 // these 4 bytes are length
// [pid 286873] read(7, "\xe1\x0b\x00\x00 \x09\x00\x00\x00 \xdd\x07\x00\x00 // request_id - response_to - opcode
// \x00\x00\x00\x00\x00\x18\x00\x00\x00\x10
// \x6e\x00
// \x01\x00\x00\x00\x01\x6f\x6b\x00"..., 41) = 41static __always_inline
// (ok)
static __always_inline
int is_mongo_reply(char *buf, __u64 buf_size) {
struct mongo_header_wout_len h = {};
if (bpf_probe_read(&h, sizeof(h), (void *)((char *)buf)) < 0) {
bpf_printk("this is a mongo_reply_header_fail\n");
return 0;
}
if (h.response_to == 0) {
bpf_printk("this is a mongo_reply_response_to0, - %d\n",h.opcode);
return 0;
}
if (h.opcode == MONGO_OP_MSG || h.opcode == MONGO_OP_COMPRESSED) {
bpf_printk("this is a mongo_reply\n");
return 1;
}

bpf_printk("this is a mongo_reply-fail - %d\n",h.opcode);
return 0;
}

6 changes: 6 additions & 0 deletions ebpf/l7_req/l7.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
BPF_L7_PROTOCOL_REDIS
BPF_L7_PROTOCOL_KAFKA
BPF_L7_PROTOCOL_MYSQL
BPF_L7_PROTOCOL_MONGO
)

// for user space
Expand All @@ -36,6 +37,7 @@ const (
L7_PROTOCOL_REDIS = "REDIS"
L7_PROTOCOL_KAFKA = "KAFKA"
L7_PROTOCOL_MYSQL = "MYSQL"
L7_PROTOCOL_MONGO = "MONGO"
L7_PROTOCOL_UNKNOWN = "UNKNOWN"
)

Expand All @@ -59,6 +61,8 @@ func (e L7ProtocolConversion) String() string {
return L7_PROTOCOL_KAFKA
case BPF_L7_PROTOCOL_MYSQL:
return L7_PROTOCOL_MYSQL
case BPF_L7_PROTOCOL_MONGO:
return L7_PROTOCOL_MONGO
case BPF_L7_PROTOCOL_UNKNOWN:
return L7_PROTOCOL_UNKNOWN
default:
Expand Down Expand Up @@ -723,6 +727,8 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) {
case L7_PROTOCOL_MYSQL:
method = MySQLMethodConversion(l7Event.Method).String()
// no method set for kafka on kernel side
// no method set for mongo on kernel side

default:
method = "Unknown"
}
Expand Down

0 comments on commit eb23447

Please sign in to comment.