From cd5f15e6a286b3a46a5dcc9f57dae9d2f6dc88c0 Mon Sep 17 00:00:00 2001 From: kenanfarukcakir Date: Sun, 29 Oct 2023 17:56:23 +0000 Subject: [PATCH] get timestamps first at ebpf progs --- aggregator/data.go | 9 ++-- ebpf/l7_req/l7.c | 99 ++++++++++++++++++++++++++++-------- ebpf/tcp_state/tcp_sockets.c | 5 +- 3 files changed, 84 insertions(+), 29 deletions(-) diff --git a/aggregator/data.go b/aggregator/data.go index 6e8e349..8ca3d8c 100644 --- a/aggregator/data.go +++ b/aggregator/data.go @@ -387,7 +387,7 @@ func (a *Aggregator) processL7(ctx context.Context, d l7_req.L7Event) { sockMap, ok = a.clusterInfo.PidToSocketMap[d.Pid] a.clusterInfo.mu.RUnlock() // unlock for reading if !ok { - log.Logger.Info().Uint32("pid", d.Pid).Msg("error finding socket map, initializing...") + log.Logger.Info().Uint32("pid", d.Pid).Msg("initializing socket map...") // initialize socket map sockMap = &SocketMap{ M: make(map[uint64]*SocketLine), @@ -443,15 +443,14 @@ func (a *Aggregator) processL7(ctx context.Context, d l7_req.L7Event) { break } rc-- + if rc == 0 { + break + } time.Sleep(rt) rt *= 2 // exponential backoff log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Uint64("writeTimeNs", d.WriteTimeNs). Msg("retrying getting socket info from skLine") - if rc == 0 { - break - } - select { case <-ctx.Done(): log.Logger.Debug().Msg("processL7 exiting, stop retrying...") diff --git a/ebpf/l7_req/l7.c b/ebpf/l7_req/l7.c index 6806455..217dfa7 100644 --- a/ebpf/l7_req/l7.c +++ b/ebpf/l7_req/l7.c @@ -76,6 +76,14 @@ struct go_read_key { // __u64 fd; can't have fd at exit of read, because it is not available }; +struct read_enter_args { + __u64 id; + __u64 fd; + char* buf; + __u64 size; + __u64 time; +}; + // Instead of allocating on bpf stack, we allocate on a per-CPU array map struct { __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); @@ -172,6 +180,7 @@ struct { // Processing enter of write and sendto syscalls static __always_inline int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, char* buf, __u64 count){ + __u64 timestamp = bpf_ktime_get_ns(); unsigned char func_name[] = "process_enter_of_syscalls_write_sendto"; __u64 id = bpf_get_current_pid_tgid(); @@ -187,7 +196,7 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha req->protocol = PROTOCOL_UNKNOWN; req->method = METHOD_UNKNOWN; req->request_type = 0; - req->write_time_ns = bpf_ktime_get_ns(); + req->write_time_ns = timestamp; // TODO: If socket is not tcp (SOCK_STREAM), we are not interested in it @@ -217,7 +226,7 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha req->method = METHOD_PUBLISH; struct write_args args = {}; args.fd = fd; - args.write_start_ns = bpf_ktime_get_ns(); + args.write_start_ns = timestamp; bpf_map_update_elem(&active_writes, &id, &args, BPF_ANY); }else if (parse_client_postgres_data(buf, count, &req->request_type)){ // TODO: should wait for CloseComplete message in case of statement close @@ -226,7 +235,7 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha req->method = METHOD_STATEMENT_CLOSE_OR_CONN_TERMINATE; struct write_args args = {}; args.fd = fd; - args.write_start_ns = bpf_ktime_get_ns(); + args.write_start_ns = timestamp; bpf_map_update_elem(&active_writes, &id, &args, BPF_ANY); } req->protocol = PROTOCOL_POSTGRES; @@ -261,9 +270,10 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha return 0; } + // Processing enter of read, recv, recvfrom syscalls static __always_inline -int process_enter_of_syscalls_read_recvfrom(void *ctx, __u64 id, __u64 fd, char* buf, __u64 size) { +int process_enter_of_syscalls_read_recvfrom(void *ctx, struct read_enter_args * params) { unsigned char func_name[] = "process_enter_of_syscalls_read_recvfrom"; // __u64 id = bpf_get_current_pid_tgid(); @@ -282,12 +292,12 @@ int process_enter_of_syscalls_read_recvfrom(void *ctx, __u64 id, __u64 fd, char* struct read_args args = {}; - args.fd = fd; - args.buf = buf; - args.size = size; - args.read_start_ns = bpf_ktime_get_ns(); + args.fd = params->fd; + args.buf = params->buf; + args.size = params->size; + args.read_start_ns = params->time; - long res = bpf_map_update_elem(&active_reads, &id, &args, BPF_ANY); + long res = bpf_map_update_elem(&active_reads, &(params->id), &args, BPF_ANY); if(res < 0) { unsigned char log_msg[] = "write to active_reads failed -- err||"; @@ -300,6 +310,7 @@ int process_enter_of_syscalls_read_recvfrom(void *ctx, __u64 id, __u64 fd, char* static __always_inline int process_exit_of_syscalls_write_sendto(void* ctx, __s64 ret){ // unsigned char func_name[] = "process_exit_of_syscalls_write_sendto"; + __u64 timestamp = bpf_ktime_get_ns(); __u64 id = bpf_get_current_pid_tgid(); // we only used this func for amqp, others will only be in active_l7_requests @@ -343,7 +354,7 @@ int process_exit_of_syscalls_write_sendto(void* ctx, __s64 ret){ } e->failed = 0; // success - e->duration = bpf_ktime_get_ns()- active_write->write_start_ns; // total write time + e->duration = timestamp - active_write->write_start_ns; // total write time // request payload e->payload_size = active_req->payload_size; @@ -367,6 +378,7 @@ int process_exit_of_syscalls_write_sendto(void* ctx, __s64 ret){ static __always_inline int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64 ret, __u8 is_tls) { + __u64 timestamp = bpf_ktime_get_ns(); unsigned char func_name[] = "process_exit_of_syscalls_read_recvfrom"; if (ret < 0) { // read failed // -ERRNO @@ -425,7 +437,7 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64 if (is_rabbitmq_consume(read_info->buf, ret)) { e->protocol = PROTOCOL_AMQP; e->method = METHOD_DELIVER; - e->duration = bpf_ktime_get_ns()- read_info->read_start_ns; + e->duration = timestamp - read_info->read_start_ns; e->write_time_ns = read_info->read_start_ns; // TODO: it is not write time, but start of read time e->payload_size = 0; e->payload_read_complete = 0; @@ -456,7 +468,7 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64 e->method = active_req->method; e->protocol = active_req->protocol; - e->duration = bpf_ktime_get_ns() - active_req->write_time_ns; + e->duration = timestamp - active_req->write_time_ns; e->write_time_ns = active_req->write_time_ns; @@ -576,6 +588,7 @@ void ssl_uprobe_write_v_1_0_2(struct pt_regs *ctx, void* ssl, void* buffer, int static __always_inline void ssl_uprobe_read_enter_v1_0_2(struct pt_regs *ctx, __u64 id, __u32 pid, void* ssl, void* buffer, int num, size_t *count_ptr) { + __u64 time = bpf_ktime_get_ns(); unsigned char func_name[] = "ssl_uprobe_read_enter_v1_0_2"; struct ssl_st_v1_0_2 ssl_st; long r = bpf_probe_read_user(&ssl_st, sizeof(ssl_st), ssl); @@ -597,7 +610,15 @@ void ssl_uprobe_read_enter_v1_0_2(struct pt_regs *ctx, __u64 id, __u32 pid, voi char* buf_ptr = (char*) buffer; __u64 buf_size = num; - process_enter_of_syscalls_read_recvfrom(ctx, id, fd, buf_ptr, buf_size); + + struct read_enter_args params = { + .id = id, + .fd = fd, + .buf = buf_ptr, + .size = buf_size, + .time = time + }; + process_enter_of_syscalls_read_recvfrom(ctx, ¶ms); } static __always_inline @@ -628,6 +649,7 @@ void ssl_uprobe_write_v_1_1_1(struct pt_regs *ctx, void* ssl, void* buffer, int static __always_inline void ssl_uprobe_read_enter_v1_1_1(struct pt_regs *ctx, __u64 id, __u32 pid, void* ssl, void* buffer, int num, size_t *count_ptr) { + __u64 time = bpf_ktime_get_ns(); unsigned char func_name[] = "ssl_uprobe_read_enter_v1_1_1"; struct ssl_st_v1_1_1 ssl_st; long r = bpf_probe_read_user(&ssl_st, sizeof(ssl_st), ssl); @@ -649,7 +671,14 @@ void ssl_uprobe_read_enter_v1_1_1(struct pt_regs *ctx, __u64 id, __u32 pid, voi char* buf_ptr = (char*) buffer; __u64 buf_size = num; - process_enter_of_syscalls_read_recvfrom(ctx, id, fd, buf_ptr, buf_size); + struct read_enter_args params = { + .id = id, + .fd = fd, + .buf = buf_ptr, + .size = buf_size, + .time = time + }; + process_enter_of_syscalls_read_recvfrom(ctx, ¶ms); } @@ -681,6 +710,7 @@ void ssl_uprobe_write_v_3(struct pt_regs *ctx, void* ssl, void* buffer, int num, static __always_inline void ssl_uprobe_read_enter_v3(struct pt_regs *ctx, __u64 id, __u32 pid, void* ssl, void* buffer, int num, size_t *count_ptr) { + __u64 time = bpf_ktime_get_ns(); unsigned char func_name[] = "ssl_uprobe_read_enter_v3"; struct ssl_st_v3_0_0 ssl_st; long r = bpf_probe_read_user(&ssl_st, sizeof(ssl_st), ssl); @@ -702,7 +732,15 @@ void ssl_uprobe_read_enter_v3(struct pt_regs *ctx, __u64 id, __u32 pid, void* s char* buf_ptr = (char*) buffer; __u64 buf_size = num; - process_enter_of_syscalls_read_recvfrom(ctx, id, fd, buf_ptr, buf_size); + struct read_enter_args params = { + .id = id, + .fd = fd, + .buf = buf_ptr, + .size = buf_size, + .time = time + + }; + process_enter_of_syscalls_read_recvfrom(ctx, ¶ms); } SEC("tracepoint/syscalls/sys_enter_write") @@ -727,14 +765,30 @@ int sys_exit_sendto(struct trace_event_raw_sys_exit_sendto* ctx) { SEC("tracepoint/syscalls/sys_enter_read") int sys_enter_read(struct trace_event_raw_sys_enter_read* ctx) { + __u64 time = bpf_ktime_get_ns(); __u64 id = bpf_get_current_pid_tgid(); - return process_enter_of_syscalls_read_recvfrom(ctx, id, ctx->fd, ctx->buf, ctx->count); + struct read_enter_args params = { + .id = id, + .fd = ctx->fd, + .buf = ctx->buf, + .size = ctx->count, + .time = time + }; + return process_enter_of_syscalls_read_recvfrom(ctx, ¶ms); } SEC("tracepoint/syscalls/sys_enter_recvfrom") int sys_enter_recvfrom(struct trace_event_raw_sys_enter_recvfrom* ctx) { + __u64 time = bpf_ktime_get_ns(); __u64 id = bpf_get_current_pid_tgid(); - return process_enter_of_syscalls_read_recvfrom(ctx, id, ctx->fd, ctx->ubuf, ctx->size); + struct read_enter_args params = { + .id = id, + .fd = ctx->fd, + .buf = ctx->ubuf, + .size = ctx->size, + .time = time + }; + return process_enter_of_syscalls_read_recvfrom(ctx, ¶ms); } SEC("tracepoint/syscalls/sys_exit_read") @@ -822,6 +876,7 @@ struct go_interface { static __always_inline int process_enter_of_go_conn_write(void *ctx, __u32 pid, __u32 fd, char *buf_ptr, __u64 count) { + __u64 timestamp = bpf_ktime_get_ns(); unsigned char func_name[] = "process_enter_of_go_conn_write"; // parse and write to go_active_l7_req map struct go_req_key k = {}; @@ -837,7 +892,7 @@ int process_enter_of_go_conn_write(void *ctx, __u32 pid, __u32 fd, char *buf_ptr req->protocol = PROTOCOL_UNKNOWN; req->payload_size = 0; req->payload_read_complete = 0; - req->write_time_ns = bpf_ktime_get_ns(); + req->write_time_ns = timestamp; req->request_type = 0; if(buf_ptr){ @@ -911,8 +966,8 @@ int BPF_UPROBE(go_tls_conn_write_enter) { // func (c *Conn) Read(b []byte) (int, error) SEC("uprobe/go_tls_conn_read_enter") int BPF_UPROBE(go_tls_conn_read_enter) { + __u64 timestamp = bpf_ktime_get_ns(); unsigned char func_name[] = "go_tls_conn_read_enter"; - __u32 fd; struct go_interface conn; @@ -937,7 +992,7 @@ int BPF_UPROBE(go_tls_conn_read_enter) { args.fd = fd; args.buf = buf_ptr; args.size = buf_size; - args.read_start_ns = bpf_ktime_get_ns(); + args.read_start_ns = timestamp; struct go_read_key k = {}; k.goid = GOROUTINE(ctx); @@ -955,8 +1010,8 @@ int BPF_UPROBE(go_tls_conn_read_enter) { // attached to all RET instructions since uretprobe crashes go applications SEC("uprobe/go_tls_conn_read_exit") int BPF_UPROBE(go_tls_conn_read_exit) { + __u64 timestamp = bpf_ktime_get_ns(); unsigned char func_name[] = "go_tls_conn_read_exit"; - // can't access to register we've access on read_enter here, // registers are changed. long int ret = GO_PARAM1(ctx); @@ -992,7 +1047,7 @@ int BPF_UPROBE(go_tls_conn_read_exit) { return 0; } - e->duration = bpf_ktime_get_ns() - req->write_time_ns; + e->duration = timestamp - req->write_time_ns; e->write_time_ns = req->write_time_ns; e->failed = 0; // success diff --git a/ebpf/tcp_state/tcp_sockets.c b/ebpf/tcp_state/tcp_sockets.c index ab51ee8..c8ac92b 100644 --- a/ebpf/tcp_state/tcp_sockets.c +++ b/ebpf/tcp_state/tcp_sockets.c @@ -79,6 +79,7 @@ struct SEC("tracepoint/sock/inet_sock_set_state") int inet_sock_set_state(void *ctx) { + __u64 timestamp = bpf_ktime_get_ns(); struct trace_event_raw_inet_sock_set_state args = {}; if (bpf_core_read(&args, sizeof(args), ctx) < 0) { @@ -86,7 +87,7 @@ int inet_sock_set_state(void *ctx) } // if not tcp protocol, ignore -if (BPF_CORE_READ(&args, protocol) != IPPROTO_TCP) + if (BPF_CORE_READ(&args, protocol) != IPPROTO_TCP) { return 0; } @@ -127,7 +128,7 @@ if (BPF_CORE_READ(&args, protocol) != IPPROTO_TCP) __u64 fd = 0; __u32 type = 0; - __u64 timestamp = bpf_ktime_get_ns(); + void *map = &tcp_connect_events; if (oldstate == BPF_TCP_SYN_SENT) {