Skip to content

Commit

Permalink
Merge pull request #394 from askuy/feature/optimizecontextcancel
Browse files Browse the repository at this point in the history
增加ConnTcpMetricPorts
  • Loading branch information
askuy authored Jun 7, 2024
2 parents 87aa236 + 1018b70 commit 88c19d6
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 59 deletions.
72 changes: 21 additions & 51 deletions core/emetric/tcpstat.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/gotomicro/ego/core/elog"
"github.com/samber/lo"
)

type tcpConnectionState int
Expand Down Expand Up @@ -59,18 +60,21 @@ const (
)

type TcpStatCollector struct {
ForeignPorts []uint64
}

// NewTCPStatCollector returns a new Collector exposing network stats.
func NewTCPStatCollector() (*TcpStatCollector, error) {
return &TcpStatCollector{}, nil
func NewTCPStatCollector(foreignPorts []uint64) *TcpStatCollector {
return &TcpStatCollector{
ForeignPorts: foreignPorts,
}
}

func (c *TcpStatCollector) Update() error {
go func() {
for {
statsFile := path.Join("/proc", strconv.Itoa(os.Getpid()), "net", "tcp")
tcpStats, err := getTCPStats(statsFile)
tcpStats, err := c.getTCPStats(statsFile)
if err != nil {
elog.EgoLogger.Error(fmt.Errorf("couldn't get tcpstats: %w", err).Error())
return
Expand All @@ -87,34 +91,17 @@ func (c *TcpStatCollector) Update() error {
time.Sleep(5 * time.Second)
}
}()

// if enabled ipv6 system
//tcp6File := procFilePath("net/tcp6")
//if _, hasIPv6 := os.Stat(tcp6File); hasIPv6 == nil {
// tcp6Stats, err := getTCPStats(tcp6File)
// if err != nil {
// return fmt.Errorf("couldn't get tcp6stats: %w", err)
// }
//
// for st, value := range tcp6Stats {
// tcpStats[st] += value
// }
//}

//for st, value := range tcpStats {
// ch <- c.desc.mustNewConstMetric(value, st.String())
//}
return nil
}

func getTCPStats(statsFile string) (map[tcpConnectionState]map[string]float64, error) {
func (c *TcpStatCollector) getTCPStats(statsFile string) (map[tcpConnectionState]map[string]float64, error) {
file, err := os.Open(statsFile)
if err != nil {
return nil, err
}
defer file.Close()

return parseTCPStats(file)
return c.parseTCPStats(file)
}

/*
Expand All @@ -127,8 +114,7 @@ func getTCPStats(statsFile string) (map[tcpConnectionState]map[string]float64, e
|----------------------------------> number of entry
*/

func parseTCPStats(r io.Reader) (map[tcpConnectionState]map[string]float64, error) {
//tcpStats := map[tcpConnectionState]float64{}
func (c *TcpStatCollector) parseTCPStats(r io.Reader) (map[tcpConnectionState]map[string]float64, error) {
tcpStatsMap := make(map[tcpConnectionState]map[string]float64, 0)
contents, err := io.ReadAll(r)
if err != nil {
Expand All @@ -144,24 +130,7 @@ func parseTCPStats(r io.Reader) (map[tcpConnectionState]map[string]float64, erro
return nil, fmt.Errorf("invalid TCP stats line: %q", line)
}

//qu := strings.Split(parts[4], ":")
//if len(qu) < 2 {
// return nil, fmt.Errorf("cannot parse tx_queues and rx_queues: %q", line)
//}
//
//tx, err := strconv.ParseUint(qu[0], 16, 64)
//if err != nil {
// return nil, err
//}
//tcpStats[tcpConnectionState(tcpTxQueuedBytes)] += float64(tx)
//
//rx, err := strconv.ParseUint(qu[1], 16, 64)
//if err != nil {
// return nil, err
//}
//tcpStats[tcpConnectionState(tcpRxQueuedBytes)] += float64(rx)

ipv4, _ := parseIpV4(parts[2])
ipv4, _ := c.parseIpV4(parts[2])
st, err := strconv.ParseInt(parts[3], 16, 8)
if err != nil {
return nil, err
Expand All @@ -175,9 +144,6 @@ func parseTCPStats(r io.Reader) (map[tcpConnectionState]map[string]float64, erro
info[ipv4]++
}
tcpStatsMap[tcpConnectionState(st)] = info

//tcpStats[tcpConnectionState(st)]++

}

return tcpStatsMap, nil
Expand Down Expand Up @@ -218,23 +184,27 @@ func (st tcpConnectionState) String() string {

// 只解析IPV4
// 34190A0A:3D2D
func parseIpV4(s string) (string, error) {
func (ts *TcpStatCollector) parseIpV4(s string) (string, error) {
if len(s) != 13 {
return "", fmt.Errorf("not ipv4")
}
hexIP := s[:len(s)-5]
hexPort := s[len(s)-4:]
port, err := strconv.ParseUint(hexPort, 16, 16)
if err != nil {
return "", nil
}
// 防止端口过多,导致prometheus监控数据太大
if !lo.Contains(ts.ForeignPorts, port) {
return "all", nil
}

bytesIP, err := hex.DecodeString(hexIP)
if err != nil {
return "", nil
}
uint32IP := binary.LittleEndian.Uint32(bytesIP) //转换为主机字节序
IP := make(net.IP, 4)
binary.BigEndian.PutUint32(IP, uint32IP)
port, err := strconv.ParseUint(hexPort, 16, 16)
return fmt.Sprintf("%s:%d", IP.String(), port), err
}

//func parsePort(portStr string) (int64, error) {
// return strconv.ParseInt(portStr, 16, 16)
//}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/samber/lo v1.39.0 // indirect
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
github.com/shirou/gopsutil/v3 v3.21.6 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
Expand All @@ -93,6 +94,7 @@ require (
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/net v0.12.0 // indirect
golang.org/x/sys v0.12.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,8 @@ github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjR
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/samber/lo v1.39.0 h1:4gTz1wUhNYLhFSKl6O+8peW0v2F4BCY034GRpU9WnuA=
github.com/samber/lo v1.39.0/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee h1:8Iv5m6xEo1NR1AvpV+7XmhI4r39LGNzwUL4YpMuL5vk=
github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee/go.mod h1:qwtSXrKuJh/zsFQ12yEE89xfCrGKK63Rr7ctU/uCo4g=
Expand Down Expand Up @@ -567,6 +569,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down
8 changes: 5 additions & 3 deletions server/egovernor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ type Config struct {
Port int
EnableLocalMainIP bool
EnableConnTcpMetric bool
ConnTcpMetricPorts []uint64
Network string
}

// DefaultConfig 默认配置
func DefaultConfig() *Config {
return &Config{
Host: eflag.String("host"),
Network: "tcp4",
Port: 9003,
Host: eflag.String("host"),
Network: "tcp4",
Port: 9003,
ConnTcpMetricPorts: []uint64{6379, 3306, 8635, 27017, 9092},
}
}

Expand Down
6 changes: 1 addition & 5 deletions server/egovernor/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,8 @@ func (c *Container) Build(options ...Option) *Component {
option(c)
}
if c.config.EnableConnTcpMetric {
obj, err := emetric.NewTCPStatCollector()
if err != nil {
c.logger.Panic("NewTCPStatCollector fail", elog.FieldErr(err))
}
obj := emetric.NewTCPStatCollector(c.config.ConnTcpMetricPorts)
obj.Update()

Check failure on line 59 in server/egovernor/container.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `obj.Update` is not checked (errcheck)
}

return newComponent(c.name, c.config, c.logger)
}

0 comments on commit 88c19d6

Please sign in to comment.