Skip to content

Commit

Permalink
Merge pull request #34 from ddosify/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
fatihbaltaci authored Oct 19, 2023
2 parents fc85045 + 6cc6a6d commit 04b9c64
Show file tree
Hide file tree
Showing 24 changed files with 2,090 additions and 255 deletions.
24 changes: 21 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
CLANG ?= clang-14
STRIP ?= llvm-strip-14
OBJCOPY ?= llvm-objcopy-14
CFLAGS := -O2 -g -Wall -Werror $(CFLAGS)

TARGET_ARCH ?= arm64 # x86 or arm64
CFLAGS := -O2 -g -Wall -Werror -D__TARGET_ARCH_$(TARGET_ARCH) $(CFLAGS)

# Obtain an absolute path to the directory of the Makefile.
# Assume the Makefile is in the root of the repository.
Expand Down Expand Up @@ -75,15 +75,33 @@ REGISTRY ?= ddosify
BUILDX_BUILDER := buildx-multi-arch
ALAZ_DOCKERFILE := Dockerfile

# Map TARGET_ARCH (arm64 or x86, declared near the top of this Makefile) to
# the matching docker buildx --platform value. Any value other than arm64
# falls back to linux/amd64.
ifeq ($(TARGET_ARCH), arm64)
DOCKER_PLATFORM := linux/arm64
else
DOCKER_PLATFORM := linux/amd64
endif

.PHONY: build_push_buildx
build_push_buildx:
docker buildx inspect $(BUILDX_BUILDER) || \
docker buildx create --name=$(BUILDX_BUILDER) && \
docker buildx build --push --platform=linux/amd64,linux/arm64 --builder=$(BUILDX_BUILDER) --build-arg ALAZ_TAG=$(ALAZ_TAG) --build-arg VERSION=$(ALAZ_TAG) --tag=$(REGISTRY)/$(ALAZ_IMAGE_NAME):$(ALAZ_TAG) -f $(ALAZ_DOCKERFILE) .
docker buildx build --push --platform=$(DOCKER_PLATFORM) --builder=$(BUILDX_BUILDER) --build-arg ALAZ_TAG=$(ALAZ_TAG) --build-arg VERSION=$(ALAZ_TAG) --tag=$(REGISTRY)/$(ALAZ_IMAGE_NAME):$(ALAZ_TAG)-$(TARGET_ARCH) -f $(ALAZ_DOCKERFILE) .

# Combine the per-architecture image tags ($(ALAZ_TAG)-arm64 and
# $(ALAZ_TAG)-x86) pushed by build_push_buildx into a single multi-arch
# manifest published under the plain $(ALAZ_TAG) tag.
# NOTE(review): assumes both per-arch tags already exist in the registry —
# run build_push_buildx for each TARGET_ARCH first (see usage comments below).
.PHONY: docker_merge_platforms
docker_merge_platforms:
	docker buildx imagetools create --tag $(REGISTRY)/$(ALAZ_IMAGE_NAME):$(ALAZ_TAG) $(REGISTRY)/$(ALAZ_IMAGE_NAME):$(ALAZ_TAG)-arm64 $(REGISTRY)/$(ALAZ_IMAGE_NAME):$(ALAZ_TAG)-x86

# Single-platform build and push with plain `docker build` (no buildx):
# builds the image from $(ALAZ_DOCKERFILE), tags it
# $(REGISTRY)/$(ALAZ_IMAGE_NAME):$(ALAZ_TAG), and pushes it.
.PHONY: build_push
build_push:
	docker build --build-arg VERSION=$(ALAZ_TAG) -t $(REGISTRY)/$(ALAZ_IMAGE_NAME):$(ALAZ_TAG) -f $(ALAZ_DOCKERFILE) .
	docker push $(REGISTRY)/$(ALAZ_IMAGE_NAME):$(ALAZ_TAG)

# make go_builder_image_build
# ALAZ_TAG=latest
# make go_generate TARGET_ARCH=arm64
# make build_push_buildx TARGET_ARCH=arm64 ALAZ_TAG=$ALAZ_TAG

# make go_generate TARGET_ARCH=x86
# make build_push_buildx TARGET_ARCH=x86 ALAZ_TAG=$ALAZ_TAG

# make docker_merge_platforms ALAZ_TAG=$ALAZ_TAG
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ helm delete alaz --namespace ddosify
Alaz supports the following protocols:

- HTTP
- HTTPS (OpenSSL and Go TLS)
- Postgres
- RabbitMQ

Expand Down
127 changes: 81 additions & 46 deletions aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package aggregator
// 5. docker (TODO)

import (
"context"
"fmt"
"net"
"strings"
Expand All @@ -27,10 +28,14 @@ import (
)

type Aggregator struct {
ctx context.Context

// listen to events from different sources
k8sChan <-chan interface{}
ebpfChan <-chan interface{}

ec *ebpf.EbpfCollector

// store the service map
clusterInfo *ClusterInfo

Expand Down Expand Up @@ -97,24 +102,27 @@ func init() {
reverseDnsCache = cache.New(defaultExpiration, purgeTime)
}

func NewAggregator(k8sChan <-chan interface{}, crChan <-chan interface{}, ebpfChan <-chan interface{}, ds datastore.DataStore) *Aggregator {
func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{}, ec *ebpf.EbpfCollector, ds datastore.DataStore) *Aggregator {
ctx, _ := context.WithCancel(parentCtx)
clusterInfo := &ClusterInfo{
PodIPToPodUid: map[string]types.UID{},
ServiceIPToServiceUid: map[string]types.UID{},
PidToSocketMap: make(map[uint32]*SocketMap, 0),
}

return &Aggregator{
ctx: ctx,
k8sChan: k8sChan,
ebpfChan: ebpfChan,
ebpfChan: ec.EbpfEvents(),
ec: ec,
clusterInfo: clusterInfo,
ds: ds,
}
}

func (a *Aggregator) Run() {
go a.processk8s()
go a.processEbpf()
go a.processEbpf(a.ctx)
}

func (a *Aggregator) processk8s() {
Expand All @@ -141,57 +149,69 @@ func (a *Aggregator) processk8s() {
}
}

func (a *Aggregator) processEbpf() {
func (a *Aggregator) processEbpf(ctx context.Context) {
stop := make(chan struct{})
go func() {
<-ctx.Done()
close(stop)
}()

for data := range a.ebpfChan {
bpfEvent, ok := data.(ebpf.BpfEvent)
if !ok {
log.Logger.Error().Interface("ebpfData", data).Msg("error casting ebpf event")
continue
}
switch bpfEvent.Type() {
case tcp_state.TCP_CONNECT_EVENT:
d := data.(tcp_state.TcpConnectEvent) // copy data's value
tcpConnectEvent := tcp_state.TcpConnectEvent{
Fd: d.Fd,
Timestamp: d.Timestamp,
Type_: d.Type_,
Pid: d.Pid,
SPort: d.SPort,
DPort: d.DPort,
SAddr: d.SAddr,
DAddr: d.DAddr,
select {
case <-stop:
log.Logger.Info().Msg("processEbpf exiting...")
return
default:
bpfEvent, ok := data.(ebpf.BpfEvent)
if !ok {
log.Logger.Error().Interface("ebpfData", data).Msg("error casting ebpf event")
continue
}
go a.processTcpConnect(tcpConnectEvent)
case l7_req.L7_EVENT:
d := data.(l7_req.L7Event) // copy data's value

// copy payload slice
payload := [512]uint8{}
copy(payload[:], d.Payload[:])

l7Event := l7_req.L7Event{
Fd: d.Fd,
Pid: d.Pid,
Status: d.Status,
Duration: d.Duration,
Protocol: d.Protocol,
Method: d.Method,
Payload: payload,
PayloadSize: d.PayloadSize,
PayloadReadComplete: d.PayloadReadComplete,
Failed: d.Failed,
WriteTimeNs: d.WriteTimeNs,
switch bpfEvent.Type() {
case tcp_state.TCP_CONNECT_EVENT:
d := data.(tcp_state.TcpConnectEvent) // copy data's value
tcpConnectEvent := tcp_state.TcpConnectEvent{
Fd: d.Fd,
Timestamp: d.Timestamp,
Type_: d.Type_,
Pid: d.Pid,
SPort: d.SPort,
DPort: d.DPort,
SAddr: d.SAddr,
DAddr: d.DAddr,
}
go a.processTcpConnect(tcpConnectEvent)
case l7_req.L7_EVENT:
d := data.(l7_req.L7Event) // copy data's value

// copy payload slice
payload := [512]uint8{}
copy(payload[:], d.Payload[:])

l7Event := l7_req.L7Event{
Fd: d.Fd,
Pid: d.Pid,
Status: d.Status,
Duration: d.Duration,
Protocol: d.Protocol,
Tls: d.Tls,
Method: d.Method,
Payload: payload,
PayloadSize: d.PayloadSize,
PayloadReadComplete: d.PayloadReadComplete,
Failed: d.Failed,
WriteTimeNs: d.WriteTimeNs,
}
go a.processL7(ctx, l7Event)
}
go a.processL7(l7Event)
}
}
}

func (a *Aggregator) processTcpConnect(data interface{}) {
d := data.(tcp_state.TcpConnectEvent)
go a.ec.ListenForEncryptedReqs(d.Pid)
if d.Type_ == tcp_state.EVENT_TCP_ESTABLISHED {
// {pid,fd} -> SockInfo

// filter out localhost connections
if d.SAddr == "127.0.0.1" || d.DAddr == "127.0.0.1" {
return
Expand Down Expand Up @@ -328,7 +348,7 @@ func parseSqlCommand(r []uint8) string {
return sqlStatement
}

func (a *Aggregator) processL7(d l7_req.L7Event) {
func (a *Aggregator) processL7(ctx context.Context, d l7_req.L7Event) {
var sockMap *SocketMap
var skLine *SocketLine
var ok bool
Expand All @@ -346,6 +366,8 @@ func (a *Aggregator) processL7(d l7_req.L7Event) {
a.clusterInfo.mu.Lock() // lock for writing
a.clusterInfo.PidToSocketMap[d.Pid] = sockMap
a.clusterInfo.mu.Unlock() // unlock for writing

go a.ec.ListenForEncryptedReqs(d.Pid)
}

sockMap.mu.RLock() // lock for reading
Expand Down Expand Up @@ -399,6 +421,14 @@ func (a *Aggregator) processL7(d l7_req.L7Event) {
if rc == 0 {
break
}

select {
case <-ctx.Done():
log.Logger.Debug().Msg("processL7 exiting, stop retrying...")
return
default:
continue
}
}

if rc < retryLimit && skInfo != nil {
Expand All @@ -420,6 +450,7 @@ func (a *Aggregator) processL7(d l7_req.L7Event) {
FromIP: skInfo.Saddr,
ToIP: skInfo.Daddr,
Protocol: d.Protocol,
Tls: d.Tls,
Completed: true,
StatusCode: d.Status,
FailReason: "",
Expand Down Expand Up @@ -505,8 +536,12 @@ func (a *Aggregator) processL7(d l7_req.L7Event) {
reqDto.FromType, reqDto.ToType = reqDto.ToType, reqDto.FromType
}

if d.Protocol == l7_req.L7_PROTOCOL_HTTP && d.Tls {
reqDto.Protocol = "HTTPS"
}

go func() {
err := a.ds.PersistRequest(reqDto)
err := a.ds.PersistRequest(&reqDto)
if err != nil {
log.Logger.Error().Err(err).Msg("error persisting request")
}
Expand Down
58 changes: 47 additions & 11 deletions datastore/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,29 @@ const (
healthCheckEndpoint = "/alaz/healthcheck/"
)

// LeveledLogger adapts a zerolog.Logger to the four-method leveled-logging
// interface (Error/Info/Debug/Warn, each taking a message plus variadic
// key/value pairs). It is assigned to retryClient.Logger below so the
// retryable HTTP client's log output flows through zerolog.
type LeveledLogger struct {
	l zerolog.Logger // underlying structured logger all levels delegate to
}

// Error logs msg at error level, attaching keysAndValues as structured fields.
// NOTE(review): assumes keysAndValues arrives as alternating key/value pairs
// that zerolog's Fields accepts — confirm against the caller's contract.
func (ll LeveledLogger) Error(msg string, keysAndValues ...interface{}) {
	ll.l.Error().Fields(keysAndValues).Msg(msg)
}

// Info logs msg at info level, attaching keysAndValues as structured fields.
func (ll LeveledLogger) Info(msg string, keysAndValues ...interface{}) {
	ll.l.Info().Fields(keysAndValues).Msg(msg)
}

// Debug logs msg at debug level, attaching keysAndValues as structured fields.
func (ll LeveledLogger) Debug(msg string, keysAndValues ...interface{}) {
	ll.l.Debug().Fields(keysAndValues).Msg(msg)
}

// Warn logs msg at warn level, attaching keysAndValues as structured fields.
func (ll LeveledLogger) Warn(msg string, keysAndValues ...interface{}) {
	ll.l.Warn().Fields(keysAndValues).Msg(msg)
}

func NewBackendDS(parentCtx context.Context, conf config.BackendConfig) *BackendDS {
ctx, _ := context.WithCancel(parentCtx)
rand.Seed(time.Now().UnixNano())

retryClient := retryablehttp.NewClient()
retryClient.Logger = LeveledLogger{l: log.Logger.With().Str("component", "retryablehttp").Logger()}
retryClient.Backoff = retryablehttp.DefaultBackoff
retryClient.RetryWaitMin = 1 * time.Second
retryClient.RetryWaitMax = 5 * time.Second
Expand Down Expand Up @@ -128,14 +146,29 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendConfig) *Backend
if err != nil {
log.Logger.Debug().Msgf("error reading response body: %v", err)
}
var resp BackendResponse
err = json.Unmarshal(rb, &resp)
if err != nil {
log.Logger.Debug().Msgf("error unmarshalling response body: %v", err)
}
if len(resp.Errors) > 0 {
for _, e := range resp.Errors {
log.Logger.Debug().Str("errorMsg", e.Error).Any("event", e.Event).Msgf("backend persist error")

// if req endpoint
if resp.Request.URL.Path == reqEndpoint {
var resp ReqBackendReponse
err = json.Unmarshal(rb, &resp)
if err != nil {
log.Logger.Debug().Msgf("error unmarshalling response body: %v", err)
}
if len(resp.Errors) > 0 {
for _, e := range resp.Errors {
log.Logger.Error().Str("errorMsg", e.Error).Any("event", e.Event).Msgf("backend persist error")
}
}
} else {
var resp BackendResponse
err = json.Unmarshal(rb, &resp)
if err != nil {
log.Logger.Debug().Msgf("error unmarshalling response body: %v", err)
}
if len(resp.Errors) > 0 {
for _, e := range resp.Errors {
log.Logger.Error().Str("errorMsg", e.Error).Any("event", e.Event).Msgf("backend persist error")
}
}
}
}
Expand Down Expand Up @@ -163,7 +196,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendConfig) *Backend
port: conf.Port,
c: client,
batchSize: bs,
reqInfoPool: newReqInfoPool(func() *ReqInfo { return &ReqInfo{} }, func(r *ReqInfo) { r = nil }),
reqInfoPool: newReqInfoPool(func() *ReqInfo { return &ReqInfo{} }, func(r *ReqInfo) {}),
reqChanBuffer: make(chan *ReqInfo, 10000),
podEventChan: make(chan interface{}, 100),
svcEventChan: make(chan interface{}, 100),
Expand Down Expand Up @@ -265,7 +298,9 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendConfig) *Backend

go func() {
<-ds.ctx.Done()
ds.reqInfoPool.Done()
// TODO:
// reqInfoPool.Put() results in send to closed channel if Done() is called
// ds.reqInfoPool.Done()
log.Logger.Info().Msg("backend datastore stopped")
}()

Expand Down Expand Up @@ -428,7 +463,7 @@ func newReqInfoPool(factory func() *ReqInfo, close func(*ReqInfo)) *poolutil.Poo
}
}

func (b *BackendDS) PersistRequest(request Request) error {
func (b *BackendDS) PersistRequest(request *Request) error {
// get a reqInfo from the pool
reqInfo := b.reqInfoPool.Get()

Expand All @@ -448,6 +483,7 @@ func (b *BackendDS) PersistRequest(request Request) error {
reqInfo[12] = request.FailReason // TODO ??
reqInfo[13] = request.Method
reqInfo[14] = request.Path
reqInfo[15] = request.Tls

b.reqChanBuffer <- reqInfo

Expand Down
2 changes: 1 addition & 1 deletion datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ type DataStore interface {
PersistContainer(c Container, eventType string) error
PersistDaemonSet(ds DaemonSet, eventType string) error

PersistRequest(request Request) error
PersistRequest(request *Request) error
}
Loading

0 comments on commit 04b9c64

Please sign in to comment.