Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add expirable LRU finalizer to fix expirable LRU's goroutine leak #161

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 42 additions & 24 deletions expirable/expirable_lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package expirable

import (
"runtime"
"sync"
"time"

Expand All @@ -13,20 +14,35 @@ import (
// EvictCallback is used to get a callback when a cache entry is evicted
type EvictCallback[K comparable, V any] func(key K, value V)

// LRU implements a thread-safe LRU with expirable entries.
// This is a wrapper around lru[K,V] to make delete-expired routine
// stop when LRU is garbage collected.
//
// This trick ensures that the deleteExpired goroutine (is
// running DeleteExpired on lru forever) does not keep the
// returned LRU object from being garbage collected.
// When it is garbage collected, the finalizer of LRU
// stops the deleteExpired goroutine, after which lru can
// be collected.
type LRU[K comparable, V any] struct {
*lru[K, V]
}

// LRU implements a thread-safe LRU with expirable entries.
type lru[K comparable, V any] struct {
size int
evictList *internal.LruList[K, V]
items map[K]*internal.Entry[K, V]
onEvict EvictCallback[K, V]

// expirable options
mu sync.Mutex
mu sync.Mutex

ttl time.Duration
done chan struct{}

// buckets for expiration
buckets []bucket[K, V]

// uint8 because it's number between 0 and numBuckets
nextCleanupBucket uint8
}
Expand Down Expand Up @@ -59,7 +75,7 @@ func NewLRU[K comparable, V any](size int, onEvict EvictCallback[K, V], ttl time
ttl = noEvictionTTL
}

res := LRU[K, V]{
res := lru[K, V]{
ttl: ttl,
size: size,
evictList: internal.NewList[K, V](),
Expand All @@ -75,9 +91,6 @@ func NewLRU[K comparable, V any](size int, onEvict EvictCallback[K, V], ttl time
}

// enable deleteExpired() running in separate goroutine for cache with non-zero TTL
//
// Important: done channel is never closed, so deleteExpired() goroutine will never exit,
// it's decided to add functionality to close it in the version later than v2.
if res.ttl != noEvictionTTL {
go func(done <-chan struct{}) {
ticker := time.NewTicker(res.ttl / numBuckets)
Expand All @@ -92,12 +105,17 @@ func NewLRU[K comparable, V any](size int, onEvict EvictCallback[K, V], ttl time
}
}(res.done)
}
return &res

wrap := LRU[K, V]{&res}
runtime.SetFinalizer(&wrap, func(c *LRU[K, V]) {
close(c.lru.done)
})
return &wrap
}

// Purge clears the cache completely.
// onEvict is called for each evicted key.
func (c *LRU[K, V]) Purge() {
func (c *lru[K, V]) Purge() {
c.mu.Lock()
defer c.mu.Unlock()
for k, v := range c.items {
Expand All @@ -117,7 +135,7 @@ func (c *LRU[K, V]) Purge() {
// Add adds a value to the cache. Returns true if an eviction occurred.
// Returns false if there was no eviction: the item was already in the cache,
// or the size was not exceeded.
func (c *LRU[K, V]) Add(key K, value V) (evicted bool) {
func (c *lru[K, V]) Add(key K, value V) (evicted bool) {
c.mu.Lock()
defer c.mu.Unlock()
now := time.Now()
Expand Down Expand Up @@ -146,7 +164,7 @@ func (c *LRU[K, V]) Add(key K, value V) (evicted bool) {
}

// Get looks up a key's value from the cache.
func (c *LRU[K, V]) Get(key K) (value V, ok bool) {
func (c *lru[K, V]) Get(key K) (value V, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
var ent *internal.Entry[K, V]
Expand All @@ -163,7 +181,7 @@ func (c *LRU[K, V]) Get(key K) (value V, ok bool) {

// Contains checks if a key is in the cache, without updating the recent-ness
// or deleting it for being stale.
func (c *LRU[K, V]) Contains(key K) (ok bool) {
func (c *lru[K, V]) Contains(key K) (ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
_, ok = c.items[key]
Expand All @@ -172,7 +190,7 @@ func (c *LRU[K, V]) Contains(key K) (ok bool) {

// Peek returns the key value (or undefined if not found) without updating
// the "recently used"-ness of the key.
func (c *LRU[K, V]) Peek(key K) (value V, ok bool) {
func (c *lru[K, V]) Peek(key K) (value V, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
var ent *internal.Entry[K, V]
Expand All @@ -188,7 +206,7 @@ func (c *LRU[K, V]) Peek(key K) (value V, ok bool) {

// Remove removes the provided key from the cache, returning if the
// key was contained.
func (c *LRU[K, V]) Remove(key K) bool {
func (c *lru[K, V]) Remove(key K) bool {
c.mu.Lock()
defer c.mu.Unlock()
if ent, ok := c.items[key]; ok {
Expand All @@ -199,7 +217,7 @@ func (c *LRU[K, V]) Remove(key K) bool {
}

// RemoveOldest removes the oldest item from the cache.
func (c *LRU[K, V]) RemoveOldest() (key K, value V, ok bool) {
func (c *lru[K, V]) RemoveOldest() (key K, value V, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
if ent := c.evictList.Back(); ent != nil {
Expand All @@ -210,7 +228,7 @@ func (c *LRU[K, V]) RemoveOldest() (key K, value V, ok bool) {
}

// GetOldest returns the oldest entry
func (c *LRU[K, V]) GetOldest() (key K, value V, ok bool) {
func (c *lru[K, V]) GetOldest() (key K, value V, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
if ent := c.evictList.Back(); ent != nil {
Expand All @@ -220,7 +238,7 @@ func (c *LRU[K, V]) GetOldest() (key K, value V, ok bool) {
}

// Keys returns a slice of the keys in the cache, from oldest to newest.
func (c *LRU[K, V]) Keys() []K {
func (c *lru[K, V]) Keys() []K {
c.mu.Lock()
defer c.mu.Unlock()
keys := make([]K, 0, len(c.items))
Expand All @@ -232,7 +250,7 @@ func (c *LRU[K, V]) Keys() []K {

// Values returns a slice of the values in the cache, from oldest to newest.
// Expired entries are filtered out.
func (c *LRU[K, V]) Values() []V {
func (c *lru[K, V]) Values() []V {
c.mu.Lock()
defer c.mu.Unlock()
values := make([]V, len(c.items))
Expand All @@ -249,14 +267,14 @@ func (c *LRU[K, V]) Values() []V {
}

// Len returns the number of items in the cache.
func (c *LRU[K, V]) Len() int {
func (c *lru[K, V]) Len() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.evictList.Length()
}

// Resize changes the cache size. Size of 0 means unlimited.
func (c *LRU[K, V]) Resize(size int) (evicted int) {
func (c *lru[K, V]) Resize(size int) (evicted int) {
c.mu.Lock()
defer c.mu.Unlock()
if size <= 0 {
Expand Down Expand Up @@ -287,14 +305,14 @@ func (c *LRU[K, V]) Resize(size int) (evicted int) {
// }

// removeOldest removes the oldest item from the cache. Has to be called with lock!
func (c *LRU[K, V]) removeOldest() {
func (c *lru[K, V]) removeOldest() {
if ent := c.evictList.Back(); ent != nil {
c.removeElement(ent)
}
}

// removeElement is used to remove a given list element from the cache. Has to be called with lock!
func (c *LRU[K, V]) removeElement(e *internal.Entry[K, V]) {
func (c *lru[K, V]) removeElement(e *internal.Entry[K, V]) {
c.evictList.Remove(e)
delete(c.items, e.Key)
c.removeFromBucket(e)
Expand All @@ -305,7 +323,7 @@ func (c *LRU[K, V]) removeElement(e *internal.Entry[K, V]) {

// deleteExpired deletes expired records from the oldest bucket, waiting for the newest entry
// in it to expire first.
func (c *LRU[K, V]) deleteExpired() {
func (c *lru[K, V]) deleteExpired() {
c.mu.Lock()
bucketIdx := c.nextCleanupBucket
timeToExpire := time.Until(c.buckets[bucketIdx].newestEntry)
Expand All @@ -323,7 +341,7 @@ func (c *LRU[K, V]) deleteExpired() {
}

// addToBucket adds entry to expire bucket so that it will be cleaned up when the time comes. Has to be called with lock!
func (c *LRU[K, V]) addToBucket(e *internal.Entry[K, V]) {
func (c *lru[K, V]) addToBucket(e *internal.Entry[K, V]) {
bucketID := (numBuckets + c.nextCleanupBucket - 1) % numBuckets
e.ExpireBucket = bucketID
c.buckets[bucketID].entries[e.Key] = e
Expand All @@ -333,6 +351,6 @@ func (c *LRU[K, V]) addToBucket(e *internal.Entry[K, V]) {
}

// removeFromBucket removes the entry from its corresponding bucket. Has to be called with lock!
func (c *LRU[K, V]) removeFromBucket(e *internal.Entry[K, V]) {
func (c *lru[K, V]) removeFromBucket(e *internal.Entry[K, V]) {
delete(c.buckets[e.ExpireBucket].entries, e.Key)
}