diff --git a/Dockerfile.default b/Dockerfile.default index 3e53d97..eb407a5 100644 --- a/Dockerfile.default +++ b/Dockerfile.default @@ -1,4 +1,4 @@ -FROM golang:1.22.4-bullseye AS builder +FROM golang:1.22.5-bullseye AS builder WORKDIR /app COPY . ./ RUN apt update diff --git a/README.md b/README.md index 6249734..ef30759 100644 --- a/README.md +++ b/README.md @@ -167,6 +167,9 @@ Alaz supports the following protocols: - Postgres - RabbitMQ - gRPC +- Redis (RESP) +- Kafka +- MySQL Other protocols will be supported soon. diff --git a/aggregator/data.go b/aggregator/data.go index 9b92091..2d8bf4e 100644 --- a/aggregator/data.go +++ b/aggregator/data.go @@ -78,6 +78,10 @@ type Aggregator struct { pgStmtsMu sync.RWMutex pgStmts map[string]string // pid-fd-stmtname -> query + // postgres prepared stmt + mySqlStmtsMu sync.RWMutex + mySqlStmts map[string]string // pid-fd-stmtId -> query + liveProcessesMu sync.RWMutex liveProcesses map[uint32]struct{} // pid -> struct{} @@ -153,6 +157,7 @@ func NewAggregator(parentCtx context.Context, ct *cri.CRITool, k8sChan chan inte liveProcesses: make(map[uint32]struct{}), rateLimiters: make(map[uint32]*rate.Limiter), pgStmts: make(map[string]string), + mySqlStmts: make(map[string]string), } var err error @@ -381,6 +386,14 @@ func (a *Aggregator) processExit(pid uint32) { } } a.pgStmtsMu.Unlock() + + a.mySqlStmtsMu.Lock() + for key, _ := range a.pgStmts { + if strings.HasPrefix(key, fmt.Sprint(pid)) { + delete(a.mySqlStmts, key) + } + } + a.mySqlStmtsMu.Unlock() } func (a *Aggregator) signalTlsAttachment(pid uint32) { @@ -1046,12 +1059,7 @@ func (a *Aggregator) processKafkaEvent(ctx context.Context, d *l7_req.L7Event) { return } - if event.Type == "CONSUME" { - // TODO: reverse the from and to - // do we show arrows originating from outbound services ? - } - - log.Logger.Warn().Ctx(ctx).Any("kafkaEvent", event).Msg("persist kafka event") + log.Logger.Debug().Ctx(ctx).Any("kafkaEvent", event).Msg("persist kafka event") err = a.ds.PersistKafkaEvent(event) if err != nil { log.Logger.Error().Err(err).Msg("error persisting kafka event") @@ -1228,12 +1236,48 @@ func (a *Aggregator) processHttpEvent(ctx context.Context, d *l7_req.L7Event) { } +func (a *Aggregator) processMySQLEvent(ctx context.Context, d *l7_req.L7Event) { + query, err := a.parseMySQLCommand(d) + if err != nil { + log.Logger.Error().AnErr("err", err) + return + } + addrPair := extractAddressPair(d) + + reqDto := &datastore.Request{ + StartTime: int64(convertKernelTimeToUserspaceTime(d.WriteTimeNs) / 1e6), + Latency: d.Duration, + FromIP: addrPair.Saddr, + ToIP: addrPair.Daddr, + Protocol: d.Protocol, + Tls: d.Tls, + Completed: true, + StatusCode: d.Status, + FailReason: "", + Method: d.Method, + Path: query, + Tid: d.Tid, + Seq: d.Seq, + } + + err = a.setFromToV2(addrPair, d, reqDto, "") + if err != nil { + return + } + + log.Logger.Debug().Any("event", reqDto).Msg("persisting mysql-event") + err = a.ds.PersistRequest(reqDto) + if err != nil { + log.Logger.Error().Err(err).Msg("error persisting request") + } +} + func (a *Aggregator) processPostgresEvent(ctx context.Context, d *l7_req.L7Event) { // parse sql command from payload // path = sql command // method = sql message type - query, err := a.parseSqlCommand(d) + query, err := a.parsePostgresCommand(d) if err != nil { log.Logger.Error().AnErr("err", err) return @@ -1282,6 +1326,8 @@ func (a *Aggregator) processL7(ctx context.Context, d *l7_req.L7Event) { a.processAmqpEvent(ctx, d) case l7_req.L7_PROTOCOL_KAFKA: a.processKafkaEvent(ctx, d) + case l7_req.L7_PROTOCOL_MYSQL: + a.processMySQLEvent(ctx, d) } } @@ -1331,7 +1377,50 @@ func (a *Aggregator) findRelatedSocket(ctx context.Context, d *l7_req.L7Event) ( return skInfo, nil } -func (a *Aggregator) parseSqlCommand(d *l7_req.L7Event) (string, error) { +func (a *Aggregator) parseMySQLCommand(d *l7_req.L7Event) (string, error) { + r := d.Payload[:d.PayloadSize] + var sqlCommand string + // 3 bytes len, 1 byte package number, 1 byte command type + if len(r) < 5 { + return "", fmt.Errorf("too short for a sql query") + } + r = r[5:] + sqlCommand = string(r) + if d.Method == l7_req.MYSQL_TEXT_QUERY { + if !containsSQLKeywords(sqlCommand) { + return "", fmt.Errorf("no sql command found") + } + } else if d.Method == l7_req.MYSQL_PREPARE_STMT { + a.mySqlStmtsMu.Lock() + a.mySqlStmts[fmt.Sprintf("%d-%d-%d", d.Pid, d.Fd, d.MySqlPrepStmtId)] = string(r) + a.mySqlStmtsMu.Unlock() + } else if d.Method == l7_req.MYSQL_EXEC_STMT { + a.mySqlStmtsMu.RLock() + // extract statementId from payload + stmtId := binary.LittleEndian.Uint32(r) + query, ok := a.mySqlStmts[fmt.Sprintf("%d-%d-%d", d.Pid, d.Fd, stmtId)] + a.mySqlStmtsMu.RUnlock() + if !ok || query == "" { // we don't have the query for the prepared statement + // Execute (name of prepared statement) [(parameter)] + return fmt.Sprintf("EXECUTE %d *values*", stmtId), nil + } + sqlCommand = query + } else if d.Method == l7_req.MYSQL_STMT_CLOSE { // deallocated stmt + a.mySqlStmtsMu.Lock() + // extract statementId from payload + stmtId := binary.LittleEndian.Uint32(r) + stmtKey := fmt.Sprintf("%d-%d-%d", d.Pid, d.Fd, stmtId) + _, ok := a.mySqlStmts[stmtKey] + if ok { + delete(a.mySqlStmts, stmtKey) + } + a.mySqlStmtsMu.Unlock() + return fmt.Sprintf("CLOSE STMT %d ", stmtId), nil + } + return sqlCommand, nil +} + +func (a *Aggregator) parsePostgresCommand(d *l7_req.L7Event) (string, error) { r := d.Payload[:d.PayloadSize] var sqlCommand string if d.Method == l7_req.SIMPLE_QUERY { diff --git a/cri/cri.go b/cri/cri.go index 6f5548d..638baa1 100644 --- a/cri/cri.go +++ b/cri/cri.go @@ -37,10 +37,17 @@ type CRITool struct { } func NewCRITool(ctx context.Context) (*CRITool, error) { + var runtimeEndpointPaths = defaultRuntimeEndpoints var res internalapi.RuntimeService var err error + + // prepend CRI_RUNTIME_ENDPOINT if set as ENV-var to make sure ENV-var has priority + if os.Getenv("CRI_RUNTIME_ENDPOINT") != "" { + runtimeEndpointPaths = append([]string{os.Getenv("CRI_RUNTIME_ENDPOINT")}, runtimeEndpointPaths...) + } + t := 10 * time.Second - for _, endPoint := range defaultRuntimeEndpoints { + for _, endPoint := range runtimeEndpointPaths { res, err = remote.NewRemoteRuntimeService(endPoint, t, nil) if err != nil { continue diff --git a/datastore/backend.go b/datastore/backend.go index 74bd15d..11fdac4 100644 --- a/datastore/backend.go +++ b/datastore/backend.go @@ -547,9 +547,9 @@ func (b *BackendDS) sendToBackend(method string, payload interface{}, endpoint s return } - if endpoint == reqEndpoint { - log.Logger.Debug().Str("endpoint", endpoint).Any("payload", payload).Msg("sending batch to backend") - } + // if endpoint == reqEndpoint { + // log.Logger.Debug().Str("endpoint", endpoint).Any("payload", payload).Msg("sending batch to backend") + // } err = b.DoRequest(httpReq) if err != nil { log.Logger.Error().Msgf("backend persist error at ep %s : %v", endpoint, err) diff --git a/ebpf/c/bpf.c b/ebpf/c/bpf.c index 618b336..5973e7b 100644 --- a/ebpf/c/bpf.c +++ b/ebpf/c/bpf.c @@ -35,6 +35,7 @@ #include "postgres.c" #include "redis.c" #include "kafka.c" +#include "mysql.c" #include "openssl.c" #include "http2.c" #include "tcp_sock.c" diff --git a/ebpf/c/bpf_bpfeb.go b/ebpf/c/bpf_bpfeb.go index 6e78b88..312dc70 100644 --- a/ebpf/c/bpf_bpfeb.go +++ b/ebpf/c/bpf_bpfeb.go @@ -52,12 +52,13 @@ type bpfL7Event struct { Tid uint32 KafkaApiVersion int16 _ [2]byte + PrepStatementId uint32 Saddr uint32 Sport uint16 _ [2]byte Daddr uint32 Dport uint16 - _ [2]byte + _ [6]byte } type bpfL7Request struct { diff --git a/ebpf/c/bpf_bpfeb.o b/ebpf/c/bpf_bpfeb.o index f3733f9..7689dc9 100644 Binary files a/ebpf/c/bpf_bpfeb.o and b/ebpf/c/bpf_bpfeb.o differ diff --git a/ebpf/c/bpf_bpfel.go b/ebpf/c/bpf_bpfel.go index 7c9150f..33f49fd 100644 --- a/ebpf/c/bpf_bpfel.go +++ b/ebpf/c/bpf_bpfel.go @@ -52,12 +52,13 @@ type bpfL7Event struct { Tid uint32 KafkaApiVersion int16 _ [2]byte + PrepStatementId uint32 Saddr uint32 Sport uint16 _ [2]byte Daddr uint32 Dport uint16 - _ [2]byte + _ [6]byte } type bpfL7Request struct { diff --git a/ebpf/c/bpf_bpfel.o b/ebpf/c/bpf_bpfel.o index 31a63c9..a10e643 100644 Binary files a/ebpf/c/bpf_bpfel.o and b/ebpf/c/bpf_bpfel.o differ diff --git a/ebpf/c/l7.c b/ebpf/c/l7.c index 04b34af..3fdbabe 100644 --- a/ebpf/c/l7.c +++ b/ebpf/c/l7.c @@ -6,6 +6,8 @@ #define PROTOCOL_HTTP2 4 #define PROTOCOL_REDIS 5 #define PROTOCOL_KAFKA 6 +#define PROTOCOL_MYSQL 7 + #define MAX_PAYLOAD_SIZE 1024 @@ -29,10 +31,12 @@ struct l7_event { __u8 is_tls; __u32 seq; // tcp sequence number - __u32 tid; + __u32 tid; // thread id __s16 kafka_api_version; // used only for kafka + __u32 prep_statement_id; // used only for mysql + // socket pair __u32 saddr; __u16 sport; __u32 daddr; @@ -281,6 +285,44 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha args.fd = fd; args.write_start_ns = timestamp; bpf_map_update_elem(&active_writes, &id, &args, BPF_ANY); + }else if (is_mysql_query(buf,count,&req->request_type)){ + if (req->request_type == MYSQL_COM_STMT_CLOSE) { // stmtID will be extracted on userspace + struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero); + if (!e) { + return 0; + } + e->protocol = PROTOCOL_MYSQL; + e->method = METHOD_MYSQL_STMT_CLOSE; + bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, buf); + if(count > MAX_PAYLOAD_SIZE){ + // will not be able to copy all of it + e->payload_size = MAX_PAYLOAD_SIZE; + e->payload_read_complete = 0; + }else{ + e->payload_size = count; + e->payload_read_complete = 1; + } + + struct sock* sk = get_sock(fd); + if (sk != NULL) { + __u32 saddr = BPF_CORE_READ(sk,sk_rcv_saddr); + __u16 sport = BPF_CORE_READ(sk,sk_num); + __u32 daddr = BPF_CORE_READ(sk,sk_daddr); + __u16 dport = BPF_CORE_READ(sk,sk_dport); + + e->saddr = bpf_htonl(saddr); + e->sport = sport; + e->daddr = bpf_htonl(daddr); + e->dport = bpf_htons(dport); + } + long r = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e)); + if (r < 0) { + unsigned char log_msg[] = "failed write to l7_events -- res|fd|psize"; + log_to_userspace(ctx, WARN, func_name, log_msg, r, e->fd, e->payload_size); + } + return 0; + } + req->protocol = PROTOCOL_MYSQL; }else if (is_http2_frame(buf, count)){ struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero); if (!e) { @@ -811,6 +853,16 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64 } e->kafka_api_version = active_req->api_version; } + }else if (e->protocol == PROTOCOL_MYSQL) { + e->status = is_mysql_response(read_info->buf, ret, active_req->request_type, &(e->prep_statement_id)); + e->method = METHOD_UNKNOWN; + if (active_req->request_type == MYSQL_COM_STMT_PREPARE) { + e->method = METHOD_MYSQL_PREPARE_STMT; + }else if(active_req->request_type == MYSQL_COM_STMT_EXECUTE){ + e->method = METHOD_MYSQL_EXEC_STMT; + }else if(active_req->request_type == MYSQL_COM_QUERY){ + e->method = METHOD_MYSQL_TEXT_QUERY; + } } }else{ bpf_map_delete_elem(&active_reads, &id); diff --git a/ebpf/c/mysql.c b/ebpf/c/mysql.c new file mode 100644 index 0000000..ed6b075 --- /dev/null +++ b/ebpf/c/mysql.c @@ -0,0 +1,99 @@ +//go:build ignore +// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_command_phase.html + +// 01 00 00 00 01 +// ^^- command-byte +// ^^---- sequence-id == 0 + +// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_query.html +#define MYSQL_COM_QUERY 0x03 // Text Protocol + +// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_command_phase_ps.html +#define MYSQL_COM_STMT_PREPARE 0x16 // Creates a prepared statement for the passed query string. +// The server returns a COM_STMT_PREPARE Response which contains a statement-id which is ised to identify the prepared statement. + +// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_execute.html +#define MYSQL_COM_STMT_EXECUTE 0x17 // COM_STMT_EXECUTE asks the server to execute a prepared statement as identified by statement_id. + + +// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_close.html +#define MYSQL_COM_STMT_CLOSE 0x19 // COM_STMT_CLOSE deallocates a prepared statement. +// No response packet is sent back to the client. + + +#define MYSQL_RESPONSE_OK 0x00 +#define MYSQL_RESPONSE_EOF 0xfe +#define MYSQL_RESPONSE_ERROR 0xff + +#define METHOD_UNKNOWN 0 +#define METHOD_MYSQL_TEXT_QUERY 1 +#define METHOD_MYSQL_PREPARE_STMT 2 +#define METHOD_MYSQL_EXEC_STMT 3 +#define METHOD_MYSQL_STMT_CLOSE 4 + + +#define MYSQL_STATUS_OK 1 +#define MYSQL_STATUS_FAILED 2 + +static __always_inline +int is_mysql_query(char *buf, __u64 buf_size, __u8 *request_type) { + if (buf_size < 5) { + return 0; + } + __u8 b[5]; // first 5 bytes, first 3 represents length, 4th is packet number, 5th is command type + if (bpf_probe_read(&b, sizeof(b), (void *)((char *)buf)) < 0) { + return 0; + } + int len = (int)b[0] | (int)b[1] << 8 | (int)b[2] << 16; + // command byte is inside the packet + if (len+4 != buf_size || b[3] != 0) { // packet number must be 0 + return 0; + } + + if (b[4] == MYSQL_COM_QUERY || b[4] == MYSQL_COM_STMT_EXECUTE) { + *request_type = b[4]; + return 1; + } + + if (b[4] == MYSQL_COM_STMT_CLOSE) { + *request_type = MYSQL_COM_STMT_CLOSE; + return 1; + } + + if (b[4] == MYSQL_COM_STMT_PREPARE) { + *request_type = MYSQL_COM_STMT_PREPARE; + return 1; + } + return 0; +} + +// __u32 *statement_id +static __always_inline +int is_mysql_response(char *buf, __u64 buf_size, __u8 request_type, __u32 *statement_id) { + __u8 b[5]; // first 5 bytes, first 3 represents length, 4th is packet number, 5th is response code + if (bpf_probe_read(&b, sizeof(b), (void *)((char *)buf)) < 0) { + return 0; + } + if (b[3] <= 0) { // sequence must be > 0 + return 0; + } + int length = (int)b[0] | (int)b[1] << 8 | (int)b[2] << 16; + + if (length == 1 || b[4] == MYSQL_RESPONSE_EOF) { + return MYSQL_STATUS_OK; + } + if (b[4] == MYSQL_RESPONSE_OK) { + if (request_type == MYSQL_COM_STMT_PREPARE) { + // 6-9th bytes returns statement id + if (bpf_probe_read(statement_id, sizeof(*statement_id), (void *)((char *)buf+5)) < 0) { + return 0; + } + } + return MYSQL_STATUS_OK; + } + if (b[4] == MYSQL_RESPONSE_ERROR) { + // *status = STATUS_FAILED; + return MYSQL_STATUS_FAILED; + } + return 0; +} \ No newline at end of file diff --git a/ebpf/c/tcp_sock.c b/ebpf/c/tcp_sock.c index 05ee8f0..26eb349 100644 --- a/ebpf/c/tcp_sock.c +++ b/ebpf/c/tcp_sock.c @@ -95,7 +95,6 @@ struct sock { #define sk_daddr __sk_common.skc_daddr #define sk_num __sk_common.skc_num #define sk_dport __sk_common.skc_dport - }; static __always_inline diff --git a/ebpf/l7_req/l7.go b/ebpf/l7_req/l7.go index 28157f5..7e0ba90 100644 --- a/ebpf/l7_req/l7.go +++ b/ebpf/l7_req/l7.go @@ -24,6 +24,7 @@ const ( BPF_L7_PROTOCOL_HTTP2 BPF_L7_PROTOCOL_REDIS BPF_L7_PROTOCOL_KAFKA + BPF_L7_PROTOCOL_MYSQL ) // for user space @@ -34,6 +35,7 @@ const ( L7_PROTOCOL_POSTGRES = "POSTGRES" L7_PROTOCOL_REDIS = "REDIS" L7_PROTOCOL_KAFKA = "KAFKA" + L7_PROTOCOL_MYSQL = "MYSQL" L7_PROTOCOL_UNKNOWN = "UNKNOWN" ) @@ -55,6 +57,8 @@ func (e L7ProtocolConversion) String() string { return L7_PROTOCOL_REDIS case BPF_L7_PROTOCOL_KAFKA: return L7_PROTOCOL_KAFKA + case BPF_L7_PROTOCOL_MYSQL: + return L7_PROTOCOL_MYSQL case BPF_L7_PROTOCOL_UNKNOWN: return L7_PROTOCOL_UNKNOWN default: @@ -127,6 +131,15 @@ const ( METHOD_KAFKA_FETCH_RESPONSE ) +// match with values in l7.c, order is important +const ( + BPF_MYSQL_METHOD_UNKNOWN = iota + METHOD_MYSQL_TEXT_QUERY + METHOD_MYSQL_PREPARE_STMT + METHOD_MYSQL_EXEC_STMT + METHOD_MYSQL_STMT_CLOSE +) + // for http, user space const ( GET = "GET" @@ -172,6 +185,14 @@ const ( KAFKA_FETCH_RESPONSE = "FETCH_RESPONSE" ) +// for mysql, user space +const ( + MYSQL_TEXT_QUERY = "TEXT_QUERY" + MYSQL_PREPARE_STMT = "PREPARE_STMT" + MYSQL_EXEC_STMT = "EXEC_STMT" + MYSQL_STMT_CLOSE = "STMT_CLOSE" +) + // Custom type for the enumeration type HTTPMethodConversion uint32 @@ -280,6 +301,25 @@ func (e KafkaMethodConversion) String() string { } } +// Custom type for the enumeration +type MySQLMethodConversion uint32 + +// String representation of the enumeration values +func (e MySQLMethodConversion) String() string { + switch e { + case METHOD_MYSQL_TEXT_QUERY: + return MYSQL_TEXT_QUERY + case METHOD_MYSQL_PREPARE_STMT: + return MYSQL_PREPARE_STMT + case METHOD_MYSQL_EXEC_STMT: + return MYSQL_EXEC_STMT + case METHOD_MYSQL_STMT_CLOSE: + return MYSQL_STMT_CLOSE + default: + return "Unknown" + } +} + var FirstKernelTime uint64 = 0 // nanoseconds since boot var FirstUserspaceTime uint64 = 0 @@ -317,12 +357,13 @@ type bpfL7Event struct { Tid uint32 KafkaApiVersion int16 _ [2]byte + PrepStatementId uint32 // for mysql Saddr uint32 Sport uint16 _ [2]byte Daddr uint32 Dport uint16 - _ [2]byte + _ [6]byte } type bpfTraceEvent struct { @@ -365,6 +406,7 @@ type L7Event struct { Tid uint32 Seq uint32 // tcp seq num KafkaApiVersion int16 + MySqlPrepStmtId uint32 Saddr uint32 Sport uint16 Daddr uint32 @@ -676,6 +718,8 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) { method = RedisMethodConversion(l7Event.Method).String() case L7_PROTOCOL_KAFKA: method = KafkaMethodConversion(l7Event.Method).String() + case L7_PROTOCOL_MYSQL: + method = MySQLMethodConversion(l7Event.Method).String() // no method set for kafka on kernel side default: method = "Unknown" @@ -701,6 +745,7 @@ func (l7p *L7Prog) Consume(ctx context.Context, ch chan interface{}) { Tid: l7Event.Tid, Seq: l7Event.Seq, KafkaApiVersion: l7Event.KafkaApiVersion, + MySqlPrepStmtId: l7Event.PrepStatementId, Saddr: l7Event.Saddr, Sport: l7Event.Sport, Daddr: l7Event.Daddr,