Skip to content

Commit

Permalink
Merge pull request #42 from ddosify/refactor/event-timestamps
Browse files Browse the repository at this point in the history
get timestamps first at ebpf progs
  • Loading branch information
fatihbaltaci authored Oct 29, 2023
2 parents a99926f + cd5f15e commit 3c89e4b
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 29 deletions.
9 changes: 4 additions & 5 deletions aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ func (a *Aggregator) processL7(ctx context.Context, d l7_req.L7Event) {
sockMap, ok = a.clusterInfo.PidToSocketMap[d.Pid]
a.clusterInfo.mu.RUnlock() // unlock for reading
if !ok {
log.Logger.Info().Uint32("pid", d.Pid).Msg("error finding socket map, initializing...")
log.Logger.Info().Uint32("pid", d.Pid).Msg("initializing socket map...")
// initialize socket map
sockMap = &SocketMap{
M: make(map[uint64]*SocketLine),
Expand Down Expand Up @@ -443,15 +443,14 @@ func (a *Aggregator) processL7(ctx context.Context, d l7_req.L7Event) {
break
}
rc--
if rc == 0 {
break
}
time.Sleep(rt)
rt *= 2 // exponential backoff
log.Logger.Debug().Uint32("pid", d.Pid).Uint64("fd", d.Fd).Uint64("writeTimeNs", d.WriteTimeNs).
Msg("retrying getting socket info from skLine")

if rc == 0 {
break
}

select {
case <-ctx.Done():
log.Logger.Debug().Msg("processL7 exiting, stop retrying...")
Expand Down
99 changes: 77 additions & 22 deletions ebpf/l7_req/l7.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,14 @@ struct go_read_key {
// __u64 fd; can't have fd at exit of read, because it is not available
};

struct read_enter_args {
__u64 id;
__u64 fd;
char* buf;
__u64 size;
__u64 time;
};

// Instead of allocating on bpf stack, we allocate on a per-CPU array map
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
Expand Down Expand Up @@ -172,6 +180,7 @@ struct {
// Processing enter of write and sendto syscalls
static __always_inline
int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, char* buf, __u64 count){
__u64 timestamp = bpf_ktime_get_ns();
unsigned char func_name[] = "process_enter_of_syscalls_write_sendto";
__u64 id = bpf_get_current_pid_tgid();

Expand All @@ -187,7 +196,7 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha
req->protocol = PROTOCOL_UNKNOWN;
req->method = METHOD_UNKNOWN;
req->request_type = 0;
req->write_time_ns = bpf_ktime_get_ns();
req->write_time_ns = timestamp;

// TODO: If socket is not tcp (SOCK_STREAM), we are not interested in it

Expand Down Expand Up @@ -217,7 +226,7 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha
req->method = METHOD_PUBLISH;
struct write_args args = {};
args.fd = fd;
args.write_start_ns = bpf_ktime_get_ns();
args.write_start_ns = timestamp;
bpf_map_update_elem(&active_writes, &id, &args, BPF_ANY);
}else if (parse_client_postgres_data(buf, count, &req->request_type)){
// TODO: should wait for CloseComplete message in case of statement close
Expand All @@ -226,7 +235,7 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha
req->method = METHOD_STATEMENT_CLOSE_OR_CONN_TERMINATE;
struct write_args args = {};
args.fd = fd;
args.write_start_ns = bpf_ktime_get_ns();
args.write_start_ns = timestamp;
bpf_map_update_elem(&active_writes, &id, &args, BPF_ANY);
}
req->protocol = PROTOCOL_POSTGRES;
Expand Down Expand Up @@ -261,9 +270,10 @@ int process_enter_of_syscalls_write_sendto(void* ctx, __u64 fd, __u8 is_tls, cha
return 0;
}


// Processing enter of read, recv, recvfrom syscalls
static __always_inline
int process_enter_of_syscalls_read_recvfrom(void *ctx, __u64 id, __u64 fd, char* buf, __u64 size) {
int process_enter_of_syscalls_read_recvfrom(void *ctx, struct read_enter_args * params) {
unsigned char func_name[] = "process_enter_of_syscalls_read_recvfrom";
// __u64 id = bpf_get_current_pid_tgid();

Expand All @@ -282,12 +292,12 @@ int process_enter_of_syscalls_read_recvfrom(void *ctx, __u64 id, __u64 fd, char*


struct read_args args = {};
args.fd = fd;
args.buf = buf;
args.size = size;
args.read_start_ns = bpf_ktime_get_ns();
args.fd = params->fd;
args.buf = params->buf;
args.size = params->size;
args.read_start_ns = params->time;

long res = bpf_map_update_elem(&active_reads, &id, &args, BPF_ANY);
long res = bpf_map_update_elem(&active_reads, &(params->id), &args, BPF_ANY);
if(res < 0)
{
unsigned char log_msg[] = "write to active_reads failed -- err||";
Expand All @@ -300,6 +310,7 @@ int process_enter_of_syscalls_read_recvfrom(void *ctx, __u64 id, __u64 fd, char*
static __always_inline
int process_exit_of_syscalls_write_sendto(void* ctx, __s64 ret){
// unsigned char func_name[] = "process_exit_of_syscalls_write_sendto";
__u64 timestamp = bpf_ktime_get_ns();
__u64 id = bpf_get_current_pid_tgid();

// we only used this func for amqp, others will only be in active_l7_requests
Expand Down Expand Up @@ -343,7 +354,7 @@ int process_exit_of_syscalls_write_sendto(void* ctx, __s64 ret){
}

e->failed = 0; // success
e->duration = bpf_ktime_get_ns()- active_write->write_start_ns; // total write time
e->duration = timestamp - active_write->write_start_ns; // total write time

// request payload
e->payload_size = active_req->payload_size;
Expand All @@ -367,6 +378,7 @@ int process_exit_of_syscalls_write_sendto(void* ctx, __s64 ret){

static __always_inline
int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64 ret, __u8 is_tls) {
__u64 timestamp = bpf_ktime_get_ns();
unsigned char func_name[] = "process_exit_of_syscalls_read_recvfrom";
if (ret < 0) { // read failed
// -ERRNO
Expand Down Expand Up @@ -425,7 +437,7 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64
if (is_rabbitmq_consume(read_info->buf, ret)) {
e->protocol = PROTOCOL_AMQP;
e->method = METHOD_DELIVER;
e->duration = bpf_ktime_get_ns()- read_info->read_start_ns;
e->duration = timestamp - read_info->read_start_ns;
e->write_time_ns = read_info->read_start_ns; // TODO: it is not write time, but start of read time
e->payload_size = 0;
e->payload_read_complete = 0;
Expand Down Expand Up @@ -456,7 +468,7 @@ int process_exit_of_syscalls_read_recvfrom(void* ctx, __u64 id, __u32 pid, __s64
e->method = active_req->method;

e->protocol = active_req->protocol;
e->duration = bpf_ktime_get_ns() - active_req->write_time_ns;
e->duration = timestamp - active_req->write_time_ns;

e->write_time_ns = active_req->write_time_ns;

Expand Down Expand Up @@ -576,6 +588,7 @@ void ssl_uprobe_write_v_1_0_2(struct pt_regs *ctx, void* ssl, void* buffer, int

static __always_inline
void ssl_uprobe_read_enter_v1_0_2(struct pt_regs *ctx, __u64 id, __u32 pid, void* ssl, void* buffer, int num, size_t *count_ptr) {
__u64 time = bpf_ktime_get_ns();
unsigned char func_name[] = "ssl_uprobe_read_enter_v1_0_2";
struct ssl_st_v1_0_2 ssl_st;
long r = bpf_probe_read_user(&ssl_st, sizeof(ssl_st), ssl);
Expand All @@ -597,7 +610,15 @@ void ssl_uprobe_read_enter_v1_0_2(struct pt_regs *ctx, __u64 id, __u32 pid, voi
char* buf_ptr = (char*) buffer;
__u64 buf_size = num;

process_enter_of_syscalls_read_recvfrom(ctx, id, fd, buf_ptr, buf_size);

struct read_enter_args params = {
.id = id,
.fd = fd,
.buf = buf_ptr,
.size = buf_size,
.time = time
};
process_enter_of_syscalls_read_recvfrom(ctx, &params);
}

static __always_inline
Expand Down Expand Up @@ -628,6 +649,7 @@ void ssl_uprobe_write_v_1_1_1(struct pt_regs *ctx, void* ssl, void* buffer, int

static __always_inline
void ssl_uprobe_read_enter_v1_1_1(struct pt_regs *ctx, __u64 id, __u32 pid, void* ssl, void* buffer, int num, size_t *count_ptr) {
__u64 time = bpf_ktime_get_ns();
unsigned char func_name[] = "ssl_uprobe_read_enter_v1_1_1";
struct ssl_st_v1_1_1 ssl_st;
long r = bpf_probe_read_user(&ssl_st, sizeof(ssl_st), ssl);
Expand All @@ -649,7 +671,14 @@ void ssl_uprobe_read_enter_v1_1_1(struct pt_regs *ctx, __u64 id, __u32 pid, voi
char* buf_ptr = (char*) buffer;
__u64 buf_size = num;

process_enter_of_syscalls_read_recvfrom(ctx, id, fd, buf_ptr, buf_size);
struct read_enter_args params = {
.id = id,
.fd = fd,
.buf = buf_ptr,
.size = buf_size,
.time = time
};
process_enter_of_syscalls_read_recvfrom(ctx, &params);
}


Expand Down Expand Up @@ -681,6 +710,7 @@ void ssl_uprobe_write_v_3(struct pt_regs *ctx, void* ssl, void* buffer, int num,

static __always_inline
void ssl_uprobe_read_enter_v3(struct pt_regs *ctx, __u64 id, __u32 pid, void* ssl, void* buffer, int num, size_t *count_ptr) {
__u64 time = bpf_ktime_get_ns();
unsigned char func_name[] = "ssl_uprobe_read_enter_v3";
struct ssl_st_v3_0_0 ssl_st;
long r = bpf_probe_read_user(&ssl_st, sizeof(ssl_st), ssl);
Expand All @@ -702,7 +732,15 @@ void ssl_uprobe_read_enter_v3(struct pt_regs *ctx, __u64 id, __u32 pid, void* s
char* buf_ptr = (char*) buffer;
__u64 buf_size = num;

process_enter_of_syscalls_read_recvfrom(ctx, id, fd, buf_ptr, buf_size);
struct read_enter_args params = {
.id = id,
.fd = fd,
.buf = buf_ptr,
.size = buf_size,
.time = time

};
process_enter_of_syscalls_read_recvfrom(ctx, &params);
}

SEC("tracepoint/syscalls/sys_enter_write")
Expand All @@ -727,14 +765,30 @@ int sys_exit_sendto(struct trace_event_raw_sys_exit_sendto* ctx) {

SEC("tracepoint/syscalls/sys_enter_read")
int sys_enter_read(struct trace_event_raw_sys_enter_read* ctx) {
__u64 time = bpf_ktime_get_ns();
__u64 id = bpf_get_current_pid_tgid();
return process_enter_of_syscalls_read_recvfrom(ctx, id, ctx->fd, ctx->buf, ctx->count);
struct read_enter_args params = {
.id = id,
.fd = ctx->fd,
.buf = ctx->buf,
.size = ctx->count,
.time = time
};
return process_enter_of_syscalls_read_recvfrom(ctx, &params);
}

SEC("tracepoint/syscalls/sys_enter_recvfrom")
int sys_enter_recvfrom(struct trace_event_raw_sys_enter_recvfrom* ctx) {
__u64 time = bpf_ktime_get_ns();
__u64 id = bpf_get_current_pid_tgid();
return process_enter_of_syscalls_read_recvfrom(ctx, id, ctx->fd, ctx->ubuf, ctx->size);
struct read_enter_args params = {
.id = id,
.fd = ctx->fd,
.buf = ctx->ubuf,
.size = ctx->size,
.time = time
};
return process_enter_of_syscalls_read_recvfrom(ctx, &params);
}

SEC("tracepoint/syscalls/sys_exit_read")
Expand Down Expand Up @@ -822,6 +876,7 @@ struct go_interface {

static __always_inline
int process_enter_of_go_conn_write(void *ctx, __u32 pid, __u32 fd, char *buf_ptr, __u64 count) {
__u64 timestamp = bpf_ktime_get_ns();
unsigned char func_name[] = "process_enter_of_go_conn_write";
// parse and write to go_active_l7_req map
struct go_req_key k = {};
Expand All @@ -837,7 +892,7 @@ int process_enter_of_go_conn_write(void *ctx, __u32 pid, __u32 fd, char *buf_ptr
req->protocol = PROTOCOL_UNKNOWN;
req->payload_size = 0;
req->payload_read_complete = 0;
req->write_time_ns = bpf_ktime_get_ns();
req->write_time_ns = timestamp;
req->request_type = 0;

if(buf_ptr){
Expand Down Expand Up @@ -911,8 +966,8 @@ int BPF_UPROBE(go_tls_conn_write_enter) {
// func (c *Conn) Read(b []byte) (int, error)
SEC("uprobe/go_tls_conn_read_enter")
int BPF_UPROBE(go_tls_conn_read_enter) {
__u64 timestamp = bpf_ktime_get_ns();
unsigned char func_name[] = "go_tls_conn_read_enter";

__u32 fd;
struct go_interface conn;

Expand All @@ -937,7 +992,7 @@ int BPF_UPROBE(go_tls_conn_read_enter) {
args.fd = fd;
args.buf = buf_ptr;
args.size = buf_size;
args.read_start_ns = bpf_ktime_get_ns();
args.read_start_ns = timestamp;

struct go_read_key k = {};
k.goid = GOROUTINE(ctx);
Expand All @@ -955,8 +1010,8 @@ int BPF_UPROBE(go_tls_conn_read_enter) {
// attached to all RET instructions since uretprobe crashes go applications
SEC("uprobe/go_tls_conn_read_exit")
int BPF_UPROBE(go_tls_conn_read_exit) {
__u64 timestamp = bpf_ktime_get_ns();
unsigned char func_name[] = "go_tls_conn_read_exit";

// can't access to register we've access on read_enter here,
// registers are changed.
long int ret = GO_PARAM1(ctx);
Expand Down Expand Up @@ -992,7 +1047,7 @@ int BPF_UPROBE(go_tls_conn_read_exit) {
return 0;
}

e->duration = bpf_ktime_get_ns() - req->write_time_ns;
e->duration = timestamp - req->write_time_ns;
e->write_time_ns = req->write_time_ns;
e->failed = 0; // success

Expand Down
5 changes: 3 additions & 2 deletions ebpf/tcp_state/tcp_sockets.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,15 @@ struct
SEC("tracepoint/sock/inet_sock_set_state")
int inet_sock_set_state(void *ctx)
{
__u64 timestamp = bpf_ktime_get_ns();
struct trace_event_raw_inet_sock_set_state args = {};
if (bpf_core_read(&args, sizeof(args), ctx) < 0)
{
return 0;
}

// if not tcp protocol, ignore
if (BPF_CORE_READ(&args, protocol) != IPPROTO_TCP)
if (BPF_CORE_READ(&args, protocol) != IPPROTO_TCP)
{
return 0;
}
Expand Down Expand Up @@ -127,7 +128,7 @@ if (BPF_CORE_READ(&args, protocol) != IPPROTO_TCP)

__u64 fd = 0;
__u32 type = 0;
__u64 timestamp = bpf_ktime_get_ns();

void *map = &tcp_connect_events;
if (oldstate == BPF_TCP_SYN_SENT)
{
Expand Down

0 comments on commit 3c89e4b

Please sign in to comment.