Skip to content

Commit

Permalink
Merge pull request #162 from getanteon/feat/mysql
Browse files Browse the repository at this point in the history
Feat/mysql
  • Loading branch information
fatihbaltaci authored Jul 23, 2024
2 parents 009f6e9 + 6a5f1e5 commit b5b0912
Show file tree
Hide file tree
Showing 12 changed files with 304 additions and 17 deletions.
2 changes: 1 addition & 1 deletion Dockerfile.default
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22.4-bullseye AS builder
FROM golang:1.22.5-bullseye AS builder
WORKDIR /app
COPY . ./
RUN apt update
Expand Down
105 changes: 97 additions & 8 deletions aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ type Aggregator struct {
pgStmtsMu sync.RWMutex
pgStmts map[string]string // pid-fd-stmtname -> query

// mysql prepared stmt
mySqlStmtsMu sync.RWMutex
mySqlStmts map[string]string // pid-fd-stmtId -> query

liveProcessesMu sync.RWMutex
liveProcesses map[uint32]struct{} // pid -> struct{}

Expand Down Expand Up @@ -153,6 +157,7 @@ func NewAggregator(parentCtx context.Context, ct *cri.CRITool, k8sChan chan inte
liveProcesses: make(map[uint32]struct{}),
rateLimiters: make(map[uint32]*rate.Limiter),
pgStmts: make(map[string]string),
mySqlStmts: make(map[string]string),
}

var err error
Expand Down Expand Up @@ -381,6 +386,14 @@ func (a *Aggregator) processExit(pid uint32) {
}
}
a.pgStmtsMu.Unlock()

a.mySqlStmtsMu.Lock()
for key, _ := range a.pgStmts {
if strings.HasPrefix(key, fmt.Sprint(pid)) {
delete(a.mySqlStmts, key)
}
}
a.mySqlStmtsMu.Unlock()
}

func (a *Aggregator) signalTlsAttachment(pid uint32) {
Expand Down Expand Up @@ -1046,12 +1059,7 @@ func (a *Aggregator) processKafkaEvent(ctx context.Context, d *l7_req.L7Event) {
return
}

if event.Type == "CONSUME" {
// TODO: reverse the from and to
// do we show arrows originating from outbound services ?
}

log.Logger.Warn().Ctx(ctx).Any("kafkaEvent", event).Msg("persist kafka event")
log.Logger.Debug().Ctx(ctx).Any("kafkaEvent", event).Msg("persist kafka event")
err = a.ds.PersistKafkaEvent(event)
if err != nil {
log.Logger.Error().Err(err).Msg("error persisting kafka event")
Expand Down Expand Up @@ -1228,12 +1236,48 @@ func (a *Aggregator) processHttpEvent(ctx context.Context, d *l7_req.L7Event) {

}

func (a *Aggregator) processMySQLEvent(ctx context.Context, d *l7_req.L7Event) {
query, err := a.parseMySQLCommand(d)
if err != nil {
log.Logger.Error().AnErr("err", err)
return
}
addrPair := extractAddressPair(d)

reqDto := &datastore.Request{
StartTime: int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6),
Latency: d.Duration,
FromIP: addrPair.Saddr,
ToIP: addrPair.Daddr,
Protocol: d.Protocol,
Tls: d.Tls,
Completed: true,
StatusCode: d.Status,
FailReason: "",
Method: d.Method,
Path: query,
Tid: d.Tid,
Seq: d.Seq,
}

err = a.setFromToV2(addrPair, d, reqDto, "")
if err != nil {
return
}

log.Logger.Debug().Any("event", reqDto).Msg("persisting mysql-event")
err = a.ds.PersistRequest(reqDto)
if err != nil {
log.Logger.Error().Err(err).Msg("error persisting request")
}
}

func (a *Aggregator) processPostgresEvent(ctx context.Context, d *l7_req.L7Event) {
// parse sql command from payload
// path = sql command
// method = sql message type

query, err := a.parseSqlCommand(d)
query, err := a.parsePostgresCommand(d)
if err != nil {
log.Logger.Error().AnErr("err", err)
return
Expand Down Expand Up @@ -1282,6 +1326,8 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) {
a.processAmqpEvent(ctx, d)
case l7_req.L7_PROTOCOL_KAFKA:
a.processKafkaEvent(ctx, d)
case l7_req.L7_PROTOCOL_MYSQL:
a.processMySQLEvent(ctx, d)
}
}

Expand Down Expand Up @@ -1331,7 +1377,50 @@ func (a *Aggregator) findRelatedSocket(ctx context.Context, d *l7_req.L7Event) (
return skInfo, nil
}

func (a *Aggregator) parseSqlCommand(d *l7_req.L7Event) (string, error) {
func (a *Aggregator) parseMySQLCommand(d *l7_req.L7Event) (string, error) {
r := d.Payload[:d.PayloadSize]
var sqlCommand string
// 3 bytes len, 1 byte package number, 1 byte command type
if len(r) < 5 {
return "", fmt.Errorf("too short for a sql query")
}
r = r[5:]
sqlCommand = string(r)
if d.Method == l7_req.MYSQL_TEXT_QUERY {
if !containsSQLKeywords(sqlCommand) {
return "", fmt.Errorf("no sql command found")
}
} else if d.Method == l7_req.MYSQL_PREPARE_STMT {
a.mySqlStmtsMu.Lock()
a.mySqlStmts[fmt.Sprintf("%d-%d-%d", d.Pid, d.Fd, d.MySqlPrepStmtId)] = string(r)
a.mySqlStmtsMu.Unlock()
} else if d.Method == l7_req.MYSQL_EXEC_STMT {
a.mySqlStmtsMu.RLock()
// extract statementId from payload
stmtId := binary.LittleEndian.Uint32(r)
query, ok := a.mySqlStmts[fmt.Sprintf("%d-%d-%d", d.Pid, d.Fd, stmtId)]
a.mySqlStmtsMu.RUnlock()
if !ok || query == "" { // we don't have the query for the prepared statement
// Execute (name of prepared statement) [(parameter)]
return fmt.Sprintf("EXECUTE %d *values*", stmtId), nil
}
sqlCommand = query
} else if d.Method == l7_req.MYSQL_STMT_CLOSE { // deallocated stmt
a.mySqlStmtsMu.Lock()
// extract statementId from payload
stmtId := binary.LittleEndian.Uint32(r)
stmtKey := fmt.Sprintf("%d-%d-%d", d.Pid, d.Fd, stmtId)
_, ok := a.mySqlStmts[stmtKey]
if ok {
delete(a.mySqlStmts, stmtKey)
}
a.mySqlStmtsMu.Unlock()
return fmt.Sprintf("CLOSE STMT %d ", stmtId), nil
}
return sqlCommand, nil
}

func (a *Aggregator) parsePostgresCommand(d *l7_req.L7Event) (string, error) {
r := d.Payload[:d.PayloadSize]
var sqlCommand string
if d.Method == l7_req.SIMPLE_QUERY {
Expand Down
6 changes: 3 additions & 3 deletions datastore/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,9 +547,9 @@ func (b *BackendDS) sendToBackend(method string, payload interface{}, endpoint s
return
}

if endpoint == reqEndpoint {
log.Logger.Debug().Str("endpoint", endpoint).Any("payload", payload).Msg("sending batch to backend")
}
// if endpoint == reqEndpoint {
// log.Logger.Debug().Str("endpoint", endpoint).Any("payload", payload).Msg("sending batch to backend")
// }
err = b.DoRequest(httpReq)
if err != nil {
log.Logger.Error().Msgf("backend persist error at ep %s : %v", endpoint, err)
Expand Down
1 change: 1 addition & 0 deletions ebpf/c/bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "postgres.c"
#include "redis.c"
#include "kafka.c"
#include "mysql.c"
#include "openssl.c"
#include "http2.c"
#include "tcp_sock.c"
Expand Down
3 changes: 2 additions & 1 deletion ebpf/c/bpf_bpfeb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified ebpf/c/bpf_bpfeb.o
Binary file not shown.
3 changes: 2 additions & 1 deletion ebpf/c/bpf_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified ebpf/c/bpf_bpfel.o
Binary file not shown.
54 changes: 53 additions & 1 deletion ebpf/c/l7.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#define PROTOCOL_HTTP2 4
#define PROTOCOL_REDIS 5
#define PROTOCOL_KAFKA 6
#define PROTOCOL_MYSQL 7



#define MAX_PAYLOAD_SIZE 1024
Expand All @@ -29,10 +31,12 @@ struct l7_event {
__u8 is_tls;

__u32 seq; // tcp sequence number
__u32 tid;
__u32 tid; // thread id

__s16 kafka_api_version; // used only for kafka
__u32 prep_statement_id; // used only for mysql

// socket pair
__u32 saddr;
__u16 sport;
__u32 daddr;
Expand Down Expand Up @@ -281,6 +285,44 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha
args.fd = fd;
args.write_start_ns = timestamp;
bpf_map_update_elem(&active_writes, &id, &args, BPF_ANY);
}else if (is_mysql_query(buf,count,&req->request_type)){
if (req->request_type == MYSQL_COM_STMT_CLOSE) { // stmtID will be extracted on userspace
struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
if (!e) {
return 0;
}
e->protocol = PROTOCOL_MYSQL;
e->method = METHOD_MYSQL_STMT_CLOSE;
bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, buf);
if(count > MAX_PAYLOAD_SIZE){
// will not be able to copy all of it
e->payload_size = MAX_PAYLOAD_SIZE;
e->payload_read_complete = 0;
}else{
e->payload_size = count;
e->payload_read_complete = 1;
}

struct sock* sk = get_sock(fd);
if (sk != NULL) {
__u32 saddr = BPF_CORE_READ(sk,sk_rcv_saddr);
__u16 sport = BPF_CORE_READ(sk,sk_num);
__u32 daddr = BPF_CORE_READ(sk,sk_daddr);
__u16 dport = BPF_CORE_READ(sk,sk_dport);

e->saddr = bpf_htonl(saddr);
e->sport = sport;
e->daddr = bpf_htonl(daddr);
e->dport = bpf_htons(dport);
}
long r = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
if (r < 0) {
unsigned char log_msg[] = "failed write to l7_events -- res|fd|psize";
log_to_userspace(ctx, WARN, func_name, log_msg, r, e->fd, e->payload_size);
}
return 0;
}
req->protocol = PROTOCOL_MYSQL;
}else if (is_http2_frame(buf, count)){
struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
if (!e) {
Expand Down Expand Up @@ -811,6 +853,16 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64
}
e->kafka_api_version = active_req->api_version;
}
}else if (e->protocol == PROTOCOL_MYSQL) {
e->status = is_mysql_response(read_info->buf, ret, active_req->request_type, &(e->prep_statement_id));
e->method = METHOD_UNKNOWN;
if (active_req->request_type == MYSQL_COM_STMT_PREPARE) {
e->method = METHOD_MYSQL_PREPARE_STMT;
}else if(active_req->request_type == MYSQL_COM_STMT_EXECUTE){
e->method = METHOD_MYSQL_EXEC_STMT;
}else if(active_req->request_type == MYSQL_COM_QUERY){
e->method = METHOD_MYSQL_TEXT_QUERY;
}
}
}else{
bpf_map_delete_elem(&active_reads, &id);
Expand Down
Loading

0 comments on commit b5b0912

Please sign in to comment.