Skip to content

Commit

Permalink
Optimization: refactor gateway compress and encode data.
Browse files Browse the repository at this point in the history
  • Loading branch information
FGadvancer committed Jan 29, 2024
1 parent 59b70cb commit 2ba13e2
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 29 deletions.
14 changes: 4 additions & 10 deletions internal/msggateway/compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,10 @@ import (
"compress/gzip"
"errors"
"io"
"sync"

"github.com/OpenIMSDK/tools/utils"
)

var (
gzipWriterPool = sync.Pool{New: func() any { return gzip.NewWriter(nil) }}
gzipReaderPool = sync.Pool{New: func() any { return new(gzip.Reader) }}
)

type Compressor interface {
Compress(rawData []byte) ([]byte, error)
CompressWithPool(rawData []byte) ([]byte, error)
Expand Down Expand Up @@ -58,7 +52,7 @@ func (g *GzipCompressor) Compress(rawData []byte) ([]byte, error) {
}

func (g *GzipCompressor) CompressWithPool(rawData []byte) ([]byte, error) {
gz := gzipWriterPool.Get().(*gzip.Writer)
gz := gzipWriterPool.Get()
defer gzipWriterPool.Put(gz)

gzipBuffer := bytes.Buffer{}
Expand All @@ -74,7 +68,7 @@ func (g *GzipCompressor) CompressWithPool(rawData []byte) ([]byte, error) {
}

func (g *GzipCompressor) CompressWithExternalPool(rawData []byte, compressedData *bytes.Buffer) error {
gz := gzipWriterPool.Get().(*gzip.Writer)
gz := gzipWriterPool.Get()
defer gzipWriterPool.Put(gz)

gz.Reset(compressedData)
Expand Down Expand Up @@ -103,7 +97,7 @@ func (g *GzipCompressor) DeCompress(compressedData []byte) ([]byte, error) {
}

func (g *GzipCompressor) DecompressWithPool(compressedData []byte) ([]byte, error) {
reader := gzipReaderPool.Get().(*gzip.Reader)
reader := gzipReaderPool.Get()
if reader == nil {
return nil, errors.New("NewReader failed")
}
Expand All @@ -123,7 +117,7 @@ func (g *GzipCompressor) DecompressWithPool(compressedData []byte) ([]byte, erro
}

func (g *GzipCompressor) DecompressWithExternalPool(compressedData []byte, rawData *bytes.Buffer) error {
reader := gzipReaderPool.Get().(*gzip.Reader)
reader := gzipReaderPool.Get()
if reader == nil {
return errors.New("NewReader failed")
}
Expand Down
8 changes: 4 additions & 4 deletions internal/msggateway/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import "time"
type (
Option func(opt *configs)
configs struct {
// 长连接监听端口
// Long connection listening port
port int
// 长连接允许最大链接数
// Maximum number of long connections allowed
maxConnNum int64
// 连接握手超时时间
// Connection handshake timeout
handshakeTimeout time.Duration
// 允许消息最大长度
// Maximum allowed message length
messageMaxMsgLength int
// websocket write buffer, default: 4096, 4kb.
writeBufferSize int
Expand Down
34 changes: 19 additions & 15 deletions internal/msggateway/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,28 @@ package msggateway

import (
"bytes"
"compress/gzip"
"sync"
)

var (
bufferPool = NewPool[*bytes.Buffer](func() *bytes.Buffer { return new(bytes.Buffer) },
func(b *bytes.Buffer) {
b.Reset()
})
reqPool = NewPool[*Req](func() *Req { return new(Req) },
func(r *Req) {
r.Data = nil
r.MsgIncr = ""
r.OperationID = ""
r.ReqIdentifier = 0
r.SendID = ""
r.Token = ""
})
gzipWriterPool = NewPool[*gzip.Writer](func() *gzip.Writer { return gzip.NewWriter(nil) }, nil)
gzipReaderPool = NewPool[*gzip.Reader](func() *gzip.Reader { return new(gzip.Reader) }, nil)
)

// Pool is a generic sync.Pool
type Pool[T any] struct {
pool sync.Pool
Expand Down Expand Up @@ -39,18 +58,3 @@ func (p *Pool[T]) Get() T {
func (p *Pool[T]) Put(item T) {
p.pool.Put(item)
}

var bufferPool = NewPool[*bytes.Buffer](func() *bytes.Buffer { return new(bytes.Buffer) },
func(b *bytes.Buffer) {
b.Reset()
})

var reqPool = NewPool[*Req](func() *Req { return new(Req) },
func(r *Req) {
r.Data = nil
r.MsgIncr = ""
r.OperationID = ""
r.ReqIdentifier = 0
r.SendID = ""
r.Token = ""
})

0 comments on commit 2ba13e2

Please sign in to comment.