Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
Merge pull request #1509 from endocode/tixxdz/fleetctl-replace-unit-v2
Browse files Browse the repository at this point in the history
fleet: add replace unit support
  • Loading branch information
tixxdz committed Apr 25, 2016
2 parents d357cf2 + 23750a1 commit 6132bb0
Show file tree
Hide file tree
Showing 10 changed files with 599 additions and 27 deletions.
22 changes: 21 additions & 1 deletion api/units.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,35 @@ func (ur *unitsResource) set(rw http.ResponseWriter, req *http.Request, item str
return
}

newUnit := false
if eu == nil {
if len(su.Options) == 0 {
err := errors.New("unit does not exist and options field empty")
sendError(rw, http.StatusConflict, err)
return
} else if err := ValidateOptions(su.Options); err != nil {
sendError(rw, http.StatusBadRequest, err)
return
} else {
ur.create(rw, su.Name, &su)
// New valid unit
newUnit = true
}
} else if eu.Name == su.Name && len(su.Options) > 0 {
// There is already a unit with the same name that
// was submitted before. Check their hashes, if they do
// not match then this is probably a new version which
// needs its own new unit entry.
// In the other case if su.Options == 0 then probably we
// don't want to update the Unit options nor its content
// but only set the target job state of the
// corresponding unit, in this case just ignore.
a := schema.MapSchemaUnitOptionsToUnitFile(su.Options)
b := schema.MapSchemaUnitOptionsToUnitFile(eu.Options)
newUnit = !unit.MatchUnitFiles(a, b)
}

if newUnit {
ur.create(rw, su.Name, &su)
return
}

Expand Down
173 changes: 151 additions & 22 deletions fleetctl/fleetctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,15 @@ var (
Full bool
NoLegend bool
NoBlock bool
Replace bool
BlockAttempts int
Fields string
SSHPort int
}{}

// current command being executed
currentCommand string

// used to cache MachineStates
machineStates map[string]*machine.MachineState
)
Expand Down Expand Up @@ -287,6 +291,10 @@ func main() {
}
}

// We use this to know in which context we are,
// submit, load or start
currentCommand = cmd.Name

os.Exit(cmd.Run(cmd.Flags.Args()))

}
Expand Down Expand Up @@ -555,7 +563,7 @@ func getUnitFileFromTemplate(uni *unit.UnitNameInfo, fileName string) (*unit.Uni
}

if tmpl != nil {
warnOnDifferentLocalUnit(fileName, tmpl)
isLocalUnitDifferent(fileName, tmpl, true, false)
uf = schema.MapSchemaUnitOptionsToUnitFile(tmpl.Options)
log.Debugf("Template Unit(%s) found in registry", uni.Template)
} else {
Expand Down Expand Up @@ -663,6 +671,88 @@ func createUnit(name string, uf *unit.UnitFile) (*schema.Unit, error) {
return &u, nil
}

// checkReplaceUnitState checks if the unit should be replaced.
// It takes a Unit object as a parameter.
// It returns 0 on success and if the unit should be replaced, 1 if the
// unit should not be replaced; and any error encountered.
func checkReplaceUnitState(unit *schema.Unit) (int, error) {
// We replace units only for 'submit', 'load' and
// 'start' commands.
allowedReplace := map[string][]job.JobState{
"submit": []job.JobState{
job.JobStateInactive,
},
"load": []job.JobState{
job.JobStateInactive,
job.JobStateLoaded,
},
"start": []job.JobState{
job.JobStateInactive,
job.JobStateLoaded,
job.JobStateLaunched,
},
}

if allowedJobs, ok := allowedReplace[currentCommand]; ok {
for _, j := range allowedJobs {
if job.JobState(unit.DesiredState) == j {
return 0, nil
}
}
// Report back to caller that we are not allowed to
// cross unit transition states
stderr("Warning: can not replace Unit(%s) in state '%s', use the appropriate command", unit.Name, unit.DesiredState)
} else {
// This function should only be called from 'submit',
// 'load' and 'start' upper paths.
return 1, fmt.Errorf("error: replacing units is not supported in this context")
}

return 1, nil
}

// checkUnitCreation checks if the unit should be created.
// It takes a unit file path as a parameter.
// It returns 0 on success and if the unit should be created, 1 if the
// unit should not be created; and any error encountered.
func checkUnitCreation(arg string) (int, error) {
name := unitNameMangle(arg)

// First, check if there already exists a Unit by the given name in the Registry
unit, err := cAPI.Unit(name)
if err != nil {
return 1, fmt.Errorf("error retrieving Unit(%s) from Registry: %v", name, err)
}

// check if the unit is running
if unit == nil {
if sharedFlags.Replace {
log.Debugf("Unit(%s) was not found in Registry", name)
}
// Create a new unit
return 0, nil
}

// if sharedFlags.Replace is not set then we warn in case
// the units differ
different, err := isLocalUnitDifferent(arg, unit, !sharedFlags.Replace, false)

// if sharedFlags.Replace is set then we fail for errors
if sharedFlags.Replace {
if err != nil {
return 1, err
} else if different {
return checkReplaceUnitState(unit)
} else {
stdout("Found same Unit(%s) in Registry, nothing to do", unit.Name)
}
} else if different == false {
log.Debugf("Found same Unit(%s) in Registry, no need to recreate it", name)
}

return 1, nil
}

// lazyCreateUnits iterates over a set of unit names and, for each, attempts to
// ensure that a unit by that name exists in the Registry, by checking a number
// of conditions and acting on the first one that succeeds, in order of:
Expand All @@ -680,14 +770,10 @@ func lazyCreateUnits(args []string) error {
arg = maybeAppendDefaultUnitType(arg)
name := unitNameMangle(arg)

// First, check if there already exists a Unit by the given name in the Registry
u, err := cAPI.Unit(name)
ret, err := checkUnitCreation(arg)
if err != nil {
return fmt.Errorf("error retrieving Unit(%s) from Registry: %v", name, err)
}
if u != nil {
log.Debugf("Found Unit(%s) in Registry, no need to recreate it", name)
warnOnDifferentLocalUnit(arg, u)
return err
} else if ret != 0 {
continue
}

Expand Down Expand Up @@ -726,24 +812,67 @@ func lazyCreateUnits(args []string) error {
return nil
}

func warnOnDifferentLocalUnit(loc string, su *schema.Unit) {
suf := schema.MapSchemaUnitOptionsToUnitFile(su.Options)
if _, err := os.Stat(loc); !os.IsNotExist(err) {
luf, err := getUnitFromFile(loc)
if err == nil && luf.Hash() != suf.Hash() {
stderr("WARNING: Unit %s in registry differs from local unit file %s", su.Name, loc)
return
// matchLocalFileAndUnit compares a file with a Unit
// Returns true if the contents of the file matches the unit one, false
// otherwise; and any error encountered.
func matchLocalFileAndUnit(file string, su *schema.Unit) (bool, error) {
result := false
a := schema.MapSchemaUnitOptionsToUnitFile(su.Options)

_, err := os.Stat(file)
if err == nil {
b, err := getUnitFromFile(file)
if err == nil {
result = unit.MatchUnitFiles(a, b)
}
}
if uni := unit.NewUnitNameInfo(path.Base(loc)); uni != nil && uni.IsInstance() {
file := path.Join(path.Dir(loc), uni.Template)
if _, err := os.Stat(file); !os.IsNotExist(err) {
tmpl, err := getUnitFromFile(file)
if err == nil && tmpl.Hash() != suf.Hash() {
stderr("WARNING: Unit %s in registry differs from local template unit file %s", su.Name, uni.Template)
}

return result, err
}

// isLocalUnitDifferent compares a Unit on the file system with a one
// provided from the Registry.
// isLocalUnitDifferent first tries to load the passed Unit from the
// local file system and compares it with the Unit that is in the
// Registry. If it fails to load that Unit from the filesystem and
// fatal was not set, it will check again if that file name is an
// instance of a template, if so it will load the template Unit and
// compare it with the provided Unit.
// It takes four arguments; a path to the local Unit on the file system,
// the Unit in the registry, a boolean to warn in case the Units differ;
// and a last boolean to fail in case fatal errors happen.
// Returns true if the local Unit on file system is different from the
// one provided, false otherwise; and any error encountered.
func isLocalUnitDifferent(file string, su *schema.Unit, warnIfDifferent bool, fatal bool) (bool, error) {
result, err := matchLocalFileAndUnit(file, su)
if err == nil {
// Warn in case unit differs from local file
if result == false && warnIfDifferent {
stderr("WARNING: Unit %s in registry differs from local unit file %s", su.Name, file)
}
return !result, nil
} else if fatal {
return false, err
}

info := unit.NewUnitNameInfo(path.Base(file))
if info == nil {
return false, fmt.Errorf("error extracting information from unit name %s", file)
} else if !info.IsInstance() {
return false, fmt.Errorf("error Unit %s does not seem to be a template unit", file)
}

templFile := path.Join(path.Dir(file), info.Template)
result, err = matchLocalFileAndUnit(templFile, su)
if err == nil {
// Warn in case unit differs from local template unit file
if result == false && warnIfDifferent {
stderr("WARNING: Unit %s in registry differs from local template unit file %s", su.Name, info.Template)
}
return !result, nil
}

return false, err
}

func lazyLoadUnits(args []string) ([]*schema.Unit, error) {
Expand Down
1 change: 1 addition & 0 deletions fleetctl/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func init() {
cmdLoadUnits.Flags.BoolVar(&sharedFlags.Sign, "sign", false, "DEPRECATED - this option cannot be used")
cmdLoadUnits.Flags.IntVar(&sharedFlags.BlockAttempts, "block-attempts", 0, "Wait until the jobs are loaded, performing up to N attempts before giving up. A value of 0 indicates no limit. Does not apply to global units.")
cmdLoadUnits.Flags.BoolVar(&sharedFlags.NoBlock, "no-block", false, "Do not wait until the jobs have been loaded before exiting. Always the case for global units.")
cmdLoadUnits.Flags.BoolVar(&sharedFlags.Replace, "replace", false, "Replace the old scheduled units in the cluster with new versions.")
}

func runLoadUnits(args []string) (exit int) {
Expand Down
1 change: 1 addition & 0 deletions fleetctl/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func init() {
cmdStartUnit.Flags.BoolVar(&sharedFlags.Sign, "sign", false, "DEPRECATED - this option cannot be used")
cmdStartUnit.Flags.IntVar(&sharedFlags.BlockAttempts, "block-attempts", 0, "Wait until the units are launched, performing up to N attempts before giving up. A value of 0 indicates no limit. Does not apply to global units.")
cmdStartUnit.Flags.BoolVar(&sharedFlags.NoBlock, "no-block", false, "Do not wait until the units have launched before exiting. Always the case for global units.")
cmdStartUnit.Flags.BoolVar(&sharedFlags.Replace, "replace", false, "Replace the already started units in the cluster with new versions.")
}

func runStartUnit(args []string) (exit int) {
Expand Down
1 change: 1 addition & 0 deletions fleetctl/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ Submit a directory of units with glob matching:

func init() {
cmdSubmitUnit.Flags.BoolVar(&sharedFlags.Sign, "sign", false, "DEPRECATED - this option cannot be used")
cmdSubmitUnit.Flags.BoolVar(&sharedFlags.Replace, "replace", false, "Replace the old submitted units in the cluster with new versions.")
}

func runSubmitUnits(args []string) (exit int) {
Expand Down
5 changes: 5 additions & 0 deletions functional/fixtures/units/replace-sync.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[Service]
ExecStartPre=/bin/bash -c "echo 'sync'"
ExecStart=/bin/bash -c "while true; do echo Hello, World!; sleep 1; done"
ExecStop=/bin/bash -c "echo 'stopping'"
ExecStopPost=/bin/bash -c "sleep 3; touch /tmp/fleetSyncReplaceFile"
Loading

0 comments on commit 6132bb0

Please sign in to comment.