Skip to content

Commit

Permalink
Merge pull request #21 from ddosify/develop
Browse files Browse the repository at this point in the history
Health Check & Performance Improvements
  • Loading branch information
fatihbaltaci authored Sep 26, 2023
2 parents fc1c2e2 + 1277b42 commit d3ced26
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 79 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ FROM golang:1.20-alpine as builder
WORKDIR /app
COPY . ./
RUN apk update && apk add gcc musl-dev
RUN CGO_ENABLED=0 GOOS=linux go build -o alaz
ARG VERSION
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-X 'github.com/ddosify/alaz/datastore.tag=$VERSION'" -o alaz

FROM alpine:3.18.3
RUN apk --no-cache add ca-certificates
Expand Down
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,20 @@ generate:
## Alaz Image

ALAZ_IMAGE_NAME := alaz
ALAZ_TAG := latest
REGISTRY := ddosify
ALAZ_TAG ?= latest
REGISTRY ?= ddosify
BUILDX_BUILDER := buildx-multi-arch
ALAZ_DOCKERFILE := Dockerfile

.PHONY: build_push_buildx
build_push_buildx:
docker buildx inspect $(BUILDX_BUILDER) || \
docker buildx create --name=$(BUILDX_BUILDER) && \
docker buildx build --push --platform=linux/amd64,linux/arm64 --builder=$(BUILDX_BUILDER) --build-arg ALAZ_TAG=$(ALAZ_TAG) --tag=$(REGISTRY)/$(ALAZ_IMAGE_NAME):$(ALAZ_TAG) -f $(ALAZ_DOCKERFILE) .
docker buildx build --push --platform=linux/amd64,linux/arm64 --builder=$(BUILDX_BUILDER) --build-arg ALAZ_TAG=$(ALAZ_TAG) --build-arg VERSION=$(ALAZ_TAG) --tag=$(REGISTRY)/$(ALAZ_IMAGE_NAME):$(ALAZ_TAG) -f $(ALAZ_DOCKERFILE) .


.PHONY: build_push
build_push:
docker build -t $(REGISTRY)/$(ALAZ_IMAGE_NAME):$(ALAZ_TAG) -f $(ALAZ_DOCKERFILE) .
docker build --build-arg VERSION=$(ALAZ_TAG) -t $(REGISTRY)/$(ALAZ_IMAGE_NAME):$(ALAZ_TAG) -f $(ALAZ_DOCKERFILE) .
docker push $(REGISTRY)/$(ALAZ_IMAGE_NAME):$(ALAZ_TAG)

99 changes: 59 additions & 40 deletions aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,12 @@ package aggregator
// 5. docker (TODO)

import (
"context"
"fmt"
"net"
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/ddosify/alaz/config"
"github.com/ddosify/alaz/datastore"
"github.com/ddosify/alaz/ebpf"
"github.com/ddosify/alaz/ebpf/l7_req"
Expand All @@ -26,6 +22,7 @@ import (

"github.com/ddosify/alaz/k8s"

"github.com/patrickmn/go-cache"
"k8s.io/apimachinery/pkg/types"
)

Expand Down Expand Up @@ -87,32 +84,31 @@ var (
retryInterval = 400 * time.Millisecond
retryLimit = 5
// 400 + 800 + 1600 + 3200 + 6400 = 12400 ms

defaultExpiration = 5 * time.Minute
purgeTime = 10 * time.Minute
)

var usePgDs bool = false
var useBackendDs bool = true // default to true
var reverseDnsCache *cache.Cache

func init() {
reverseDnsCache = cache.New(defaultExpiration, purgeTime)
}

func NewAggregator(parentCtx context.Context, k8sChan <-chan interface{}, crChan <-chan interface{}, ebpfChan <-chan interface{}) *Aggregator {
ctx, _ := context.WithCancel(parentCtx)
func NewAggregator(k8sChan <-chan interface{}, crChan <-chan interface{}, ebpfChan <-chan interface{}, ds datastore.DataStore) *Aggregator {
clusterInfo := &ClusterInfo{
PodIPToPodUid: map[string]types.UID{},
ServiceIPToServiceUid: map[string]types.UID{},
PidToSocketMap: make(map[uint32]*SocketMap, 0),
}

metricsExport, _ := strconv.ParseBool(os.Getenv("METRICS_BACKEND"))
dsBackend := datastore.NewBackendDS(ctx, config.BackendConfig{
Host: os.Getenv("BACKEND_HOST"),
Port: os.Getenv("BACKEND_PORT"),
MetricsExport: metricsExport,
MetricsExportInterval: 10,
})

return &Aggregator{
k8sChan: k8sChan,
ebpfChan: ebpfChan,
clusterInfo: clusterInfo,
ds: dsBackend,
ds: ds,
}
}

Expand Down Expand Up @@ -290,25 +286,36 @@ func (a *Aggregator) processTcpConnect(data interface{}) {
}
}

func parseHttpPayload(request string) (method string, path string, httpVersion string) {
func parseHttpPayload(request string) (method string, path string, httpVersion string, hostHeader string) {
// Find the first space character
requestFirstLine := strings.Split(request, "\n")[0]
parts := strings.Split(requestFirstLine, " ")
lines := strings.Split(request, "\n")
parts := strings.Split(lines[0], " ")
if len(parts) >= 3 {
method = parts[0]
path = parts[1]
httpVersion = parts[2]
}
return method, path, httpVersion

for _, line := range lines[1:] {
// find Host header
if strings.HasPrefix(line, "Host:") {
hostParts := strings.Split(line, " ")
if len(hostParts) >= 2 {
hostHeader = hostParts[1]
hostHeader = strings.TrimSuffix(hostHeader, "\r")
break
}
}
}

return method, path, httpVersion, hostHeader
}

func parseSqlCommand(request string) string {
log.Logger.Debug().Str("request", request).Msg("parsing sql command")
func parseSqlCommand(r []uint8) string {
log.Logger.Debug().Uints8("request", r).Msg("parsing sql command")

// Q, 4 bytes of length, sql command

r := []byte(request)

// skip Q, (simple query)
r = r[1:]

Expand Down Expand Up @@ -419,17 +426,18 @@ func (a *Aggregator) processL7(d l7_req.L7Event) {
Method: d.Method,
}

var reqHostHeader string
// parse http payload, extract path, query params, headers
if d.Protocol == l7_req.L7_PROTOCOL_HTTP {
_, reqDto.Path, _ = parseHttpPayload(string(d.Payload[0:d.PayloadSize]))
_, reqDto.Path, _, reqHostHeader = parseHttpPayload(string(d.Payload[0:d.PayloadSize]))
log.Logger.Debug().Str("path", reqDto.Path).Msg("path extracted from http payload")
}

if d.Protocol == l7_req.L7_PROTOCOL_POSTGRES && d.Method == l7_req.SIMPLE_QUERY {
// parse sql command from payload
// path = sql command
// method = sql message type
reqDto.Path = parseSqlCommand(string(d.Payload[0:d.PayloadSize]))
reqDto.Path = parseSqlCommand(d.Payload[0:d.PayloadSize])
log.Logger.Debug().Str("path", reqDto.Path).Msg("path extracted from postgres payload")
}

Expand Down Expand Up @@ -468,13 +476,18 @@ func (a *Aggregator) processL7(d l7_req.L7Event) {
reqDto.ToType = "pod"
} else {
// 3rd party url
remoteDnsHost, err := getHostnameFromIP(skInfo.Daddr)
if err == nil {
// dns lookup successful
reqDto.ToUID = remoteDnsHost
if reqHostHeader != "" {
reqDto.ToUID = reqHostHeader
reqDto.ToType = "outbound"
} else {
log.Logger.Warn().Err(err).Str("Daddr", skInfo.Daddr).Msg("error getting hostname from ip")
remoteDnsHost, err := getHostnameFromIP(skInfo.Daddr)
if err == nil {
// dns lookup successful
reqDto.ToUID = remoteDnsHost
reqDto.ToType = "outbound"
} else {
log.Logger.Warn().Err(err).Str("Daddr", skInfo.Daddr).Msg("error getting hostname from ip")
}
}
}
}
Expand Down Expand Up @@ -502,16 +515,22 @@ func (a *Aggregator) processL7(d l7_req.L7Event) {

// reverse dns lookup
func getHostnameFromIP(ipAddr string) (string, error) {
addrs, err := net.LookupAddr(ipAddr)
if err != nil {
return "", err
}
// return from cache, if exists
// consumes too much memory otherwise
if host, ok := reverseDnsCache.Get(ipAddr); ok {
return host.(string), nil
} else {
addrs, err := net.LookupAddr(ipAddr)
if err != nil {
return "", err
}

// The reverse DNS lookup can return multiple names for the same IP.
// In this example, we return the first name found.
if len(addrs) > 0 {
return addrs[0], nil
// The reverse DNS lookup can return multiple names for the same IP.
// In this example, we return the first name found.
if len(addrs) > 0 {
reverseDnsCache.Set(ipAddr, addrs[0], 0)
return addrs[0], nil
}
return "", fmt.Errorf("no hostname found for IP address: %s", ipAddr)
}

return "", fmt.Errorf("no hostname found for IP address: %s", ipAddr)
}
Loading

0 comments on commit d3ced26

Please sign in to comment.