Skip to content

Commit

Permalink
Merge pull request #14 from myENA/bugfix/session-churn
Browse files Browse the repository at this point in the history
  • Loading branch information
dcarbone authored Sep 21, 2018
2 parents 17e57f7 + dd3fcf0 commit f4531a2
Show file tree
Hide file tree
Showing 7 changed files with 527 additions and 432 deletions.
4 changes: 1 addition & 3 deletions candidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,11 @@ func NewCandidate(client *Client, candidateID, key, ttl string) (*Candidate, err
ID: candidateID,
SessionTTL: ttl,
Client: client.Client,
AutoRun: true,
})
if err != nil {
return nil, err
}

c.Run()

return &Candidate{Candidate: c}, nil
}

Expand Down
239 changes: 129 additions & 110 deletions candidate/candidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ type (
//
// Consul API client. If not specified, one will be created using api.DefaultConfig()
Client *api.Client

// AutoRun [optional]
//
// If set to true, the Candidate will immediately enter its election pool after successful construction
AutoRun bool
}

Candidate struct {
Expand All @@ -99,15 +104,19 @@ type (
)

func New(conf *Config) (*Candidate, error) {
var id, kvKey, sessionTTL string
var client *api.Client
var err error
var (
id, kvKey, sessionTTL string
client *api.Client
autoRun bool
err error
)

if conf != nil {
kvKey = conf.KVKey
id = conf.ID
sessionTTL = conf.SessionTTL
client = conf.Client
autoRun = conf.AutoRun
}

if kvKey == "" {
Expand Down Expand Up @@ -148,10 +157,16 @@ func New(conf *Config) (*Candidate, error) {
sessionTTL: sessionTTL,
}

// attempt to create persistent session manager...
if c.session, err = c.createSession(); err != nil {
return nil, err
}

if autoRun {
c.log.Debug("AutoRun enabled")
c.Run()
}

return c, nil
}

Expand Down Expand Up @@ -299,6 +314,10 @@ func (c *Candidate) UpdateWatchers() {
func (c *Candidate) WaitFor(td time.Duration) error {
var err error

if !c.Running() {
return fmt.Errorf("candidate %s is not in running", c.ID())
}

timer := time.NewTimer(td)

waitLoop:
Expand Down Expand Up @@ -335,8 +354,8 @@ func (c *Candidate) WaitUntil(t time.Time) error {
}

// Wait will block until a leader has been elected, regardless of candidate.
func (c *Candidate) Wait() {
c.WaitFor(1<<63 - 1)
func (c *Candidate) Wait() error {
return c.WaitFor(1<<63 - 1)
}

func (c *Candidate) State() State {
Expand All @@ -350,22 +369,28 @@ func (c *Candidate) Running() bool {
return c.State() == StateRunning
}

// sessionUpdate is the receiver for the session update callback
func (c *Candidate) sessionUpdate(update session.Update) {
if c.session != nil && c.session.ID() == update.ID {
if c.Running() {
c.sessionUpdateChan <- update
} else {
c.log.Printf("We are no longer in the running, cannot process session update: %#v", update)
c.mu.Lock()
if c.session.ID() == update.ID {
c.mu.Unlock()
select {
case c.sessionUpdateChan <- update:
default:
c.log.Printf("Unable to push session update onto channel. Update: %#v")
}
} else {
c.log.Printf("Received update from rogue session: %s", update.ID)
c.mu.Unlock()
c.log.Printf("Received update from session %q but our local session is %q...", update.ID, c.session.ID())
}
}

// acquire will attempt to do just that. Caller must hold lock!
func (c *Candidate) acquire(sid string) (bool, error) {
var err error
var elected bool
var (
elected bool
err error
)

kvpValue := &LeaderKVValue{
LeaderAddress: c.id,
Expand Down Expand Up @@ -398,14 +423,15 @@ func (c *Candidate) createSession() (*session.Session, error) {
}

func (c *Candidate) lockRunner() {
var sid string
var elected bool
var resigned chan struct{}
var updated bool
var sessionUpdate session.Update
var err error

sessionErrorsSeen, sessionStoppedUpdatesSeen := 0, 0
// TODO: this could stand for some further cleanup...
var (
sid string
elected, updated bool
resigned chan struct{}
sessionUpdate session.Update
consecutiveSessionErrorCount int
err error
)

// run initial session
c.session.Run()
Expand All @@ -425,100 +451,103 @@ acquisition:
select {
case <-acquireTicker.C:
c.mu.Lock()

if c.session == nil {
// it is possible for the session to be nil if the sessionUpdate case is unable to recreate upon error
// threshold being met

// set elected state to false, will be updated later.
if c.session.Running() {
// if our session manager is still running
if sid = c.session.ID(); sid == "" {
// this should only ever happen very early on in the election process
elected = false
updated = c.elected != nil && *c.elected != elected
c.log.Debugf("Acquire tick: Session does not exist, will try locking again in %d seconds...", int64(interval.Seconds()))
} else if elected, err = c.acquire(sid); err != nil {
// most likely hit due to transport error.
updated = c.elected != nil && *c.elected != elected
c.log.Printf("Acquire tick: Error attempting to acquire lock: %s", err)
} else {
// if c.elected is nil, indicating this is the initial election loop, or if the election state
// changed mark update as true
updated = c.elected == nil || *c.elected != elected
}
} else {
// if we are below the threshold, just try to restart existing session
c.log.Printf("Acquire tick: Session is in stopped state, attempting to restart...")
elected = false

// only send an update if a previous election attempt was successful and our state changed
updated = c.elected != nil && *c.elected != elected
c.session.Run()
}

c.log.Print("Acquire tick: No session, will try to create...")
if c.session, err = c.createSession(); err != nil {
c.log.Printf("Acquire tick: Error creating session, will try again in %d seconds. Err: %s", int64(interval.Seconds()), err)
// if updated
if updated {
if elected {
c.log.Debug("We have won the election")
} else {
c.session.Run()
c.log.Printf("Acquire tick: Session created successfully")
c.log.Debug("We have lost the election")
}
} else if sid = c.session.ID(); sid == "" {
// this should only ever happen very early on in the election process
elected = false
updated = c.elected != nil && *c.elected != elected
c.log.Debugf("Acquire tick: Session does not exist, will try locking again in %d seconds...", int64(interval.Seconds()))
} else if elected, err = c.acquire(sid); err != nil {
// most likely hit due to transport error.
elected = false
updated = c.elected != nil && *c.elected != elected
c.log.Printf("Acquire tick: Error attempting to acquire lock: %s", err)

// update internal state
*c.elected = elected

// send notifications
up.Elected = elected
c.mu.Unlock()

c.watchers.notify(*up)
} else {
updated = c.elected == nil || *c.elected != elected
c.mu.Unlock()
}

c.mu.Unlock()

case sessionUpdate = <-c.sessionUpdateChan:
c.mu.Lock()

// if there was an update either creating or renewing our session
if sessionUpdate.Error != nil {
sessionErrorsSeen++
sessionStoppedUpdatesSeen = 0

c.log.Printf("Session Update: Error (%d in a row): %s", sessionErrorsSeen, sessionUpdate.Error)

// if we breach this threshold, stop our current session and attempt to make a new one next pass
if sessionErrorsSeen >= 2 {
c.log.Print("Session Update: 2 successive errors seen, will construct new session")
c.session.Stop()
// if there was an update either creating or renewing our session
consecutiveSessionErrorCount++
c.log.Printf("Session Update: Error (%d in a row): %s", consecutiveSessionErrorCount, sessionUpdate.Error)
if sessionUpdate.State == session.StateRunning && consecutiveSessionErrorCount > 2 {
// if the session is still running but we've seen more than 2 errors, attempt a stop -> start cycle
c.log.Print("Session Update: 2 successive errors seen, stopping session")
if err = c.session.Stop(); err != nil {
c.log.Printf("Session update: Error stopping session: %s", err)
}
elected = false
c.session = nil
updated = c.elected != nil && *c.elected != elected
}
// do not modify elected state here unless we've breached the threshold. could just be a temporary
// issue
} else if sessionUpdate.State == session.StateStopped {
sessionStoppedUpdatesSeen++
sessionErrorsSeen = 0

// if somehow the session state became stopped (this should basically never happen...), do not attempt
// to kickstart session here. test if we need to update candidate state and notify watchers, then move
// on. next acquire tick will attempt to restart session.
consecutiveSessionErrorCount = 0
elected = false

c.log.Printf("Session Update: Stopped state seen (%d in row): %#v", sessionStoppedUpdatesSeen, sessionUpdate)

if sessionStoppedUpdatesSeen >= 2 {
c.log.Print("Session Update: Stopped state seen 2 successive times, will construct new session")
if c.session, err = c.createSession(); err != nil {
c.log.Printf("Unable to recreate will try again in %d seconds. Err: %s.", int64(interval.Seconds()), err)
} else {
c.log.Print("Session created successfully")
c.session.Run()
}
}
updated = c.elected != nil && *c.elected != elected
c.log.Printf("Session Update: Stopped state seen: %#v", sessionUpdate)
} else {
// if we got a non-error / non-stopped update, there is nothing to do.
consecutiveSessionErrorCount = 0
c.log.Debugf("Session Update: Received %#v", sessionUpdate)
}

updated = c.elected != nil && *c.elected != elected

c.mu.Unlock()

case resigned = <-c.resign:
break acquisition
}

// if updated
if updated {
if elected {
c.log.Debug("We have won the election")
// if updated
if updated {
// this should only ever hit if we breach the error threshold or our session stopped running
if elected {
c.log.Debug("We have won the election")
} else {
c.log.Debug("We have lost the election")
}
// update internal state
*c.elected = elected
// modify update payload
up.Elected = elected
c.mu.Unlock()
// send notifications after unlocking
c.watchers.notify(*up)
} else {
c.log.Debug("We have lost the election")
c.mu.Unlock()
}

// update internal state
*c.elected = elected

// send notifications
up.Elected = elected
c.watchers.notify(*up)
case resigned = <-c.resign:
break acquisition
}
}

Expand All @@ -528,40 +557,30 @@ acquisition:

c.mu.Lock()

// modify state
// modify internal state
*c.elected = false
c.state = StateResigned

// send notifications
up.Elected = false
up.State = StateResigned
c.watchers.notify(*up)

done := make(chan struct{})

// test for session nil, if so spin up new routine that waits for session to be destroyed.
// TODO: do something more clever with the session's update system instead of this chan?
if c.session != nil {
go func() {
// stop session, this might block for a bit
c.session.Stop()
done <- struct{}{}
}()
} else {
done <- struct{}{}
if err = c.session.Stop(); err != nil {
c.log.Printf("Error stopping session: %s", err)
}
}

// release lock before the final steps so the object is usable
c.mu.Unlock()

// if need be, wait for session to term
<-done
close(done)

// notify our caller that we've finished with resignation
// just in case....
if resigned != nil {
// notify caller that we've stopped
resigned <- struct{}{}
}

c.watchers.notify(*up)

c.log.Print("Resigned")
}
Loading

0 comments on commit f4531a2

Please sign in to comment.