Skip to content

Commit

Permalink
cost aware window (#7)
Browse files Browse the repository at this point in the history
* make deque cost aware

* fix test

* add deque kv struct

* add remove in deque test

* update

* fix race

* no deque preallocation

* fix empty deque bug and optimize structs

* update qlen type
  • Loading branch information
Yiling-J authored Apr 26, 2023
1 parent 8266e9b commit 6a80f59
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 67 deletions.
4 changes: 2 additions & 2 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ const (
)

type Builder[K comparable, V any] struct {
maxsize int64
cost func(V) int64
doorkeeper bool
removalListener func(key K, value V, reason RemoveReason)
maxsize int64
doorkeeper bool
}

func NewBuilder[K comparable, V any](maxsize int64) *Builder[K, V] {
Expand Down
2 changes: 1 addition & 1 deletion cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func TestGetSetDeleteNoRace(t *testing.T) {
key := keys[i]
client.Get(key)
if i%3 == 0 {
client.SetWithTTL(key, key, 1, time.Second*time.Duration(i%25+5))
client.SetWithTTL(key, key, int64(i%10+1), time.Second*time.Duration(i%25+5))
}
if i%5 == 0 {
client.Delete(key)
Expand Down
2 changes: 1 addition & 1 deletion internal/doorkeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (

// doorkeeper is a small bloom-filter-based cache admission policy
type doorkeeper struct {
filter bitvector // our filter bit vector
m uint32 // size of bit vector in bits
k uint32 // distinct hash functions needed
filter bitvector // our filter bit vector
}

func newDoorkeeper(capacity int, falsePositiveRate float64) *doorkeeper {
Expand Down
14 changes: 8 additions & 6 deletions internal/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,30 @@ type ReadBufItem[K comparable, V any] struct {
}
type WriteBufItem[K comparable, V any] struct {
entry *Entry[K, V]
code int8
costChange int64
code int8
rechedule bool
}

type MetaData[K comparable, V any] struct {
root bool
list uint8 // used in slru, probation or protected
prev *Entry[K, V]
next *Entry[K, V]
wheelPrev *Entry[K, V]
wheelNext *Entry[K, V]
root bool
list uint8 // used in slru, probation or protected
}

type Entry[K comparable, V any] struct {
shard uint16
cost atomic.Int64
key K
value V
expire atomic.Int64
meta MetaData[K, V]
cost atomic.Int64
expire atomic.Int64
frequency atomic.Int32
shard uint16
removed bool
deque bool
}

func NewEntry[K comparable, V any](key K, value V, cost int64, expire int64) *Entry[K, V] {
Expand Down
4 changes: 2 additions & 2 deletions internal/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ const (
// List represents a doubly linked list.
// The zero value for List is an empty list ready to use.
type List[K comparable, V any] struct {
bounded bool
listType uint8 // 1 tinylfu list, 2 timerwheel list
root Entry[K, V] // sentinel list element, only &root, root.prev, and root.next are used
len int // current list length excluding (this) sentinel element
capacity uint
bounded bool
listType uint8 // 1 tinylfu list, 2 timerwheel list
}

// New returns an initialized list.
Expand Down
9 changes: 5 additions & 4 deletions internal/singleflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,25 +46,26 @@ func newPanicError(v interface{}) error {

// call is an in-flight or completed singleflight.Do call
type call[V any] struct {
wg sync.WaitGroup

// These fields are written once before the WaitGroup is done
// and are only read after the WaitGroup is done.
val V
err error

chans []chan<- Result
wg sync.WaitGroup

// These fields are read and written with the singleflight
// mutex held before the WaitGroup is done, and are read but
// not written after the WaitGroup is done.
dups int
chans []chan<- Result
dups int
}

// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
type Group[K comparable, V any] struct {
mu sync.Mutex // protects m
m map[K]*call[V] // lazily initialized
mu sync.Mutex // protects m
}

// Result holds the results of Do, so they can be passed
Expand Down
2 changes: 1 addition & 1 deletion internal/sketch.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package internal

type CountMinSketch struct {
table []uint64
rowCounterSize uint
row64Size uint
rowMask uint
table []uint64
additions uint
sampleSize uint
}
Expand Down
120 changes: 76 additions & 44 deletions internal/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,23 @@ const (

type Shard[K comparable, V any] struct {
hashmap map[K]*Entry[K, V]
mu sync.RWMutex
dookeeper *doorkeeper
deque *deque.Deque[*Entry[K, V]]
group Group[K, Loaded[V]]
size uint
qsize uint
qlen int
counter uint
deque *deque.Deque[*Entry[K, V]]
group Group[K, Loaded[V]]
mu sync.RWMutex
}

func NewShard[K comparable, V any](size uint, qsize uint) *Shard[K, V] {
return &Shard[K, V]{
hashmap: make(map[K]*Entry[K, V], size),
hashmap: make(map[K]*Entry[K, V]),
dookeeper: newDoorkeeper(int(20*size), 0.01),
size: size,
qsize: qsize,
deque: deque.New[*Entry[K, V]](int(qsize)),
deque: deque.New[*Entry[K, V]](),
}
}

Expand Down Expand Up @@ -72,22 +73,22 @@ type Metrics struct {
}

type Store[K comparable, V any] struct {
tailUpdate bool
cap uint
shards []*Shard[K, V]
shardCount uint
entryPool sync.Pool
writebuf chan WriteBufItem[K, V]
hasher *Hasher[K]
removalListener func(key K, value V, reason RemoveReason)
policy *TinyLfu[K, V]
timerwheel *TimerWheel[K, V]
readbuf *Queue[ReadBufItem[K, V]]
readCounter *atomic.Uint32
writebuf chan WriteBufItem[K, V]
hasher *Hasher[K]
entryPool sync.Pool
cost func(V) int64
readCounter *atomic.Uint32
shards []*Shard[K, V]
cap uint
shardCount uint
mlock sync.Mutex
tailUpdate bool
doorkeeper bool
closed bool
removalListener func(key K, value V, reason RemoveReason)
}

// New returns a new data struct with the specified capacity
Expand Down Expand Up @@ -201,17 +202,20 @@ func (s *Store[K, V]) Set(key K, value V, cost int64, ttl time.Duration) bool {
var reschedule bool
var costChange int64
exist.value = value
oldCost := exist.cost.Swap(cost)
if oldCost != cost {
costChange = cost - oldCost
if exist.deque {
shard.qlen += int(costChange)
}
}
shard.mu.Unlock()
if expire > 0 {
old := exist.expire.Swap(expire)
if old != expire {
reschedule = true
}
}
oldCost := exist.cost.Swap(cost)
if oldCost != cost {
costChange = cost - oldCost
}
if reschedule || costChange != 0 {
s.writebuf <- WriteBufItem[K, V]{
entry: exist, code: UPDATE, costChange: costChange, rechedule: reschedule,
Expand Down Expand Up @@ -245,50 +249,73 @@ func (s *Store[K, V]) Set(key K, value V, cost int64, ttl time.Duration) bool {
s.writebuf <- WriteBufItem[K, V]{entry: entry, code: NEW}
return true
}
entry.deque = true
shard.deque.PushFront(entry)
var k K
var v V
if shard.deque.Len() > int(shard.qsize) {
shard.qlen += int(cost)
s.processDeque(shard)
return true
}

type dequeKV[K comparable, V any] struct {
k K
v V
}

func (s *Store[K, V]) processDeque(shard *Shard[K, V]) {
if shard.qlen <= int(shard.qsize) {
shard.mu.Unlock()
return
}
// send to slru
send := make([]*Entry[K, V], 0, 2)
// removed because frequency < slru tail frequency
removedkv := make([]dequeKV[K, V], 0, 2)
// expired
expiredkv := make([]dequeKV[K, V], 0, 2)
// expired
for shard.qlen > int(shard.qsize) {
evicted := shard.deque.PopBack()
evicted.deque = false
expire := evicted.expire.Load()
shard.qlen -= int(evicted.cost.Load())
if expire != 0 && expire <= s.timerwheel.clock.nowNano() {
deleted := shard.delete(evicted)
// double check because entry maybe removed already by Delete API
if deleted {
k, v = evicted.key, evicted.value
s.postDelete(evicted, EXPIRED)
}
shard.mu.Unlock()
if deleted {
if s.removalListener != nil {
s.removalListener(k, v, EXPIRED)
}
expiredkv = append(expiredkv, dequeKV[K, V]{k: evicted.key, v: evicted.value})
s.postDelete(evicted)
}
} else {
count := evicted.frequency.Load()
if count == -1 {
count = 0
}
if int32(count) >= s.policy.threshold.Load() {
shard.mu.Unlock()
s.writebuf <- WriteBufItem[K, V]{entry: evicted, code: NEW}
send = append(send, evicted)
} else {
deleted := shard.delete(evicted)
// double check because entry maybe removed already by Delete API
if deleted {
k, v = evicted.key, evicted.value
s.postDelete(evicted, EXPIRED)
}
shard.mu.Unlock()
if deleted {
if s.removalListener != nil {
s.removalListener(k, v, EVICTED)
}
removedkv = append(
expiredkv, dequeKV[K, V]{k: evicted.key, v: evicted.value},
)
s.postDelete(evicted)
}
}
}
} else {
shard.mu.Unlock()
}
return true
shard.mu.Unlock()
for _, entry := range send {
s.writebuf <- WriteBufItem[K, V]{entry: entry, code: NEW}
}
if s.removalListener != nil {
for _, e := range removedkv {
s.removalListener(e.k, e.v, EVICTED)
}
for _, e := range expiredkv {
s.removalListener(e.k, e.v, EXPIRED)
}
}
}

func (s *Store[K, V]) Delete(key K) {
Expand Down Expand Up @@ -324,7 +351,7 @@ func (s *Store[K, V]) index(key K) (uint64, int) {
return base, int(h & uint64(s.shardCount-1))
}

func (s *Store[K, V]) postDelete(entry *Entry[K, V], reason RemoveReason) {
func (s *Store[K, V]) postDelete(entry *Entry[K, V]) {
var zero V
entry.value = zero
s.entryPool.Put(entry)
Expand Down Expand Up @@ -354,7 +381,7 @@ func (s *Store[K, V]) removeEntry(entry *Entry[K, V], reason RemoveReason) {
if s.removalListener != nil {
s.removalListener(k, v, reason)
}
s.postDelete(entry, reason)
s.postDelete(entry)
}
// already removed from shard map
case REMOVED:
Expand Down Expand Up @@ -410,6 +437,10 @@ func (s *Store[K, V]) maintance() {
// lock free because store API never read/modify entry metadata
switch item.code {
case NEW:
if entry.removed {
s.mlock.Unlock()
continue
}
if entry.expire.Load() != 0 {
s.timerwheel.schedule(entry)
}
Expand All @@ -422,6 +453,7 @@ func (s *Store[K, V]) maintance() {
s.removeEntry(e, EVICTED)
}
case REMOVE:
entry.removed = true
s.removeEntry(entry, REMOVED)
case UPDATE:
if item.rechedule {
Expand Down
Loading

0 comments on commit 6a80f59

Please sign in to comment.