Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

connection pool: max idle connections implementation #17443

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions go/pools/smartconnpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type RefreshCheck func() (bool, error)

type Config[C Connection] struct {
Capacity int64
MaxIdleCount int64
IdleTimeout time.Duration
MaxLifetime time.Duration
RefreshInterval time.Duration
Expand Down Expand Up @@ -123,6 +124,8 @@ type ConnPool[C Connection] struct {
active atomic.Int64
// capacity is the maximum number of connections that this pool can open
capacity atomic.Int64
// maxIdleCount is the maximum idle connections in the pool
idleCount atomic.Int64

// workers is a waitgroup for all the currently running worker goroutines
workers sync.WaitGroup
Expand All @@ -138,6 +141,8 @@ type ConnPool[C Connection] struct {
// maxCapacity is the maximum value to which capacity can be set; when the pool
// is re-opened, it defaults to this capacity
maxCapacity int64
// maxIdleCount is the maximum idle connections in the pool
maxIdleCount int64
// maxLifetime is the maximum time a connection can be open
maxLifetime atomic.Int64
// idleTimeout is the maximum time a connection can remain idle
Expand All @@ -158,6 +163,7 @@ func NewPool[C Connection](config *Config[C]) *ConnPool[C] {
pool := &ConnPool[C]{}
pool.freshSettingsStack.Store(-1)
pool.config.maxCapacity = config.Capacity
pool.config.maxIdleCount = config.MaxIdleCount
pool.config.maxLifetime.Store(config.MaxLifetime.Nanoseconds())
pool.config.idleTimeout.Store(config.IdleTimeout.Nanoseconds())
pool.config.refreshInterval.Store(config.RefreshInterval.Nanoseconds())
Expand Down Expand Up @@ -192,6 +198,7 @@ func (pool *ConnPool[C]) runWorker(close <-chan struct{}, interval time.Duration
func (pool *ConnPool[C]) open() {
pool.close = make(chan struct{})
pool.capacity.Store(pool.config.maxCapacity)
pool.setIdleCount()

// The expire worker takes care of removing from the waiter list any clients whose
// context has been cancelled.
Expand Down Expand Up @@ -315,6 +322,16 @@ func (pool *ConnPool[C]) MaxCapacity() int64 {
return pool.config.maxCapacity
}

func (pool *ConnPool[C]) setIdleCount() {
capacity := pool.Capacity()
maxIdleCount := pool.config.maxIdleCount
if maxIdleCount == 0 || maxIdleCount > capacity {
pool.idleCount.Store(capacity)
} else {
pool.idleCount.Store(maxIdleCount)
}
}

// InUse returns the number of connections that the pool has lent out to clients and that
// haven't been returned yet.
func (pool *ConnPool[C]) InUse() int64 {
Expand Down Expand Up @@ -396,6 +413,17 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
}

if !pool.wait.tryReturnConn(conn) {
// Option 1: do not care if more connections are closed than the allowed idle count
if pool.active.Load() > pool.idleCount.Load() {
conn.Close()
pool.closedConn()
return
}
// Option 2: precisely maintain the idle count
if pool.tryClose(conn) {
return
}

connSetting := conn.Conn.Setting()
if connSetting == nil {
pool.clean.Push(conn)
Expand All @@ -407,6 +435,19 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
}
}

func (pool *ConnPool[C]) tryClose(conn *Pooled[C]) bool {
for {
open := pool.active.Load()
if open <= pool.idleCount.Load() {
return false
}
if pool.active.CompareAndSwap(open, open-1) {
conn.Close()
return true
}
}
}

func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
maxLifetime := pool.config.maxLifetime.Load()
if maxLifetime == 0 {
Expand Down Expand Up @@ -629,6 +670,7 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
if oldcap == newcap {
return nil
}
pool.setIdleCount()

const delay = 10 * time.Millisecond

Expand Down
Loading