Skip to content

Commit

Permalink
connection pool: max idle connections implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Dec 27, 2024
1 parent 9383943 commit 10025af
Showing 1 changed file with 42 additions and 0 deletions.
42 changes: 42 additions & 0 deletions go/pools/smartconnpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type RefreshCheck func() (bool, error)

type Config[C Connection] struct {
Capacity int64
MaxIdleCount int64
IdleTimeout time.Duration
MaxLifetime time.Duration
RefreshInterval time.Duration
Expand Down Expand Up @@ -123,6 +124,8 @@ type ConnPool[C Connection] struct {
active atomic.Int64
// capacity is the maximum number of connections that this pool can open
capacity atomic.Int64
// maxIdleCount is the maximum idle connections in the pool
idleCount atomic.Int64

// workers is a waitgroup for all the currently running worker goroutines
workers sync.WaitGroup
Expand All @@ -138,6 +141,8 @@ type ConnPool[C Connection] struct {
// maxCapacity is the maximum value to which capacity can be set; when the pool
// is re-opened, it defaults to this capacity
maxCapacity int64
// maxIdleCount is the maximum idle connections in the pool
maxIdleCount int64
// maxLifetime is the maximum time a connection can be open
maxLifetime atomic.Int64
// idleTimeout is the maximum time a connection can remain idle
Expand All @@ -158,6 +163,7 @@ func NewPool[C Connection](config *Config[C]) *ConnPool[C] {
pool := &ConnPool[C]{}
pool.freshSettingsStack.Store(-1)
pool.config.maxCapacity = config.Capacity
pool.config.maxIdleCount = config.MaxIdleCount
pool.config.maxLifetime.Store(config.MaxLifetime.Nanoseconds())
pool.config.idleTimeout.Store(config.IdleTimeout.Nanoseconds())
pool.config.refreshInterval.Store(config.RefreshInterval.Nanoseconds())
Expand Down Expand Up @@ -192,6 +198,7 @@ func (pool *ConnPool[C]) runWorker(close <-chan struct{}, interval time.Duration
func (pool *ConnPool[C]) open() {
pool.close = make(chan struct{})
pool.capacity.Store(pool.config.maxCapacity)
pool.setIdleCount()

// The expire worker takes care of removing from the waiter list any clients whose
// context has been cancelled.
Expand Down Expand Up @@ -315,6 +322,16 @@ func (pool *ConnPool[C]) MaxCapacity() int64 {
return pool.config.maxCapacity
}

func (pool *ConnPool[C]) setIdleCount() {
capacity := pool.Capacity()
maxIdleCount := pool.config.maxIdleCount
if maxIdleCount == 0 || maxIdleCount > capacity {
pool.idleCount.Store(capacity)
} else {
pool.idleCount.Store(maxIdleCount)
}
}

// InUse returns the number of connections that the pool has lent out to clients and that
// haven't been returned yet.
func (pool *ConnPool[C]) InUse() int64 {
Expand Down Expand Up @@ -396,6 +413,17 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
}

if !pool.wait.tryReturnConn(conn) {
// Option 1: do not care if more connections are closed than the allowed idle count
if pool.active.Load() > pool.idleCount.Load() {
conn.Close()
pool.closedConn()
return
}
// Option 2: precisely maintain the idle count
if pool.tryClose(conn) {
return
}

connSetting := conn.Conn.Setting()
if connSetting == nil {
pool.clean.Push(conn)
Expand All @@ -407,6 +435,19 @@ func (pool *ConnPool[C]) put(conn *Pooled[C]) {
}
}

func (pool *ConnPool[C]) tryClose(conn *Pooled[C]) bool {
for {
open := pool.active.Load()
if open <= pool.idleCount.Load() {
return false
}
if pool.active.CompareAndSwap(open, open-1) {
conn.Close()
return true
}
}
}

func (pool *ConnPool[D]) extendedMaxLifetime() time.Duration {
maxLifetime := pool.config.maxLifetime.Load()
if maxLifetime == 0 {
Expand Down Expand Up @@ -629,6 +670,7 @@ func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error {
if oldcap == newcap {
return nil
}
pool.setIdleCount()

const delay = 10 * time.Millisecond

Expand Down

0 comments on commit 10025af

Please sign in to comment.