Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
Merge pull request #1561 from endocode/dongsu/fleetd-detect-machine-id
Browse files Browse the repository at this point in the history
fleetd: detect the existing machine ID
  • Loading branch information
tixxdz committed Apr 21, 2016
2 parents 3fa54c5 + 176825f commit d357cf2
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 10 deletions.
100 changes: 100 additions & 0 deletions functional/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/coreos/fleet/functional/platform"
"github.com/coreos/fleet/functional/util"
)

// Simulate the shutdown of a single fleet node
Expand Down Expand Up @@ -82,3 +83,102 @@ func TestNodeShutdown(t *testing.T) {
t.Fatalf("Unit hello.service not reported as inactive:\n%s\n", stdout)
}
}

// TestDetectMachineId checks that etcd registration fails when two machines
// share the same machine-id.
//
// It creates a cluster with 2 members, m0 and m1, then overwrites m1's
// /etc/machine-id with m0's ID and restarts fleet.service on m1. The test
// succeeds when "fleetctl list-machines" on m1 fails with the expected
// "unable to communicate with etcd" error, and fails when no error (or an
// unexpected one) is reported. Afterwards m0 receives a freshly generated
// machine ID and must still be able to list machines after a restart of its
// fleet.service.
func TestDetectMachineId(t *testing.T) {
	cluster, err := platform.NewNspawnCluster("smoke")
	if err != nil {
		t.Fatal(err)
	}
	defer cluster.Destroy()

	members, err := platform.CreateNClusterMembers(cluster, 2)
	if err != nil {
		t.Fatal(err)
	}

	m0 := members[0]
	m1 := members[1]
	if _, err = cluster.WaitForNMachines(m0, 2); err != nil {
		t.Fatal(err)
	}

	machineIDFile := "/etc/machine-id"

	// restartFleetService restarts fleet.service on m and then verifies via
	// "systemctl show" that the unit is active and its result is success.
	restartFleetService := func(m platform.Member) error {
		stdout, err := cluster.MemberCommand(m, "sudo", "systemctl", "restart", "fleet.service")
		if err != nil {
			return fmt.Errorf("Failed to restart fleet service\nstdout: %s\nerr: %v", stdout, err)
		}

		// The verdict is based on stdout only; the command error is included
		// in the message so that a failing query is not silently discarded.
		stdout, err = cluster.MemberCommand(m, "systemctl", "show", "--property=ActiveState", "fleet")
		if strings.TrimSpace(stdout) != "ActiveState=active" {
			return fmt.Errorf("Fleet unit not reported as active: %s\nerr: %v", stdout, err)
		}
		stdout, err = cluster.MemberCommand(m, "systemctl", "show", "--property=Result", "fleet")
		if strings.TrimSpace(stdout) != "Result=success" {
			return fmt.Errorf("Result for fleet unit not reported as success: %s\nerr: %v", stdout, err)
		}
		return nil
	}

	stdout, err := cluster.MemberCommand(m0, "cat", machineIDFile)
	if err != nil {
		t.Fatalf("Failed to get machine-id\nstdout: %s\nerr: %v", stdout, err)
	}
	m0MachineID := strings.TrimSpace(stdout)
	if m0MachineID == "" {
		// Writing an empty ID to m1 would not produce the duplicate this
		// test is about, so stop here with a clear message.
		t.Fatalf("m0: got an empty machine-id from %s", machineIDFile)
	}

	// Set m1's ID to the same one as m0, to intentionally trigger the error
	// case of a duplicated machine ID.
	// NOTE(review): presumably MemberCommand hands the arguments to a shell on
	// the member, so that "|" is interpreted as a pipe there - confirm.
	stdout, err = cluster.MemberCommand(m1,
		"echo", m0MachineID, "|", "sudo", "tee", machineIDFile)
	if err != nil {
		t.Fatalf("Failed to replace machine-id\nstdout: %s\nerr: %v", stdout, err)
	}

	if err := restartFleetService(m1); err != nil {
		t.Fatal(err)
	}

	// fleetd should actually be running, but failing to list machines.
	// So we should expect a specific error after running fleetctl list-machines,
	// like "googleapi: Error 503: fleet server unable to communicate with etcd".
	_, stderr, err := cluster.Fleetctl(m1, "list-machines", "--no-legend")
	if err == nil {
		t.Fatalf("m1: should get an error, but got success.\nstderr: %s", stderr)
	}
	// Only "exit status 1" together with "...unable to communicate..." is the
	// expected error; anything else is a genuine failure.
	if !strings.Contains(err.Error(), "exit status 1") ||
		!strings.Contains(stderr, "fleet server unable to communicate with etcd") {
		t.Fatalf("m1: Failed to get list of machines. err: %v\nstderr: %s", err, stderr)
	}

	// Trigger another test case of m0's ID getting different from m1's.
	// m0 is then expected to work properly with its new, distinct machine ID
	// after fleet.service has been restarted on it.
	// NOTE(review): only m0 is restarted and checked below; m1 is left as is.
	stdout, err = cluster.MemberCommand(m0,
		"echo", util.NewMachineID(), "|", "sudo", "tee", machineIDFile)
	if err != nil {
		t.Fatalf("m0: Failed to replace machine-id\nstdout: %s\nerr: %v", stdout, err)
	}

	// Restart fleet service on m0, and see that it's still working.
	if err := restartFleetService(m0); err != nil {
		t.Fatal(err)
	}

	stdout, stderr, err = cluster.Fleetctl(m0, "list-machines", "--no-legend")
	if err != nil {
		t.Fatalf("m0: error: %v\nstdout: %s\nstderr: %s", err, stdout, stderr)
	}
}
8 changes: 1 addition & 7 deletions functional/platform/nspawn.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"time"

"github.com/coreos/fleet/Godeps/_workspace/src/github.com/coreos/go-systemd/dbus"
"github.com/coreos/fleet/Godeps/_workspace/src/github.com/pborman/uuid"

"github.com/coreos/fleet/functional/util"
)
Expand Down Expand Up @@ -384,14 +383,9 @@ func (nc *nspawnCluster) CreateMember() (m Member, err error) {
return nc.createMember(id)
}

// newMachineID generates a new UUID and returns it with the "-" separators
// stripped, matching the machine ID format used by systemd.
func newMachineID() string {
	const separator = "-"

	id := uuid.New()
	return strings.Replace(id, separator, "", -1)
}

func (nc *nspawnCluster) createMember(id string) (m Member, err error) {
nm := nspawnMember{
uuid: newMachineID(),
uuid: util.NewMachineID(),
id: id,
ip: fmt.Sprintf("172.18.1.%s", id),
}
Expand Down
7 changes: 7 additions & 0 deletions functional/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"os/exec"
"strings"
"time"

"github.com/coreos/fleet/Godeps/_workspace/src/github.com/pborman/uuid"
)

var fleetctlBinPath string
Expand Down Expand Up @@ -170,3 +172,8 @@ func WaitForState(stateCheckFunc func() bool) (time.Duration, error) {
}
}
}

// NewMachineID generates a new UUID and returns it with the "-" separators
// stripped, matching the machine ID format used by systemd.
func NewMachineID() string {
	const separator = "-"

	id := uuid.New()
	return strings.Replace(id, separator, "", -1)
}
5 changes: 5 additions & 0 deletions heart/heart.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
// Heart is the set of operations a machine uses to publish its state with a
// TTL. In this file it is implemented by machineHeart, which forwards the
// calls to a registry.Registry.
type Heart interface {
// Beat takes a TTL and returns a uint64 and an error. machineHeart
// implements it by calling the registry's SetMachineState with the
// machine's current state and the given TTL.
Beat(time.Duration) (uint64, error)
// Clear returns an error on failure.
// NOTE(review): presumably withdraws the state published by Beat/Register;
// confirm against machineHeart.Clear.
Clear() error
// Register takes a TTL and returns a uint64 and an error. machineHeart
// implements it by calling the registry's CreateMachineState with the
// machine's current state and the given TTL.
Register(time.Duration) (uint64, error)
}

func New(reg registry.Registry, mach machine.Machine) Heart {
Expand All @@ -35,6 +36,10 @@ type machineHeart struct {
mach machine.Machine
}

// Register passes the machine's current state and the given ttl to the
// registry's CreateMachineState and returns its results unchanged.
func (h *machineHeart) Register(ttl time.Duration) (uint64, error) {
	state := h.mach.State()
	return h.reg.CreateMachineState(state, ttl)
}

// Beat passes the machine's current state and the given ttl to the
// registry's SetMachineState and returns its results unchanged.
func (h *machineHeart) Beat(ttl time.Duration) (uint64, error) {
	state := h.mach.State()
	return h.reg.SetMachineState(state, ttl)
}
Expand Down
1 change: 1 addition & 0 deletions registry/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

type Registry interface {
ClearUnitHeartbeat(name string)
CreateMachineState(ms machine.MachineState, ttl time.Duration) (uint64, error)
CreateUnit(*job.Unit) error
DestroyUnit(string) error
UnitHeartbeat(name, machID string, ttl time.Duration) error
Expand Down
19 changes: 19 additions & 0 deletions registry/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,25 @@ func (r *EtcdRegistry) Machines() (machines []machine.MachineState, err error) {
return
}

// CreateMachineState marshals ms and writes it to the machine's "object" key
// with the given ttl, using the etcd.PrevNoExist option so that the write is
// conditional on the key not existing yet. On success it returns the
// ModifiedIndex of the node in the etcd response; if marshaling or the write
// fails it returns 0 together with the error.
func (r *EtcdRegistry) CreateMachineState(ms machine.MachineState, ttl time.Duration) (uint64, error) {
	payload, err := marshal(ms)
	if err != nil {
		return 0, err
	}

	nodeKey := r.prefixed(machinePrefix, ms.ID, "object")
	resp, err := r.kAPI.Set(r.ctx(), nodeKey, payload, &etcd.SetOptions{
		PrevExist: etcd.PrevNoExist,
		TTL:       ttl,
	})
	if err != nil {
		return 0, err
	}
	return resp.Node.ModifiedIndex, nil
}

func (r *EtcdRegistry) SetMachineState(ms machine.MachineState, ttl time.Duration) (uint64, error) {
val, err := marshal(ms)
if err != nil {
Expand Down
24 changes: 21 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Server struct {
api *api.Server
disableEngine bool
reconfigServer bool
restartServer bool

engineReconcileInterval time.Duration

Expand Down Expand Up @@ -147,6 +148,7 @@ func New(cfg config.Config, listeners []net.Listener) (*Server, error) {
engineReconcileInterval: eIval,
disableEngine: cfg.DisableEngine,
reconfigServer: false,
restartServer: false,
}

return &srv, nil
Expand Down Expand Up @@ -174,10 +176,20 @@ func (s *Server) Run() {

var err error
for sleep := time.Second; ; sleep = pkg.ExpBackoff(sleep, time.Minute) {
_, err = s.hrt.Beat(s.mon.TTL)
if err == nil {
break
if s.restartServer {
_, err = s.hrt.Beat(s.mon.TTL)
if err == nil {
log.Infof("hrt.Beat() success")
break
}
} else {
_, err = s.hrt.Register(s.mon.TTL)
if err == nil {
log.Infof("hrt.Register() success")
break
}
}
log.Errorf("Server register machine failed: %v", err)
time.Sleep(sleep)
}

Expand Down Expand Up @@ -238,7 +250,9 @@ func (s *Server) Supervise() {
}
if !sd {
log.Infof("Restarting server")
s.SetRestartServer(true)
s.Run()
s.SetRestartServer(false)
}
}

Expand Down Expand Up @@ -275,3 +289,7 @@ func (s *Server) GetApiServerListeners() []net.Listener {
// SetReconfigServer stores isReconfigServer in the server's reconfigServer
// field.
func (s *Server) SetReconfigServer(isReconfigServer bool) {
s.reconfigServer = isReconfigServer
}

// SetRestartServer stores isRestartServer in the server's restartServer
// field. Run consults that field: when it is true Run publishes the machine
// state with the heart's Beat, otherwise with Register. Supervise sets it to
// true before calling Run for a restart and back to false afterwards.
// NOTE(review): this is a plain field write; no locking is visible here.
func (s *Server) SetRestartServer(isRestartServer bool) {
s.restartServer = isRestartServer
}

0 comments on commit d357cf2

Please sign in to comment.