Skip to content

Commit

Permalink
Merge pull request #183 from tstromberg/persist-on-the-fly
Browse files Browse the repository at this point in the history
Persist items on the fly rather than periodically in bulk
  • Loading branch information
tstromberg authored Jul 13, 2020
2 parents cf8953a + 582ef45 commit dc0f24a
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 89 deletions.
5 changes: 1 addition & 4 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func main() {
Party: tp,
MinRefresh: *minRefresh,
MaxRefresh: *maxRefresh,
PersistFunc: c.Save,
PersistFunc: c.Cleanup,
})

if *dryRun {
Expand All @@ -160,9 +160,6 @@ func main() {
go func() {
for sig := range sigc {
klog.Infof("signal caught: %v", sig)
if err := c.Save(); err != nil {
klog.Errorf("unable to save: %v", err)
}
os.Exit(0)
}
}()
Expand Down
4 changes: 0 additions & 4 deletions cmd/tester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,6 @@ func main() {
} else {
executeRule(ctx, tp)
}

if err := c.Save(); err != nil {
klog.Exitf("persist save to %s: %v", c, err)
}
}

func executeCollection(ctx context.Context, tp *triage.Party) {
Expand Down
5 changes: 3 additions & 2 deletions pkg/persist/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (d *Disk) Initialize() error {
if err := d.load(); err != nil {
klog.Infof("recreating cache due to load error: %v", err)
d.cache = createMem()
if err := d.Save(); err != nil {
if err := d.Cleanup(); err != nil {
return fmt.Errorf("save: %w", err)
}
}
Expand Down Expand Up @@ -84,6 +84,7 @@ func (d *Disk) load() error {
// Set stores a thing into memory
func (d *Disk) Set(key string, t *Thing) error {
setMem(d.cache, key, t)
// Implementation quirk: the disk driver does not persist until Cleanup() is called
return nil
}

Expand All @@ -98,7 +99,7 @@ func (d *Disk) GetNewerThan(key string, t time.Time) *Thing {
return newerThanMem(d.cache, key, t)
}

func (d *Disk) Save() error {
func (d *Disk) Cleanup() error {
items := d.cache.Items()
klog.Infof("*** Saving %d items to disk cache at %s", len(items), d.path)

Expand Down
4 changes: 2 additions & 2 deletions pkg/persist/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (m *Memory) GetNewerThan(key string, t time.Time) *Thing {
return newerThanMem(m.cache, key, t)
}

func (m *Memory) Save() error {
klog.Warningf("Save is not implemented by the memory backend")
func (m *Memory) Cleanup() error {
klog.Warningf("Cleanup is not implemented by the memory backend")
return nil
}
55 changes: 27 additions & 28 deletions pkg/persist/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func (m *MySQL) loadItems() error {
var item cache.Item
gd := gob.NewDecoder(bytes.NewBuffer(mi.Value))
if err := gd.Decode(&item); err != nil {
return fmt.Errorf("decode: %w", err)
klog.Errorf("decode failed for %s (saved %s, bytes: %d): %v", mi.Key, mi.Saved, len(mi.Value), err)
continue
}
decoded[mi.Key] = item
}
Expand All @@ -115,6 +116,14 @@ func (m *MySQL) loadItems() error {
// Set stores a thing
func (m *MySQL) Set(key string, th *Thing) error {
setMem(m.cache, key, th)

go func() {
err := m.persist(key, th)
if err != nil {
klog.Errorf("failed to persist %s: %s", key, err)
}
}()

return nil
}

Expand All @@ -129,39 +138,29 @@ func (m *MySQL) GetNewerThan(key string, t time.Time) *Thing {
return newerThanMem(m.cache, key, t)
}

func (m *MySQL) Save() error {
start := time.Now()
newerThan := time.Now().Add(-1 * MaxSaveAge)
items := m.cache.Items()
klog.Infof("*** Saving %d items to MySQL", len(items))

for k, v := range items {
th := v.Object.(*Thing)
if th.Created.Before(newerThan) {
klog.Infof("skipping %s (%s is too old)", k, th.Created)
continue
}

b := new(bytes.Buffer)
ge := gob.NewEncoder(b)
if err := ge.Encode(v); err != nil {
return fmt.Errorf("encode: %w", err)
}
// persist writes an thing to MySQL
func (m *MySQL) persist(key string, th *Thing) error {
b := new(bytes.Buffer)
ge := gob.NewEncoder(b)

if _, err := m.db.Exec(`
INSERT INTO persist (k, v, saved) VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE k=VALUES(k), v=VALUES(v)`,
k, b.Bytes(), start); err != nil {
return fmt.Errorf("sql exec: %v (len=%d)", err, len(b.Bytes()))
}
item := cache.Item{Object: th}
if err := ge.Encode(item); err != nil {
return fmt.Errorf("encode: %w", err)
}

return m.cleanup(newerThan)
_, err := m.db.Exec(`
INSERT INTO persist (k, v, saved) VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE k=VALUES(k), v=VALUES(v)`, key, b.Bytes(), time.Now())

return err
}

// Cleanup deletes older cache items
func (m *MySQL) cleanup(t time.Time) error {
res, err := m.db.Exec(`DELETE FROM persist WHERE saved < ?`, t)
func (m *MySQL) Cleanup() error {
start := time.Now()
maxAge := start.Add(-1 * MaxSaveAge)

res, err := m.db.Exec(`DELETE FROM persist WHERE saved < ?`, maxAge)

if err != nil {
return fmt.Errorf("delete exec: %w", err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/persist/persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (

var (
// MaxSaveAge is the oldest allowable entry to persist
MaxSaveAge = 30 * 24 * time.Hour
MaxSaveAge = 1 * 24 * time.Hour
// MaxLoadAge is the oldest allowable entry to load
MaxLoadAge = 45 * 24 * time.Hour
MaxLoadAge = 10 * 24 * time.Hour
)

// Config is cache configuration
Expand Down Expand Up @@ -58,7 +58,7 @@ type Cacher interface {
GetNewerThan(string, time.Time) *Thing

Initialize() error
Save() error
Cleanup() error
}

func New(cfg Config) (Cacher, error) {
Expand Down
57 changes: 26 additions & 31 deletions pkg/persist/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ func (m *Postgres) loadItems() error {
var item cache.Item
gd := gob.NewDecoder(bytes.NewBuffer(mi.Value))
if err := gd.Decode(&item); err != nil {
return fmt.Errorf("decode: %w", err)
klog.Errorf("decode failed for %s (saved %s, bytes: %d): %v", mi.Key, mi.Saved, len(mi.Value), err)
continue
}
decoded[mi.Key] = item
}
Expand All @@ -110,6 +111,14 @@ func (m *Postgres) loadItems() error {
// Set stores a thing
func (m *Postgres) Set(key string, th *Thing) error {
setMem(m.cache, key, th)

go func() {
err := m.persist(key, th)
if err != nil {
klog.Errorf("failed to persist %s: %s", key, err)
}
}()

return nil
}

Expand All @@ -124,44 +133,30 @@ func (m *Postgres) GetNewerThan(key string, t time.Time) *Thing {
return newerThanMem(m.cache, key, t)
}

func (m *Postgres) Save() error {
start := time.Now()
maxAge := start.Add(-1 * MaxSaveAge)
items := m.cache.Items()

klog.Infof("*** Saving %d items to Postgres", len(items))
defer func() {
klog.Infof("*** Postgres.Save took %s", time.Since(start))
}()

for k, v := range items {
th := v.Object.(*Thing)
if th.Created.Before(maxAge) {
klog.Infof("skipping %s (%s is too old)", k, th.Created)
continue
}
// persist writes an thing to MySQL
func (m *Postgres) persist(key string, th *Thing) error {
b := new(bytes.Buffer)
ge := gob.NewEncoder(b)

b := new(bytes.Buffer)
ge := gob.NewEncoder(b)
if err := ge.Encode(v); err != nil {
return fmt.Errorf("encode: %w", err)
}
item := cache.Item{Object: th}
if err := ge.Encode(item); err != nil {
return fmt.Errorf("encode: %w", err)
}

if _, err := m.db.Exec(`
_, err := m.db.Exec(`
INSERT INTO persist (k, v, saved) VALUES ($1, $2, $3)
ON CONFLICT (k)
DO UPDATE SET v=EXCLUDED.v, saved=EXCLUDED.saved`,
k, b.Bytes(), start); err != nil {
return fmt.Errorf("sql exec: %v (len=%d)", err, len(b.Bytes()))
}
}
DO UPDATE SET v=EXCLUDED.v, saved=EXCLUDED.saved`, key, b.Bytes(), time.Now())

return m.cleanup(maxAge)
return err
}

// Cleanup deletes older cache items
func (m *Postgres) cleanup(t time.Time) error {
res, err := m.db.Exec(`DELETE FROM persist WHERE saved < $1`, t)
func (m *Postgres) Cleanup() error {
start := time.Now()
maxAge := start.Add(-1 * MaxSaveAge)

res, err := m.db.Exec(`DELETE FROM persist WHERE saved < $1`, maxAge)

if err != nil {
return fmt.Errorf("delete exec: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/site/page.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (h *Handlers) collectionPage(ctx context.Context, id string, refresh bool)
}

if result.RuleResults == nil {
p.Notification = template.HTML(`Downloading data from GitHub ...`)
p.Notification = template.HTML(`Gathering data ...`)
} else if p.ResultAge > h.warnAge {
p.Notification = template.HTML(fmt.Sprintf(`Refreshing data in the background. Displayed data may be up to %s old. Use <a href="https://en.wikipedia.org/wiki/Wikipedia:Bypass_your_cache#Bypassing_cache">Shift-Reload</a> to force a data refresh at any time.`, humanDuration(time.Since(result.OldestInput))))
p.Stale = true
Expand Down
19 changes: 5 additions & 14 deletions pkg/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,32 +274,23 @@ func (u *Updater) Persist() error {
}

func (u *Updater) shouldPersist(updated bool) bool {
// Already running
if !u.persistStart.IsZero() {
if updated {
klog.Infof("still persisting (%s)...", time.Since(u.persistStart))
}
return false
}

if u.updateCycles < 2 {
klog.Infof("Only on cycle %d, will wait longer before persist", u.updateCycles)
// No new data to persist
if !updated {
return false
}

sinceSave := time.Since(u.lastPersist)

// Avoid write contention by fuzzing
fuzz := time.Duration(rand.Intn(int(u.maxRefresh.Seconds()))) * time.Second
cutoff := u.maxRefresh + fuzz
if updated && sinceSave > cutoff {
klog.Infof("New data, and %s since cache has been saved (cutoff=%s)", cutoff, sinceSave)
return true
}

// Fallback for a very quiet repository, or bug that keeps us from realizing an update has occurred
cutoff = (u.maxRefresh * 4) + fuzz
sinceSave := time.Since(u.lastPersist)
if sinceSave > cutoff {
klog.Warningf("No new data, but %s since cache has been saved (cutoff=%s)", cutoff, sinceSave)
klog.Infof("Should persist: we have new data, and it's been %s since the last run", sinceSave)
return true
}

Expand Down

0 comments on commit dc0f24a

Please sign in to comment.