Skip to content

Commit

Permalink
Add ability for aggro cache cleanup to only consider Kraken's disk usage
Browse files Browse the repository at this point in the history
  • Loading branch information
Anton-Kalpakchiev committed Nov 26, 2024
1 parent 059a132 commit c332836
Show file tree
Hide file tree
Showing 3 changed files with 245 additions and 88 deletions.
130 changes: 85 additions & 45 deletions lib/store/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -16,6 +16,7 @@ package store
import (
"fmt"
"os"
"strconv"
"sync"
"time"

Expand All @@ -30,19 +31,15 @@ import (

// CleanupConfig defines configuration for periodically cleaning up idle files.
// Every field carries a yaml struct tag naming the key it is read from.
//
// NOTE(review): the tag on AggressiveTTL is spelled `aggressive_ttL` (capital
// L). Config files presumably have to use that exact spelling -- confirm
// whether this is intentional before changing it, since renaming the key
// would silently stop existing configs from setting the field.
type CleanupConfig struct {
Disabled bool `yaml:"disabled"`
Interval time.Duration `yaml:"interval"` // How often cleanup runs.
TTI time.Duration `yaml:"tti"` // Time to idle based on last access time.
TTL time.Duration `yaml:"ttl"` // Time to live regardless of access. If 0, disables TTL.
AggressiveThreshold int `yaml:"aggressive_threshold"` // The disk util threshold to trigger aggressive cleanup. If 0, disables aggressive cleanup.
AggressiveTTL time.Duration `yaml:"aggressive_ttL"` // Time to live regardless of access if aggressive cleanup is triggered.
Disabled bool `yaml:"disabled"`
Interval time.Duration `yaml:"interval"` // How often cleanup runs.
TTI time.Duration `yaml:"tti"` // Time to idle based on last access time.
TTL time.Duration `yaml:"ttl"` // Time to live regardless of access. If 0, disables TTL.
ExcludeOtherServices bool `yaml:"exclude_other_services"` // Whether to exclude other services from the disk util calculation.
AggressiveThreshold int `yaml:"aggressive_threshold"` // The disk util threshold to trigger aggressive cleanup. If 0, disables aggressive cleanup.
AggressiveTTL time.Duration `yaml:"aggressive_ttL"` // Time to live regardless of access if aggressive cleanup is triggered.
}

// diskSpaceUtilFunc is the signature of a disk-space-utilization probe: it
// takes no arguments and yields an int utilization value or an error.
// It exists as a named type so the diskSpaceUtil function can be mocked.
type diskSpaceUtilFunc func() (int, error)

func (c CleanupConfig) applyDefaults() CleanupConfig {
if c.Interval == 0 {
c.Interval = 30 * time.Minute
Expand Down Expand Up @@ -87,6 +84,7 @@ func newCleanupManager(clk clock.Clock, stats tally.Scope) (*cleanupManager, err
// on the settings in config. op must set the desired states to clean before addJob
// is called.
func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp) {
jobStats := m.stats.Tagged(map[string]string{"job": tag})
config = config.applyDefaults()
if config.Disabled {
log.Warnf("Cleanup disabled for %s", op)
Expand All @@ -95,26 +93,17 @@ func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp
if config.TTL == 0 {
log.Warnf("TTL disabled for %s", op)
}

if config.AggressiveThreshold == 0 {
log.Warnf("Aggressive cleanup disabled for %s", op)
}

ticker := m.clk.Ticker(config.Interval)

usageGauge := m.stats.Tagged(map[string]string{"job": tag}).Gauge("disk_usage")

go func() {
for {
select {
case <-ticker.C:
log.Debugf("Performing cleanup of %s", op)
ttl := m.checkAggressiveCleanup(op, config, diskspaceutil.DiskSpaceUtil)
usage, err := m.scan(op, config.TTI, ttl)
if err != nil {
log.Errorf("Error scanning %s: %s", op, err)
}
usageGauge.Update(float64(usage))
m.clean(jobStats, config, op)
case <-m.stopc:
ticker.Stop()
return
Expand All @@ -123,32 +112,94 @@ func (m *cleanupManager) addJob(tag string, config CleanupConfig, op base.FileOp
}()
}

// stop closes m.stopc, which the goroutines started by addJob select on to
// stop their tickers and exit. The close is routed through m.stopOnce, so
// calling stop more than once closes the channel only the first time.
func (m *cleanupManager) stop() {
	closeStopChannel := func() {
		close(m.stopc)
	}
	m.stopOnce.Do(closeStopChannel)
}

// scan scans the op for idle or expired files. Also returns the total disk usage
// of op.
func (m *cleanupManager) scan(
op base.FileOp, tti time.Duration, ttl time.Duration) (usage int64, err error) {
// clean deletes idle files from op based on the config.
func (m *cleanupManager) clean(jobStats tally.Scope, config CleanupConfig, op base.FileOp) {
log.Debugf("Performing cleanup of %s", op)
ttl := m.calculateTTL(jobStats, op, config, calculateDiskUtil)

names, err := op.ListNames()
if err != nil {
return 0, fmt.Errorf("list names: %s", err)
log.Errorf("Error cleaning cache: list names: %v", err)
}

var absUsage int64

for _, name := range names {
info, err := op.GetFileStat(name)
if err != nil {
log.With("name", name).Errorf("Error getting file stat: %s", err)
continue
}
if ready, err := m.readyForDeletion(op, name, info, tti, ttl); err != nil {
if ready, err := m.readyForDeletion(op, name, info, config.TTI, ttl); err != nil {
log.With("name", name).Errorf("Error checking if file expired: %s", err)
} else if ready {
if err := op.DeleteFile(name); err != nil && err != base.ErrFilePersisted {
log.With("name", name).Errorf("Error deleting expired file: %s", err)
}
}
absUsage += info.Size()
}

jobStats.Gauge("disk_usage").Update(float64(absUsage))
}

// diskUtilFn computes a disk utilization percentage for op under config c,
// or returns an error if it cannot be determined. calculateTTL receives the
// implementation as an argument instead of calling calculateDiskUtil
// directly -- presumably so tests can inject a stub (TODO confirm).
type diskUtilFn func(op base.FileOp, c CleanupConfig) (int, error)

// calculateTTL chooses which TTL a cleanup pass should apply to op.
//
// The normal config.TTL is returned when aggressive cleanup is disabled
// (AggressiveThreshold == 0), when the disk utilization cannot be computed
// (the error is logged), or when utilization is below the threshold.
// config.AggressiveTTL is returned only when utilization has reached or
// exceeded the threshold.
//
// Whenever utilization is computed successfully it is also published on
// jobStats as the "disk_util" gauge, tagged with whether other services are
// excluded from the calculation.
func (m *cleanupManager) calculateTTL(jobStats tally.Scope, op base.FileOp, config CleanupConfig, calculateDiskUtil diskUtilFn) time.Duration {
	aggressiveEnabled := config.AggressiveThreshold != 0
	if !aggressiveEnabled {
		return config.TTL
	}

	percent, utilErr := calculateDiskUtil(op, config)
	if utilErr != nil {
		log.Errorf("Defaulting to normal TTL due to error calculating disk space util of %s: %v", op, utilErr)
		return config.TTL
	}

	// Publish the measurement before deciding, so the gauge is updated on
	// both the normal and the aggressive path.
	utilTags := map[string]string{
		"exclude_other_services": strconv.FormatBool(config.ExcludeOtherServices),
	}
	utilGauge := jobStats.Tagged(utilTags).Gauge("disk_util")
	utilGauge.Update(float64(percent))

	if percent < config.AggressiveThreshold {
		return config.TTL
	}
	log.Debugf("Aggressive cleanup of %s triggers with disk space util %d", op, percent)
	return config.AggressiveTTL
}

// calculateDiskUtil calculates the disk space utilization, as an integer
// percentage, based on the config.
//
// If 'ExcludeOtherServices' is turned on, only this op's utilization of the
// filesystem is considered: the result is 100 * size(op) / filesystem size,
// using integer division (the fraction is truncated). Otherwise the value
// reported by diskspaceutil.FileSystemUtil is returned unchanged.
//
// An error is returned if any of the underlying lookups fails, or if the
// filesystem size is reported as non-positive.
func calculateDiskUtil(op base.FileOp, config CleanupConfig) (int, error) {
	if !config.ExcludeOtherServices {
		utilPercent, err := diskspaceutil.FileSystemUtil()
		if err != nil {
			return 0, err
		}
		return utilPercent, nil
	}

	fsSize, err := diskspaceutil.FileSystemSize()
	if err != nil {
		return 0, err
	}
	// Integer division by zero panics at run time, which would take down the
	// cleanup goroutine. Report it as an error instead so the caller can fall
	// back to its non-aggressive behavior.
	if fsSize <= 0 {
		return 0, fmt.Errorf("invalid filesystem size: %d", fsSize)
	}

	opSize, err := size(op)
	if err != nil {
		return 0, err
	}
	return int(100 * opSize / fsSize), nil
}

func size(op base.FileOp) (usage int64, err error) {
names, err := op.ListNames()
if err != nil {
return 0, fmt.Errorf("list names: %s", err)
}
for _, name := range names {
info, err := op.GetFileStat(name)
if err != nil {
log.With("name", name).Errorf("Error getting file stat: %s", err)
continue
}
usage += info.Size()
}
return usage, nil
Expand All @@ -174,17 +225,6 @@ func (m *cleanupManager) readyForDeletion(
return m.clk.Now().Sub(lat.Time) > tti, nil
}

func (m *cleanupManager) checkAggressiveCleanup(op base.FileOp, config CleanupConfig, util diskSpaceUtilFunc) time.Duration {
if config.AggressiveThreshold != 0 {
diskspaceutil, err := util()
if err != nil {
log.Errorf("Error checking disk space util %s: %s", op, err)
return config.TTL
}
if diskspaceutil >= config.AggressiveThreshold {
log.Debugf("Aggressive cleanup of %s triggers with disk space util %d", op, diskspaceutil)
return config.AggressiveTTL
}
}
return config.TTL
func (m *cleanupManager) stop() {
m.stopOnce.Do(func() { close(m.stopc) })
}
Loading

0 comments on commit c332836

Please sign in to comment.