-
Notifications
You must be signed in to change notification settings - Fork 0
/
rpc.go
475 lines (408 loc) · 10.9 KB
/
rpc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
package raft
import (
"errors"
"net"
"net/http"
"net/rpc"
"sync"
)
// RPC is the transport abstraction of a raft node. It covers both sides of
// the wire: hosting the local RPC endpoint (Listen, Serve, Register, Close)
// and issuing outbound AppendEntries / RequestVote calls to peers.
type RPC interface {
	// Listen prepares the transport to accept connections on addr.
	Listen(addr string) error
	// Serve handles incoming requests; it is expected to be called after Listen.
	Serve() error
	// Register installs the service whose methods answer incoming RPCs.
	Register(RPCService) error
	// Close releases the resources held by the transport.
	Close() error
	// CallAppendEntries sends an AppendEntries request to the peer at addr
	// and returns its reply.
	CallAppendEntries(addr RaftAddr, args AppendEntriesArgs) (AppendEntriesResults, error)
	// CallRequestVote sends a RequestVote request to the peer at addr and
	// returns its reply.
	CallRequestVote(addr RaftAddr, args RequestVoteArgs) (RequestVoteResults, error)
}
// RPCService is the server-side handler set of the raft protocol. The method
// signatures follow the net/rpc convention: arguments by value, reply through
// a pointer, and an error result.
type RPCService interface {
	// AppendEntries handles a log-replication / heartbeat request and fills results.
	AppendEntries(args AppendEntriesArgs, results *AppendEntriesResults) error
	// RequestVote handles a vote request from a candidate and fills results.
	RequestVote(args RequestVoteArgs, results *RequestVoteResults) error
}
// rpcArgsType tags the kind of an RPC message (request or reply).
type rpcArgsType int8

// Known message kinds. The zero value is deliberately left unassigned so an
// uninitialised rpcArgsType never matches a real kind.
const (
	_ rpcArgsType = iota
	rpcArgsTypeAppendEntriesArgs
	rpcArgsTypeAppendEntriesResults
	rpcArgsTypeRequestVoteArgs
	rpcArgsTypeRequestVoteResults
)

// String returns the human-readable name of the message kind, or
// "Unknown rpcArgsType" for any value outside the declared constants.
func (t rpcArgsType) String() string {
	// Index 0 stays empty: it corresponds to the skipped zero value.
	names := [...]string{
		rpcArgsTypeAppendEntriesArgs:    "AppendEntriesArgs",
		rpcArgsTypeAppendEntriesResults: "AppendEntriesResults",
		rpcArgsTypeRequestVoteArgs:      "RequestVoteArgs",
		rpcArgsTypeRequestVoteResults:   "RequestVoteResults",
	}
	if t > 0 && int(t) < len(names) {
		return names[t]
	}
	return "Unknown rpcArgsType"
}
// rpcArgs is the common view over every RPC message (requests and replies):
// each one can report which kind of message it is and the term it carries.
type rpcArgs interface {
	// getType reports the kind of the message.
	getType() rpcArgsType
	// getTerm reports the term carried by the message.
	getTerm() uint64
}
// AppendEntriesArgs is the request payload of the AppendEntries RPC.
type AppendEntriesArgs struct {
	// Term is the leader's term.
	Term uint64
	// LeaderId identifies the leader so that followers can redirect clients.
	LeaderId RaftId
	// PrevLogIndex is the index of the log entry immediately preceding the
	// new ones.
	PrevLogIndex uint64
	// PrevLogTerm is the term of the entry at PrevLogIndex.
	PrevLogTerm uint64
	// Entries are the log entries to store. It is empty for a heartbeat and
	// may hold more than one entry for efficiency.
	Entries []LogEntry
	// LeaderCommit is the leader's commit index.
	LeaderCommit uint64
}

// Compile-time check: AppendEntriesArgs satisfies rpcArgs.
var _ rpcArgs = AppendEntriesArgs{}

// getType tags the message as an AppendEntries request.
func (args AppendEntriesArgs) getType() rpcArgsType {
	return rpcArgsTypeAppendEntriesArgs
}

// getTerm returns the term carried by the request.
func (args AppendEntriesArgs) getTerm() uint64 {
	return args.Term
}
// AppendEntriesResults is the reply payload of the AppendEntries RPC.
type AppendEntriesResults struct {
	// Term is the responder's current term, so the leader can update itself.
	Term uint64
	// Success is true if the follower contained an entry matching
	// PrevLogIndex and PrevLogTerm.
	Success bool
}

// Compile-time check: AppendEntriesResults satisfies rpcArgs.
var _ rpcArgs = AppendEntriesResults{}

// getType tags the message as an AppendEntries reply.
func (res AppendEntriesResults) getType() rpcArgsType {
	return rpcArgsTypeAppendEntriesResults
}

// getTerm returns the term carried by the reply.
func (res AppendEntriesResults) getTerm() uint64 {
	return res.Term
}
// RequestVoteArgs is the request payload of the RequestVote RPC.
type RequestVoteArgs struct {
	// Term is the candidate's term.
	Term uint64
	// CandidateId identifies the candidate requesting the vote.
	CandidateId RaftId
	// LastLogIndex is the index of the candidate's last log entry (§5.4).
	LastLogIndex uint64
	// LastLogTerm is the term of the candidate's last log entry (§5.4).
	LastLogTerm uint64
}

// Compile-time check: RequestVoteArgs satisfies rpcArgs.
var _ rpcArgs = RequestVoteArgs{}

// getType tags the message as a RequestVote request.
func (args RequestVoteArgs) getType() rpcArgsType {
	return rpcArgsTypeRequestVoteArgs
}

// getTerm returns the term carried by the request.
func (args RequestVoteArgs) getTerm() uint64 {
	return args.Term
}
// RequestVoteResults is the reply payload of the RequestVote RPC.
type RequestVoteResults struct {
	// Term is the responder's current term, so the candidate can update itself.
	Term uint64
	// VoteGranted is true when the candidate received the vote.
	VoteGranted bool
}

// Compile-time check: RequestVoteResults satisfies rpcArgs.
var _ rpcArgs = RequestVoteResults{}

// getType tags the message as a RequestVote reply.
func (res RequestVoteResults) getType() rpcArgsType {
	return rpcArgsTypeRequestVoteResults
}

// getTerm returns the term carried by the reply.
func (res RequestVoteResults) getTerm() uint64 {
	return res.Term
}
// Compile-time check: *rpcService satisfies RPCService.
var _ RPCService = (*rpcService)(nil)

// rpcService is the RPCService implementation backed by a raft node. The
// embedded *raft supplies the state and helpers the handlers operate on.
type rpcService struct {
	// mu is held by RequestVote for the duration of a vote decision.
	mu sync.Mutex
	*raft
}
// AppendEntries implements the AppendEntries RPC.
//
// Invoked by leader to replicate log entries (§5.3);
//
// also used as heartbeat (§5.2).
//
// Implementation:
//
//  1. Reply false if term < currentTerm (§5.1)
//  2. Reply false if log doesn't contain an entry at prevLogIndex
//     whose term matches prevLogTerm (§5.3)
//  3. If an existing entry conflicts with a new one (same index
//     but different terms), delete the existing entry and all that follow it (§5.3)
//  4. Append any new entries not already in the log
//  5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
//
// results.Term is always set to the node's current term as of return time.
// A non-nil error is returned only when a log or configuration operation
// fails; a rejected request is reported through results.Success == false.
func (s *rpcService) AppendEntries(args AppendEntriesArgs, results *AppendEntriesResults) error {
	// NOTE(review): refreshLastHeartbeat and ResetTimer run before the term
	// check below, so a request carrying a stale term triggers them as well —
	// confirm this is intended.
	s.refreshLastHeartbeat()
	s.raft.sendRPCArgs(args)
	s.GetServer().ResetTimer()
	// Runs on every return path, so the reply carries the term read at exit
	// rather than the one read on entry.
	defer func() {
		results.Term = s.GetCurrentTerm()
	}()
	currentTerm := s.GetCurrentTerm()
	// 1. Reply false if term < currentTerm (§5.1)
	if args.Term < currentTerm {
		return nil
	}
	// 2. Reply false if log doesn't contain an entry at prevLogIndex
	// whose term matches prevLogTerm (§5.3)
	match, err := s.Match(args.PrevLogIndex, args.PrevLogTerm)
	if err != nil {
		return err
	}
	if !match {
		return nil
	}
	results.Success = true
	// 3. If an existing entry conflicts with a new one (same index
	// but different terms), delete the existing entry and all that follow it (§5.3)
	// 4. Append any new entries not already in the log
	// (both steps are delegated to Log.AppendAfter — presumably it truncates
	// after PrevLogIndex; verify against its implementation)
	if len(args.Entries) > 0 {
		err = s.raft.Log.AppendAfter(args.PrevLogIndex, args.Entries...)
		if err != nil {
			return err
		}
		// Fall back to earlier configurations while the active one was
		// introduced at an index after PrevLogIndex, i.e. by a log entry in
		// the range that was just rewritten.
		config := s.raft.configs.GetConfig()
		for config.GetIndex() > args.PrevLogIndex {
			err = s.raft.configs.FallbackConfig()
			if err != nil {
				return err
			}
			config = s.raft.configs.GetConfig()
		}
		// Once a given server adds the new configuration entry to its log,
		// it uses that configuration for all future decisions
		for i, entry := range args.Entries {
			// Entries are contiguous, starting right after PrevLogIndex.
			index := args.PrevLogIndex + uint64(i) + 1
			if entry.Type == logEntryTypeConfig {
				config, err := s.raft.configs.NewConfig(index, entry.Command)
				if err != nil {
					return err
				}
				s.raft.configs.UseConfig(config)
				if config.IsJoint() {
					s.raft.debug("~> C(old,new): %v", config)
				} else {
					s.raft.debug("~> C(new): %v", config)
				}
			}
		}
	}
	// 5. If leaderCommit > commitIndex,
	// set commitIndex = min(leaderCommit, index of last new entry)
	s.syncLeaderCommit(args.LeaderCommit)
	return nil
}
// RequestVote implements the RequestVote RPC.
//
// Invoked by candidates to gather votes (§5.2).
//
// Implementation:
//
//  1. Return immediately, leaving results untouched, if isLeaderActive
//     reports true
//  2. Reply false if term < currentTerm (§5.1)
//  3. If the candidate's term equals currentTerm, reply false unless
//     votedFor is nil or already the candidate
//  4. Grant the vote if the candidate's log is at least as up-to-date as the
//     receiver's log (§5.2, §5.4)
//
// Once past step 1, results.Term is set to the node's current term on every
// return path, and a granted vote is recorded through SetVotedFor. A non-nil
// error is returned only when the last log entry cannot be read.
func (s *rpcService) RequestVote(args RequestVoteArgs, results *RequestVoteResults) error {
	if s.isLeaderActive() {
		return nil
	}
	// Take the lock so that two candidates with the same term, greater than
	// currentTerm, cannot both be granted a vote at the same time.
	s.mu.Lock()
	defer s.mu.Unlock()
	s.debug("<- Vote request %s at %d", args.CandidateId, args.Term)
	s.sendRPCArgs(args)
	// NOTE(review): the timer is reset before the vote is decided, so it is
	// reset for rejected requests too — confirm this is intended.
	s.GetServer().ResetTimer()
	// Runs before the mutex is released (deferred calls run last-in,
	// first-out), so recording the vote is still covered by s.mu.
	defer func() {
		results.Term = s.GetCurrentTerm()
		if results.VoteGranted {
			s.debug("-> Vote up %s at %d", args.CandidateId, args.Term)
			s.SetVotedFor(args.CandidateId)
		} else {
			s.debug("-> Vote down %s at %d", args.CandidateId, args.Term)
		}
	}()
	// 1. Reply false if term < currentTerm (§5.1)
	currentTerm := s.GetCurrentTerm()
	if args.Term < currentTerm {
		return nil
	}
	// 2. If votedFor is null or candidateId, and candidate's log is at
	// least as up-to-date as receiver's log, grant vote (§5.2, §5.4)
	//
	// The votedFor check applies only when the terms are equal; a candidate
	// with a larger term skips it.
	votedFor := s.GetVotedFor()
	if currentTerm == args.Term {
		if !(votedFor.isNil() || args.CandidateId == votedFor) {
			return nil
		}
	}
	// Raft determines which of two logs is more up-to-date
	// by comparing the index and term of the last entries in the
	// logs.
	//
	// If the logs have last entries with different terms, then
	// the log with the later term is more up-to-date.
	//
	// If the logs end with the same term, then whichever log is longer is
	// more up-to-date.
	index, term, err := s.Last()
	if err != nil {
		return err
	}
	if term < args.LastLogTerm {
		results.VoteGranted = true
		return nil
	}
	if term == args.LastLogTerm && index <= args.LastLogIndex {
		results.VoteGranted = true
		return nil
	}
	// The receiver's log is more up-to-date: vote stays denied.
	return nil
}
// newDefaultRpc returns a defaultRPC with a fresh net/rpc server. The
// listener and the client cache are left at their zero values; the listener
// is created later by Listen.
func newDefaultRpc() *defaultRPC {
	return &defaultRPC{server: rpc.NewServer()}
}
// Compile-time check: *defaultRPC satisfies RPC.
var _ RPC = (*defaultRPC)(nil)

// defaultRPC is the RPC implementation built on net/rpc over HTTP.
type defaultRPC struct {
	// id is not set by newDefaultRpc — presumably the id of the owning raft
	// node; verify against callers.
	id RaftId
	// mux is held by Listen and Close while they access l.
	mux sync.Mutex
	// l is the TCP listener created by Listen; nil until Listen succeeds.
	l net.Listener
	// server dispatches incoming requests to the registered service.
	server *rpc.Server
	// clients caches one outbound client per peer address.
	clients rpcClients
}
// Listen opens a TCP listener on addr and stores it for Serve and Close.
// On failure the stored listener is reset to nil and the error from
// net.Listen is returned.
func (r *defaultRPC) Listen(addr string) error {
	r.mux.Lock()
	defer r.mux.Unlock()

	listener, err := net.Listen("tcp", addr)
	// net.Listen yields a nil listener on error, so this clears r.l on failure.
	r.l = listener
	return err
}
// Serve accepts HTTP connections on the listener created by Listen and
// dispatches them to the RPC server. It blocks until the listener fails or is
// closed and returns the resulting error.
//
// Serve returns an error instead of panicking when no listener is available
// (Listen was never called or failed).
func (r *defaultRPC) Serve() error {
	// Snapshot the listener under the mutex that guards it; the lock must not
	// be held while serving, otherwise Close could never acquire it.
	r.mux.Lock()
	l := r.l
	r.mux.Unlock()

	if l == nil {
		return errors.New("raft: rpc serve called without a listener")
	}
	return http.Serve(l, r.server)
}
// Register publishes service on the RPC server under the name "raft", which
// is the prefix the outbound calls use ("raft.AppendEntries",
// "raft.RequestVote").
func (r *defaultRPC) Register(service RPCService) error {
	const serviceName = "raft"
	return r.server.RegisterName(serviceName, service)
}
// Close shuts down the listener (if one was created) and every cached
// outbound client. Both steps are always attempted; the first failure is
// returned. Closing a listener that is already closed is not reported as an
// error, so Close may be called more than once.
func (r *defaultRPC) Close() error {
	r.mux.Lock()
	defer r.mux.Unlock()

	var firstErr error
	// r.l is nil when Listen was never called or failed; calling Close on a
	// nil listener would panic.
	if r.l != nil {
		if err := r.l.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
			firstErr = err
		}
	}
	if err := r.clients.Close(); err != nil && firstErr == nil {
		firstErr = err
	}
	return firstErr
}
// CallAppendEntries sends an AppendEntries request to the peer at addr over a
// cached client connection and returns the peer's reply.
//
// When the call fails because the connection is no longer usable, the cached
// client is evicted and closed so that the next call dials a fresh one. Both
// net.ErrClosed and rpc.ErrShutdown are treated as "unusable": once a net/rpc
// client has seen its connection break, every later Call on it returns
// rpc.ErrShutdown, so without evicting on that error the peer would stay
// unreachable through the cache.
func (r *defaultRPC) CallAppendEntries(addr RaftAddr, args AppendEntriesArgs) (results AppendEntriesResults, err error) {
	client, err := r.clients.Get(addr)
	if err != nil {
		return results, err
	}
	err = client.Call("raft.AppendEntries", args, &results)
	if errors.Is(err, net.ErrClosed) || errors.Is(err, rpc.ErrShutdown) {
		r.clients.Delete(addr)
		// Best effort: the client is already dead, closing only releases the
		// underlying connection, so its error carries no information.
		_ = client.Close()
	}
	return results, err
}
// CallRequestVote sends a RequestVote request to the peer at addr over a
// cached client connection and returns the peer's reply.
//
// When the call fails because the connection is no longer usable, the cached
// client is evicted and closed so that the next call dials a fresh one. Both
// net.ErrClosed and rpc.ErrShutdown are treated as "unusable": once a net/rpc
// client has seen its connection break, every later Call on it returns
// rpc.ErrShutdown, so without evicting on that error the peer would stay
// unreachable through the cache.
func (r *defaultRPC) CallRequestVote(addr RaftAddr, args RequestVoteArgs) (results RequestVoteResults, err error) {
	client, err := r.clients.Get(addr)
	if err != nil {
		return results, err
	}
	err = client.Call("raft.RequestVote", args, &results)
	if errors.Is(err, net.ErrClosed) || errors.Is(err, rpc.ErrShutdown) {
		r.clients.Delete(addr)
		// Best effort: the client is already dead, closing only releases the
		// underlying connection, so its error carries no information.
		_ = client.Close()
	}
	return results, err
}
// rpcClients caches one *rpc.Client per peer address so that connections are
// reused across calls. The zero value is ready to use.
type rpcClients struct {
	// mux guards clients and closed.
	mux sync.RWMutex
	// clients maps a peer address to its cached client; allocated lazily.
	clients map[RaftAddr]*rpc.Client
	// closed is set by Close and makes further Close calls no-ops.
	closed bool
}
// Get returns the cached client for addr, dialing a new HTTP RPC connection
// when none is cached. The dial error is returned as is and nothing is cached
// on failure.
//
// The cache is probed a second time after the write lock is acquired: between
// releasing the read lock and taking the write lock another goroutine may
// have dialed the same address, and dialing again would overwrite (and leak)
// its client.
func (c *rpcClients) Get(addr RaftAddr) (*rpc.Client, error) {
	// Fast path: shared lock only. Reading from a nil map is safe.
	c.mux.RLock()
	cached, ok := c.clients[addr]
	c.mux.RUnlock()
	if ok {
		return cached, nil
	}

	c.mux.Lock()
	defer c.mux.Unlock()
	if cached, ok = c.clients[addr]; ok {
		return cached, nil
	}
	if c.clients == nil {
		c.clients = make(map[RaftAddr]*rpc.Client)
	}
	// NOTE(review): the dial happens while the write lock is held, so lookups
	// for every other address wait until it completes.
	client, err := rpc.DialHTTP("tcp", string(addr))
	if err != nil {
		return nil, err
	}
	c.clients[addr] = client
	return client, nil
}
// Delete drops the cached client for addr, if any. The client itself is not
// closed. The map is allocated first when it does not exist yet.
func (c *rpcClients) Delete(addr RaftAddr) {
	c.mux.Lock()
	if c.clients == nil {
		c.clients = map[RaftAddr]*rpc.Client{}
	}
	delete(c.clients, addr)
	c.mux.Unlock()
}
// Close closes every cached client, discards the cache and marks the set as
// closed. Errors from individual clients are ignored and the result is always
// nil. Calling Close again is a no-op.
func (c *rpcClients) Close() error {
	c.mux.Lock()
	defer c.mux.Unlock()

	if !c.closed {
		for addr := range c.clients {
			_ = c.clients[addr].Close()
		}
		c.clients = nil
		c.closed = true
	}
	return nil
}
// Compile-time check: *rpcWrapper satisfies RPC.
var _ RPC = (*rpcWrapper)(nil)

// newRpcWrapper decorates rpc so that replies to outbound calls are also
// handed to the given raft node.
func newRpcWrapper(raft *raft, rpc RPC) *rpcWrapper {
	w := new(rpcWrapper)
	w.raft = raft
	w.RPC = rpc
	return w
}
// rpcWrapper is an RPC decorator: every method is delegated to the embedded
// RPC, except the two outbound calls, which additionally pass the reply to
// the embedded raft node through sendRPCArgs.
type rpcWrapper struct {
	*raft
	RPC
}
// CallAppendEntries forwards the call to the wrapped RPC, hands the reply to
// the raft node via sendRPCArgs, and returns the reply and error unchanged.
//
// NOTE(review): the reply is forwarded even when the call failed, in which
// case it is whatever the wrapped RPC returned alongside the error — confirm
// sendRPCArgs tolerates that.
func (w *rpcWrapper) CallAppendEntries(addr RaftAddr, args AppendEntriesArgs) (results AppendEntriesResults, err error) {
	reply, callErr := w.RPC.CallAppendEntries(addr, args)
	w.raft.sendRPCArgs(reply)
	return reply, callErr
}
// CallRequestVote forwards the call to the wrapped RPC, hands the reply to
// the raft node via sendRPCArgs, and returns the reply and error unchanged.
//
// NOTE(review): the reply is forwarded even when the call failed, in which
// case it is whatever the wrapped RPC returned alongside the error — confirm
// sendRPCArgs tolerates that.
func (w *rpcWrapper) CallRequestVote(addr RaftAddr, args RequestVoteArgs) (results RequestVoteResults, err error) {
	reply, callErr := w.RPC.CallRequestVote(addr, args)
	w.raft.sendRPCArgs(reply)
	return reply, callErr
}