-
Notifications
You must be signed in to change notification settings - Fork 20
/
bcache.go
188 lines (159 loc) · 4.03 KB
/
bcache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package bcache
import (
"errors"
"net"
"strconv"
"time"
"github.com/weaveworks/mesh"
"golang.org/x/sync/singleflight"
)
const (
// weaveworks/mesh channel name
channel = "bcache"
)
var (
// ErrNilFiller returned when GetWithFiller called with nil
// filler func
ErrNilFiller = errors.New("nil filler")
)
// Bcache represents bcache struct
type Bcache struct {
peer *peer
router *mesh.Router
logger Logger
flight singleflight.Group
deletionDelay time.Duration
}
// New creates new bcache from the given config
func New(cfg Config) (*Bcache, error) {
const (
connLimit = 64 // mesh router connection limit
)
if err := cfg.setDefault(); err != nil {
return nil, err
}
var (
peerName = mesh.PeerName(cfg.PeerID)
nickName = cfg.ListenAddr
logger = cfg.Logger
)
// parse host port
host, portStr, err := net.SplitHostPort(cfg.ListenAddr)
if err != nil {
return nil, err
}
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, err
}
// creates mesh router
router, err := mesh.NewRouter(mesh.Config{
Host: host,
Port: port,
ProtocolMinVersion: mesh.ProtocolMinVersion,
ConnLimit: connLimit,
PeerDiscovery: true,
TrustedSubnets: []*net.IPNet{},
}, peerName, nickName, mesh.NullOverlay{}, logger)
if err != nil {
return nil, err
}
// bcache peer
peer, err := newPeer(peerName, cfg.MaxKeys, logger)
if err != nil {
return nil, err
}
// creates gossip channel
gossip, err := router.NewGossip(channel, peer)
if err != nil {
return nil, err
}
peer.register(gossip)
// start mesh router
logger.Printf("mesh router starting at %s", cfg.ListenAddr)
router.Start()
// creates new connection to the provided peers
router.ConnectionMaker.InitiateConnections(cfg.Peers, true)
return &Bcache{
peer: peer,
router: router,
logger: logger,
deletionDelay: time.Duration(cfg.DeletionDelay) * time.Second,
}, nil
}
// Set sets value for the given key with the given ttl in second.
// if ttl <= 0, the key will expired instantly
func (b *Bcache) Set(key, val string, ttl int) {
if ttl <= 0 {
b.Delete(key)
return
}
b.set(key, val, ttl)
}
func (b *Bcache) set(key, val string, ttl int) int64 {
expired := time.Now().Add(time.Duration(ttl) * time.Second).UnixNano()
b.peer.Set(key, val, expired)
return expired
}
// Get gets value for the given key.
//
// It returns the value and true if the key exists
func (b *Bcache) Get(key string) (string, bool) {
return b.peer.Get(key)
}
// Delete the given key.
//
func (b *Bcache) Delete(key string) {
deleteTs := time.Now().Add(b.deletionDelay).UnixNano()
b.peer.Delete(key, deleteTs)
}
// Filler defines func to be called when the given key is not exists
type Filler func(key string) (val string, err error)
// GetWithFiller gets value for the given key and fill the cache
// if the given key is not exists.
//
// `filler` will be used to fill(Set) the cache
// when the given key is not exist.
// Even there are many goroutines which call `GetWithFiller`, the given `Filler` func
// will only called once for each of the key.
//
//
// It useful to avoid cache stampede to the underlying database
func (b *Bcache) GetWithFiller(key string, filler Filler, ttl int) (string, error) {
if filler == nil {
return "", ErrNilFiller
}
// get value from cache
val, ok := b.Get(key)
if ok {
return val, nil
}
// construct singleflight filler
flightFn := func() (interface{}, error) {
val, err := filler(key)
if err != nil {
b.logger.Errorf("filler failed: %v", err)
return nil, err
}
expired := b.set(key, val, ttl)
return value{
value: val,
expired: expired,
}, nil
}
// call the filler
valueIf, err, _ := b.flight.Do(key, func() (interface{}, error) {
return flightFn()
})
if err != nil {
return "", err
}
// return the value
value := valueIf.(value)
return value.value, nil
}
// Close closes the cache, free all the resource
func (b *Bcache) Close() error {
b.logger.Printf("mesh router stopping")
return b.router.Stop()
}