Skip to content

Commit

Permalink
Merge pull request #20 from ddosify/refactor/memory-opt
Browse files Browse the repository at this point in the history
optimize memory on several points
  • Loading branch information
fatihbaltaci authored Sep 26, 2023
2 parents f99c32f + c05a9b3 commit 7320c75
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ ALAZ_DOCKERFILE := Dockerfile
build_push_buildx:
docker buildx inspect $(BUILDX_BUILDER) || \
docker buildx create --name=$(BUILDX_BUILDER) && \
docker buildx build --push --platform=linux/amd64,linux/arm64 --builder=$(BUILDX_BUILDER) --build-arg ALAZ_TAG=$(ALAZ_TAG) --tag=$(REGISTRY)/$(ALAZ_IMAGE_NAME):$(ALAZ_TAG) -f $(ALAZ_DOCKERFILE) .
docker buildx build --push --platform=linux/amd64,linux/arm64 --builder=$(BUILDX_BUILDER) --build-arg ALAZ_TAG=$(ALAZ_TAG) --build-arg VERSION=$(ALAZ_TAG) --tag=$(REGISTRY)/$(ALAZ_IMAGE_NAME):$(ALAZ_TAG) -f $(ALAZ_DOCKERFILE) .


.PHONY: build_push
Expand Down
82 changes: 57 additions & 25 deletions aggregator/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/ddosify/alaz/k8s"

"github.com/patrickmn/go-cache"
"k8s.io/apimachinery/pkg/types"
)

Expand Down Expand Up @@ -83,10 +84,18 @@ var (
retryInterval = 400 * time.Millisecond
retryLimit = 5
// 400 + 800 + 1600 + 3200 + 6400 = 12400 ms

defaultExpiration = 5 * time.Minute
purgeTime = 10 * time.Minute
)

var usePgDs bool = false
var useBackendDs bool = true // default to true
var reverseDnsCache *cache.Cache

func init() {
reverseDnsCache = cache.New(defaultExpiration, purgeTime)
}

func NewAggregator(k8sChan <-chan interface{}, crChan <-chan interface{}, ebpfChan <-chan interface{}, ds datastore.DataStore) *Aggregator {
clusterInfo := &ClusterInfo{
Expand Down Expand Up @@ -277,25 +286,36 @@ func (a *Aggregator) processTcpConnect(data interface{}) {
}
}

func parseHttpPayload(request string) (method string, path string, httpVersion string) {
func parseHttpPayload(request string) (method string, path string, httpVersion string, hostHeader string) {
// Find the first space character
requestFirstLine := strings.Split(request, "\n")[0]
parts := strings.Split(requestFirstLine, " ")
lines := strings.Split(request, "\n")
parts := strings.Split(lines[0], " ")
if len(parts) >= 3 {
method = parts[0]
path = parts[1]
httpVersion = parts[2]
}
return method, path, httpVersion

for _, line := range lines[1:] {
// find Host header
if strings.HasPrefix(line, "Host:") {
hostParts := strings.Split(line, " ")
if len(hostParts) >= 2 {
hostHeader = hostParts[1]
hostHeader = strings.TrimSuffix(hostHeader, "\r")
break
}
}
}

return method, path, httpVersion, hostHeader
}

func parseSqlCommand(request string) string {
log.Logger.Debug().Str("request", request).Msg("parsing sql command")
func parseSqlCommand(r []uint8) string {
log.Logger.Debug().Uints8("request", r).Msg("parsing sql command")

// Q, 4 bytes of length, sql command

r := []byte(request)

// skip Q, (simple query)
r = r[1:]

Expand Down Expand Up @@ -406,17 +426,18 @@ func (a *Aggregator) processL7(d l7_req.L7Event) {
Method: d.Method,
}

var reqHostHeader string
// parse http payload, extract path, query params, headers
if d.Protocol == l7_req.L7_PROTOCOL_HTTP {
_, reqDto.Path, _ = parseHttpPayload(string(d.Payload[0:d.PayloadSize]))
_, reqDto.Path, _, reqHostHeader = parseHttpPayload(string(d.Payload[0:d.PayloadSize]))
log.Logger.Debug().Str("path", reqDto.Path).Msg("path extracted from http payload")
}

if d.Protocol == l7_req.L7_PROTOCOL_POSTGRES && d.Method == l7_req.SIMPLE_QUERY {
// parse sql command from payload
// path = sql command
// method = sql message type
reqDto.Path = parseSqlCommand(string(d.Payload[0:d.PayloadSize]))
reqDto.Path = parseSqlCommand(d.Payload[0:d.PayloadSize])
log.Logger.Debug().Str("path", reqDto.Path).Msg("path extracted from postgres payload")
}

Expand Down Expand Up @@ -455,13 +476,18 @@ func (a *Aggregator) processL7(d l7_req.L7Event) {
reqDto.ToType = "pod"
} else {
// 3rd party url
remoteDnsHost, err := getHostnameFromIP(skInfo.Daddr)
if err == nil {
// dns lookup successful
reqDto.ToUID = remoteDnsHost
if reqHostHeader != "" {
reqDto.ToUID = reqHostHeader
reqDto.ToType = "outbound"
} else {
log.Logger.Warn().Err(err).Str("Daddr", skInfo.Daddr).Msg("error getting hostname from ip")
remoteDnsHost, err := getHostnameFromIP(skInfo.Daddr)
if err == nil {
// dns lookup successful
reqDto.ToUID = remoteDnsHost
reqDto.ToType = "outbound"
} else {
log.Logger.Warn().Err(err).Str("Daddr", skInfo.Daddr).Msg("error getting hostname from ip")
}
}
}
}
Expand Down Expand Up @@ -489,16 +515,22 @@ func (a *Aggregator) processL7(d l7_req.L7Event) {

// reverse dns lookup
func getHostnameFromIP(ipAddr string) (string, error) {
addrs, err := net.LookupAddr(ipAddr)
if err != nil {
return "", err
}
// return from cache, if exists
// consumes too much memory otherwise
if host, ok := reverseDnsCache.Get(ipAddr); ok {
return host.(string), nil
} else {
addrs, err := net.LookupAddr(ipAddr)
if err != nil {
return "", err
}

// The reverse DNS lookup can return multiple names for the same IP.
// In this example, we return the first name found.
if len(addrs) > 0 {
return addrs[0], nil
// The reverse DNS lookup can return multiple names for the same IP.
// In this example, we return the first name found.
if len(addrs) > 0 {
reverseDnsCache.Set(ipAddr, addrs[0], 0)
return addrs[0], nil
}
return "", fmt.Errorf("no hostname found for IP address: %s", ipAddr)
}

return "", fmt.Errorf("no hostname found for IP address: %s", ipAddr)
}
23 changes: 22 additions & 1 deletion datastore/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/version"
collector "github.com/prometheus/node_exporter/collector"

poolutil "go.ddosify.com/ddosify/core/util"
)

var MonitoringID string
Expand Down Expand Up @@ -64,6 +66,7 @@ type BackendDS struct {
batchSize int64

reqChanBuffer chan *ReqInfo
reqInfoPool *poolutil.Pool[*ReqInfo]

podEventChan chan interface{} // *PodEvent
svcEventChan chan interface{} // *SvcEvent
Expand Down Expand Up @@ -160,6 +163,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendConfig) *Backend
port: conf.Port,
c: client,
batchSize: bs,
reqInfoPool: newReqInfoPool(func() *ReqInfo { return &ReqInfo{} }, func(r *ReqInfo) { r = nil }),
reqChanBuffer: make(chan *ReqInfo, 10000),
podEventChan: make(chan interface{}, 100),
svcEventChan: make(chan interface{}, 100),
Expand Down Expand Up @@ -261,6 +265,7 @@ func NewBackendDS(parentCtx context.Context, conf config.BackendConfig) *Backend

go func() {
<-ds.ctx.Done()
ds.reqInfoPool.Done()
log.Logger.Info().Msg("backend datastore stopped")
}()

Expand Down Expand Up @@ -348,6 +353,11 @@ func (b *BackendDS) sendReqsInBatch() {

reqsPayload := convertReqsToPayload(batch)
b.sendToBackend(http.MethodPost, reqsPayload, reqEndpoint)

// return reqInfoss to the pool
for _, req := range batch {
b.reqInfoPool.Put(req)
}
}

for {
Expand Down Expand Up @@ -410,8 +420,19 @@ func (b *BackendDS) sendEventsInBatch(ch chan interface{}, endpoint string, inte
}
}

func newReqInfoPool(factory func() *ReqInfo, close func(*ReqInfo)) *poolutil.Pool[*ReqInfo] {
return &poolutil.Pool[*ReqInfo]{
Items: make(chan *ReqInfo, 5000),
Factory: factory,
Close: close,
}
}

func (b *BackendDS) PersistRequest(request Request) error {
reqInfo := &ReqInfo{}
// get a reqInfo from the pool
reqInfo := b.reqInfoPool.Get()

// overwrite the reqInfo, all fields must be set in order to avoid comple
reqInfo[0] = request.StartTime
reqInfo[1] = request.Latency
reqInfo[2] = request.FromIP
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ require (
github.com/onsi/ginkgo/v2 v2.9.5 // indirect
github.com/onsi/gomega v1.27.7 // indirect
github.com/opencontainers/selinux v1.11.0 // indirect
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/prometheus-community/go-runit v0.1.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
Expand All @@ -77,6 +78,7 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.8.4 // indirect
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
go.ddosify.com/ddosify v1.0.5
go.uber.org/multierr v1.11.0 // indirect
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect
go4.org/unsafe/assume-no-moving-gc v0.0.0-20230525183740-e7c30c78aeb2 // indirect
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,12 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA=
github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/jsimonetti/rtnetlink v1.3.2 h1:dcn0uWkfxycEEyNy0IGfx3GrhQ38LH7odjxAghimsVI=
github.com/jsimonetti/rtnetlink v1.3.2/go.mod h1:BBu4jZCpTjP6Gk0/wfrO8qcqymnN3g0hoFqObRmUo6U=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
Expand Down Expand Up @@ -173,12 +175,15 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/onsi/ginkgo/v2 v2.9.5 h1:+6Hr4uxzP4XIUyAkg61dWBw8lb/gc4/X5luuxN/EC+Q=
github.com/onsi/ginkgo/v2 v2.9.5/go.mod h1:tvAoo1QUJwNEU2ITftXTpR7R1RbCzoZUOs3RonqW57k=
github.com/onsi/gomega v1.27.7 h1:fVih9JD6ogIiHUN6ePK7HJidyEDpWGVB5mzM7cWNXoU=
github.com/onsi/gomega v1.27.7/go.mod h1:1p8OOlwo2iUUDsHnOrjE5UKYJ+e3W8eQ3qSlRahPmr4=
github.com/opencontainers/selinux v1.11.0 h1:+5Zbo97w3Lbmb3PeqQtpmTkMwsW5nRI3YaLpt7tQ7oU=
github.com/opencontainers/selinux v1.11.0/go.mod h1:E5dMC3VPuVvVHDYmi78qvhJp8+M586T4DlDRYpFkyec=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down Expand Up @@ -227,6 +232,8 @@ github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8
github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.ddosify.com/ddosify v1.0.5 h1:2FPBr8hjGam7nIjCCGr11MwrX8U7+RvbAFDp6g5eR2I=
go.ddosify.com/ddosify v1.0.5/go.mod h1:ciivfP/6RuLd01uc56pdUa2+SQalfKwEXsIhOu04rL4=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
Expand Down
5 changes: 5 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
"context"

"github.com/ddosify/alaz/log"

"net/http"
_ "net/http/pprof"
)

func main() {
Expand Down Expand Up @@ -62,6 +65,8 @@ func main() {
a.Run()
}

go http.ListenAndServe(":8181", nil)

<-k8sCollector.Done()
log.Logger.Info().Msg("k8sCollector done")

Expand Down

0 comments on commit 7320c75

Please sign in to comment.