diff --git a/api/units.go b/api/units.go index 67b4a9537..654084299 100644 --- a/api/units.go +++ b/api/units.go @@ -101,15 +101,35 @@ func (ur *unitsResource) set(rw http.ResponseWriter, req *http.Request, item str return } + newUnit := false if eu == nil { if len(su.Options) == 0 { err := errors.New("unit does not exist and options field empty") sendError(rw, http.StatusConflict, err) + return } else if err := ValidateOptions(su.Options); err != nil { sendError(rw, http.StatusBadRequest, err) + return } else { - ur.create(rw, su.Name, &su) + // New valid unit + newUnit = true } + } else if eu.Name == su.Name && len(su.Options) > 0 { + // There is already a unit with the same name that + // was submitted before. Check their hashes, if they do + // not match then this is probably a new version which + // needs its own new unit entry. + // In the other case if su.Options == 0 then probably we + // don't want to update the Unit options nor its content + // but only set the target job state of the + // corresponding unit, in this case just ignore. + a := schema.MapSchemaUnitOptionsToUnitFile(su.Options) + b := schema.MapSchemaUnitOptionsToUnitFile(eu.Options) + newUnit = !unit.MatchUnitFiles(a, b) + } + + if newUnit { + ur.create(rw, su.Name, &su) return } diff --git a/fleetctl/fleetctl.go b/fleetctl/fleetctl.go index 69513e04f..d6503c9ef 100644 --- a/fleetctl/fleetctl.go +++ b/fleetctl/fleetctl.go @@ -103,11 +103,15 @@ var ( Full bool NoLegend bool NoBlock bool + Replace bool BlockAttempts int Fields string SSHPort int }{} + // current command being executed + currentCommand string + // used to cache MachineStates machineStates map[string]*machine.MachineState ) @@ -287,6 +291,10 @@ func main() { } } + // We use this to know in which context we are, + // submit, load or start + currentCommand = cmd.Name + os.Exit(cmd.Run(cmd.Flags.Args())) } @@ -555,7 +563,7 @@ func getUnitFileFromTemplate(uni *unit.UnitNameInfo, fileName string) (*unit.Uni } if tmpl != nil { - warnOnDifferentLocalUnit(fileName, tmpl) + isLocalUnitDifferent(fileName, tmpl, true, false) uf = schema.MapSchemaUnitOptionsToUnitFile(tmpl.Options) log.Debugf("Template Unit(%s) found in registry", uni.Template) } else { @@ -663,6 +671,88 @@ func createUnit(name string, uf *unit.UnitFile) (*schema.Unit, error) { return &u, nil } +// checkReplaceUnitState checks if the unit should be replaced. +// It takes a Unit object as a parameter. +// It returns 0 on success and if the unit should be replaced, 1 if the +// unit should not be replaced; and any error encountered. +func checkReplaceUnitState(unit *schema.Unit) (int, error) { + // We replace units only for 'submit', 'load' and + // 'start' commands. + allowedReplace := map[string][]job.JobState{ + "submit": []job.JobState{ + job.JobStateInactive, + }, + "load": []job.JobState{ + job.JobStateInactive, + job.JobStateLoaded, + }, + "start": []job.JobState{ + job.JobStateInactive, + job.JobStateLoaded, + job.JobStateLaunched, + }, + } + + if allowedJobs, ok := allowedReplace[currentCommand]; ok { + for _, j := range allowedJobs { + if job.JobState(unit.DesiredState) == j { + return 0, nil + } + } + // Report back to caller that we are not allowed to + // cross unit transition states + stderr("Warning: can not replace Unit(%s) in state '%s', use the appropriate command", unit.Name, unit.DesiredState) + } else { + // This function should only be called from 'submit', + // 'load' and 'start' upper paths. + return 1, fmt.Errorf("error: replacing units is not supported in this context") + } + + return 1, nil +} + +// checkUnitCreation checks if the unit should be created. +// It takes a unit file path as a parameter. +// It returns 0 on success and if the unit should be created, 1 if the +// unit should not be created; and any error encountered. +func checkUnitCreation(arg string) (int, error) { + name := unitNameMangle(arg) + + // First, check if there already exists a Unit by the given name in the Registry + unit, err := cAPI.Unit(name) + if err != nil { + return 1, fmt.Errorf("error retrieving Unit(%s) from Registry: %v", name, err) + } + + // check if the unit is running + if unit == nil { + if sharedFlags.Replace { + log.Debugf("Unit(%s) was not found in Registry", name) + } + // Create a new unit + return 0, nil + } + + // if sharedFlags.Replace is not set then we warn in case + // the units differ + different, err := isLocalUnitDifferent(arg, unit, !sharedFlags.Replace, false) + + // if sharedFlags.Replace is set then we fail for errors + if sharedFlags.Replace { + if err != nil { + return 1, err + } else if different { + return checkReplaceUnitState(unit) + } else { + stdout("Found same Unit(%s) in Registry, nothing to do", unit.Name) + } + } else if different == false { + log.Debugf("Found same Unit(%s) in Registry, no need to recreate it", name) + } + + return 1, nil +} + // lazyCreateUnits iterates over a set of unit names and, for each, attempts to // ensure that a unit by that name exists in the Registry, by checking a number // of conditions and acting on the first one that succeeds, in order of: @@ -680,14 +770,10 @@ func lazyCreateUnits(args []string) error { arg = maybeAppendDefaultUnitType(arg) name := unitNameMangle(arg) - // First, check if there already exists a Unit by the given name in the Registry - u, err := cAPI.Unit(name) + ret, err := checkUnitCreation(arg) if err != nil { - return fmt.Errorf("error retrieving Unit(%s) from Registry: %v", name, err) - } - if u != nil { - log.Debugf("Found Unit(%s) in Registry, no need to recreate it", name) - warnOnDifferentLocalUnit(arg, u) + return err + } else if ret != 0 { continue } @@ -726,24 +812,67 @@ func lazyCreateUnits(args []string) error { return nil } -func warnOnDifferentLocalUnit(loc string, su *schema.Unit) { - suf := schema.MapSchemaUnitOptionsToUnitFile(su.Options) - if _, err := os.Stat(loc); !os.IsNotExist(err) { - luf, err := getUnitFromFile(loc) - if err == nil && luf.Hash() != suf.Hash() { - stderr("WARNING: Unit %s in registry differs from local unit file %s", su.Name, loc) - return +// matchLocalFileAndUnit compares a file with a Unit +// Returns true if the contents of the file matches the unit one, false +// otherwise; and any error encountered. +func matchLocalFileAndUnit(file string, su *schema.Unit) (bool, error) { + result := false + a := schema.MapSchemaUnitOptionsToUnitFile(su.Options) + + _, err := os.Stat(file) + if err == nil { + b, err := getUnitFromFile(file) + if err == nil { + result = unit.MatchUnitFiles(a, b) } } - if uni := unit.NewUnitNameInfo(path.Base(loc)); uni != nil && uni.IsInstance() { - file := path.Join(path.Dir(loc), uni.Template) - if _, err := os.Stat(file); !os.IsNotExist(err) { - tmpl, err := getUnitFromFile(file) - if err == nil && tmpl.Hash() != suf.Hash() { - stderr("WARNING: Unit %s in registry differs from local template unit file %s", su.Name, uni.Template) - } + + return result, err +} + +// isLocalUnitDifferent compares a Unit on the file system with a one +// provided from the Registry. +// isLocalUnitDifferent first tries to load the passed Unit from the +// local file system and compares it with the Unit that is in the +// Registry. If it fails to load that Unit from the filesystem and +// fatal was not set, it will check again if that file name is an +// instance of a template, if so it will load the template Unit and +// compare it with the provided Unit. +// It takes four arguments; a path to the local Unit on the file system, +// the Unit in the registry, a boolean to warn in case the Units differ; +// and a last boolean to fail in case fatal errors happen. +// Returns true if the local Unit on file system is different from the +// one provided, false otherwise; and any error encountered. +func isLocalUnitDifferent(file string, su *schema.Unit, warnIfDifferent bool, fatal bool) (bool, error) { + result, err := matchLocalFileAndUnit(file, su) + if err == nil { + // Warn in case unit differs from local file + if result == false && warnIfDifferent { + stderr("WARNING: Unit %s in registry differs from local unit file %s", su.Name, file) + } + return !result, nil + } else if fatal { + return false, err + } + + info := unit.NewUnitNameInfo(path.Base(file)) + if info == nil { + return false, fmt.Errorf("error extracting information from unit name %s", file) + } else if !info.IsInstance() { + return false, fmt.Errorf("error Unit %s does not seem to be a template unit", file) + } + + templFile := path.Join(path.Dir(file), info.Template) + result, err = matchLocalFileAndUnit(templFile, su) + if err == nil { + // Warn in case unit differs from local template unit file + if result == false && warnIfDifferent { + stderr("WARNING: Unit %s in registry differs from local template unit file %s", su.Name, info.Template) } + return !result, nil } + + return false, err } func lazyLoadUnits(args []string) ([]*schema.Unit, error) { diff --git a/fleetctl/load.go b/fleetctl/load.go index 9b1127dab..94b78c489 100644 --- a/fleetctl/load.go +++ b/fleetctl/load.go @@ -43,6 +43,7 @@ func init() { cmdLoadUnits.Flags.BoolVar(&sharedFlags.Sign, "sign", false, "DEPRECATED - this option cannot be used") cmdLoadUnits.Flags.IntVar(&sharedFlags.BlockAttempts, "block-attempts", 0, "Wait until the jobs are loaded, performing up to N attempts before giving up. A value of 0 indicates no limit. Does not apply to global units.") cmdLoadUnits.Flags.BoolVar(&sharedFlags.NoBlock, "no-block", false, "Do not wait until the jobs have been loaded before exiting. Always the case for global units.") + cmdLoadUnits.Flags.BoolVar(&sharedFlags.Replace, "replace", false, "Replace the old scheduled units in the cluster with new versions.") } func runLoadUnits(args []string) (exit int) { diff --git a/fleetctl/start.go b/fleetctl/start.go index 99e03c2d1..c053f1780 100644 --- a/fleetctl/start.go +++ b/fleetctl/start.go @@ -51,6 +51,7 @@ func init() { cmdStartUnit.Flags.BoolVar(&sharedFlags.Sign, "sign", false, "DEPRECATED - this option cannot be used") cmdStartUnit.Flags.IntVar(&sharedFlags.BlockAttempts, "block-attempts", 0, "Wait until the units are launched, performing up to N attempts before giving up. A value of 0 indicates no limit. Does not apply to global units.") cmdStartUnit.Flags.BoolVar(&sharedFlags.NoBlock, "no-block", false, "Do not wait until the units have launched before exiting. Always the case for global units.") + cmdStartUnit.Flags.BoolVar(&sharedFlags.Replace, "replace", false, "Replace the already started units in the cluster with new versions.") } func runStartUnit(args []string) (exit int) { diff --git a/fleetctl/submit.go b/fleetctl/submit.go index 1297cff50..9b563aecf 100644 --- a/fleetctl/submit.go +++ b/fleetctl/submit.go @@ -33,6 +33,7 @@ Submit a directory of units with glob matching: func init() { cmdSubmitUnit.Flags.BoolVar(&sharedFlags.Sign, "sign", false, "DEPRECATED - this option cannot be used") + cmdSubmitUnit.Flags.BoolVar(&sharedFlags.Replace, "replace", false, "Replace the old submitted units in the cluster with new versions.") } func runSubmitUnits(args []string) (exit int) { diff --git a/functional/fixtures/units/replace-sync.service b/functional/fixtures/units/replace-sync.service new file mode 100644 index 000000000..cb8df8c4a --- /dev/null +++ b/functional/fixtures/units/replace-sync.service @@ -0,0 +1,5 @@ +[Service] +ExecStartPre=/bin/bash -c "echo 'sync'" +ExecStart=/bin/bash -c "while true; do echo Hello, World!; sleep 1; done" +ExecStop=/bin/bash -c "echo 'stopping'" +ExecStopPost=/bin/bash -c "sleep 3; touch /tmp/fleetSyncReplaceFile" diff --git a/functional/unit_action_test.go b/functional/unit_action_test.go index 891ec2479..01674c17f 100644 --- a/functional/unit_action_test.go +++ b/functional/unit_action_test.go @@ -17,6 +17,7 @@ package functional import ( "fmt" "io/ioutil" + "os" "path" "reflect" "sort" @@ -27,6 +28,13 @@ import ( "github.com/coreos/fleet/functional/util" ) +const ( + tmpHelloService = "/tmp/hello.service" + fxtHelloService = "fixtures/units/hello.service" + tmpFixtures = "/tmp/fixtures" + numUnitsReplace = 9 +) + // TestUnitRunnable is the simplest test possible, deplying a single-node // cluster and ensuring a unit can enter an 'active' state func TestUnitRunnable(t *testing.T) { @@ -128,6 +136,42 @@ func TestUnitStart(t *testing.T) { } } +// TestUnitSubmitReplace() tests whether a command "fleetctl submit --replace +// hello.service" works or not. +func TestUnitSubmitReplace(t *testing.T) { + if err := replaceUnitCommon("submit"); err != nil { + t.Fatal(err) + } + + if err := replaceUnitMultiple("submit", numUnitsReplace); err != nil { + t.Fatal(err) + } +} + +// TestUnitLoadReplace() tests whether a command "fleetctl load --replace +// hello.service" works or not. +func TestUnitLoadReplace(t *testing.T) { + if err := replaceUnitCommon("load"); err != nil { + t.Fatal(err) + } + + if err := replaceUnitMultiple("load", numUnitsReplace); err != nil { + t.Fatal(err) + } +} + +// TestUnitStartReplace() tests whether a command "fleetctl start --replace +// hello.service" works or not. +func TestUnitStartReplace(t *testing.T) { + if err := replaceUnitCommon("start"); err != nil { + t.Fatal(err) + } + + if err := replaceUnitMultiple("start", numUnitsReplace); err != nil { + t.Fatal(err) + } +} + func TestUnitSSHActions(t *testing.T) { cluster, err := platform.NewNspawnCluster("smoke") if err != nil { @@ -458,3 +502,328 @@ func doMultipleUnitsCmd(cluster platform.Cluster, m platform.Member, cmd string, return nil } + +// replaceUnitCommon() tests whether a command "fleetctl {submit,load,start} +// --replace hello.service" works or not. +func replaceUnitCommon(cmd string) error { + // check if cmd is one of the supported commands. + listCmds := []string{"submit", "load", "start"} + found := false + for _, ccmd := range listCmds { + if ccmd == cmd { + found = true + } + } + if !found { + return fmt.Errorf("invalid command %s", cmd) + } + + cluster, err := platform.NewNspawnCluster("smoke") + if err != nil { + return fmt.Errorf("%v", err) + } + defer cluster.Destroy() + + m, err := cluster.CreateMember() + if err != nil { + return fmt.Errorf("%v", err) + } + _, err = cluster.WaitForNMachines(m, 1) + if err != nil { + return fmt.Errorf("%v", err) + } + + WaitForNUnitsCmd := func(cmd string, expectedUnits int) (err error) { + if cmd == "submit" { + _, err = cluster.WaitForNUnitFiles(m, expectedUnits) + } else { + _, err = cluster.WaitForNUnits(m, expectedUnits) + } + return err + } + + // run a command for a unit and assert it shows up + if _, _, err := cluster.Fleetctl(m, cmd, fxtHelloService); err != nil { + return fmt.Errorf("Unable to %s fleet unit: %v", cmd, err) + } + if err := WaitForNUnitsCmd(cmd, 1); err != nil { + return fmt.Errorf("Did not find 1 unit in cluster: %v", err) + } + + helloFilename := path.Base(tmpHelloService) + + // store content of hello.service to bodyOrig + bodyOrig, _, err := cluster.Fleetctl(m, "cat", helloFilename) + if err != nil { + return fmt.Errorf("Failed to run cat %s: %v", helloFilename, err) + } + + // replace the unit and assert it shows up + err = util.GenNewFleetService(tmpHelloService, fxtHelloService, "sleep 2", "sleep 1") + if err != nil { + return fmt.Errorf("Failed to generate a temp fleet service: %v", err) + } + if _, _, err := cluster.Fleetctl(m, cmd, "--replace", tmpHelloService); err != nil { + return fmt.Errorf("Unable to replace fleet unit: %v", err) + } + if err := WaitForNUnitsCmd(cmd, 1); err != nil { + return fmt.Errorf("Did not find 1 unit in cluster: %v", err) + } + + // store content of the replaced unit hello.service to bodyNew + bodyNew, _, err := cluster.Fleetctl(m, "cat", helloFilename) + if err != nil { + return fmt.Errorf("Failed to run cat %s: %v", helloFilename, err) + } + + if bodyOrig == bodyNew { + return fmt.Errorf("Error. the unit %s has not been replaced.", helloFilename) + } + + os.Remove(tmpHelloService) + + if _, _, err := cluster.Fleetctl(m, "destroy", fxtHelloService); err != nil { + return fmt.Errorf("Failed to destroy unit: %v", err) + } + if err := WaitForNUnitsCmd(cmd, 0); err != nil { + return fmt.Errorf("Failed to get every unit to be cleaned up: %v", err) + } + + return nil +} + +// replaceUnitMultiple() tests whether a command "fleetctl {submit,load,start} +// --replace hello.service" works or not. +func replaceUnitMultiple(cmd string, n int) error { + // check if cmd is one of the supported commands. + listCmds := []string{"submit", "load", "start"} + found := false + for _, ccmd := range listCmds { + if ccmd == cmd { + found = true + } + } + if !found { + return fmt.Errorf("invalid command %s", cmd) + } + + cluster, err := platform.NewNspawnCluster("smoke") + if err != nil { + return fmt.Errorf("%v", err) + } + defer cluster.Destroy() + + m, err := cluster.CreateMember() + if err != nil { + return fmt.Errorf("%v", err) + } + _, err = cluster.WaitForNMachines(m, 1) + if err != nil { + return fmt.Errorf("%v", err) + } + + WaitForNUnitsCmd := func(cmd string, expectedUnits int) (err error) { + if cmd == "submit" { + _, err = cluster.WaitForNUnitFiles(m, expectedUnits) + } else { + _, err = cluster.WaitForNUnits(m, expectedUnits) + } + return err + } + + if _, err := os.Stat(tmpFixtures); os.IsNotExist(err) { + os.Mkdir(tmpFixtures, 0755) + } + + var stdout string + var bodiesOrig []string + for i := 1; i <= n; i++ { + curHelloService := fmt.Sprintf("/tmp/hello%d.service", i) + tmpHelloFixture := fmt.Sprintf("/tmp/fixtures/hello%d.service", i) + + // generate a new service derived by fixtures, and store it under /tmp + err = util.CopyFile(tmpHelloFixture, fxtHelloService) + if err != nil { + return fmt.Errorf("Failed to copy a temp fleet service: %v", err) + } + + // run a command for a unit and assert it shows up + if _, _, err := cluster.Fleetctl(m, cmd, tmpHelloFixture); err != nil { + return fmt.Errorf("Unable to %s fleet unit: %v", cmd, err) + } + if err := WaitForNUnitsCmd(cmd, i); err != nil { + return fmt.Errorf("Did not find %d units in cluster: \n%s", i, stdout) + } + + helloFilename := path.Base(curHelloService) + + // retrieve content of hello.service, and append to bodiesOrig[] + bodyCur, _, err := cluster.Fleetctl(m, "cat", helloFilename) + if err != nil { + return fmt.Errorf("Failed to run cat %s: %v", helloFilename, err) + } + + bodiesOrig = append(bodiesOrig, bodyCur) + + // generate a new service derived by fixtures, and store it under /tmp + err = util.GenNewFleetService(curHelloService, fxtHelloService, "sleep 2", "sleep 1") + if err != nil { + return fmt.Errorf("Failed to generate a temp fleet service: %v", err) + } + } + + for i := 1; i <= n; i++ { + curHelloService := fmt.Sprintf("/tmp/hello%d.service", i) + + // replace the unit and assert it shows up + if _, _, err = cluster.Fleetctl(m, cmd, "--replace", curHelloService); err != nil { + return fmt.Errorf("Unable to replace fleet unit: %v", err) + } + if err := WaitForNUnitsCmd(cmd, n); err != nil { + return fmt.Errorf("Did not find %d units in cluster: \n%s", n, stdout) + } + + helloFilename := path.Base(curHelloService) + + // retrieve content of hello.service, and compare it with the + // correspondent entry in bodiesOrig[] + bodyCur, _, err := cluster.Fleetctl(m, "cat", helloFilename) + if err != nil { + return fmt.Errorf("Failed to run cat %s: %v", helloFilename, err) + } + + if bodiesOrig[i-1] == bodyCur { + return fmt.Errorf("Error. the unit %s has not been replaced.", helloFilename) + } + } + + // clean up temp services under /tmp + for i := 1; i <= n; i++ { + curHelloService := fmt.Sprintf("/tmp/hello%d.service", i) + + if _, _, err := cluster.Fleetctl(m, "destroy", curHelloService); err != nil { + fmt.Printf("Failed to destroy unit: %v", err) + continue + } + + os.Remove(curHelloService) + } + + if err := WaitForNUnitsCmd(cmd, 0); err != nil { + return fmt.Errorf("Failed to get every unit to be cleaned up: %v", err) + } + + os.Remove(tmpFixtures) + + return nil +} + +// TestReplaceSerialization tests if the ExecStartPre of the new version +// of the unit when it replaces the old one is excuted after +// ExecStopPost of the old version. +// This test is to make sure that two versions of the same unit will not +// conflict with each other, that the directives are always serialized, +// and it tries its best to avoid the following scenarios: +// https://github.com/coreos/fleet/issues/1000 +// https://github.com/systemd/systemd/issues/518 +// Now we can't guarantee that that behaviour will not be triggered by +// another external operation, but at least from the Unit replace +// feature context we try to avoid it. +func TestReplaceSerialization(t *testing.T) { + cluster, err := platform.NewNspawnCluster("smoke") + if err != nil { + t.Fatal(err) + } + defer cluster.Destroy() + + m, err := cluster.CreateMember() + if err != nil { + t.Fatal(err) + } + + _, err = cluster.WaitForNMachines(m, 1) + if err != nil { + t.Fatal(err) + } + + tmpSyncFile := "/tmp/fleetSyncReplaceFile" + syncOld := "echo 'sync'" + syncNew := fmt.Sprintf("test -f %s", tmpSyncFile) + tmpSyncService := "/tmp/replace-sync.service" + syncService := "fixtures/units/replace-sync.service" + + stdout, stderr, err := cluster.Fleetctl(m, "start", syncService) + if err != nil { + t.Fatalf("Unable to start unit: \nstdout: %s\nstderr: %s\nerr: %v", stdout, stderr, err) + } + + _, err = cluster.WaitForNActiveUnits(m, 1) + if err != nil { + t.Fatal(err) + } + + // replace the unit content, make sure that: + // It shows up and it did 'test -f /tmp/fleetSyncReplaceFile' correctly + err = util.GenNewFleetService(tmpSyncService, syncService, syncNew, syncOld) + if err != nil { + t.Fatalf("Failed to generate a temp fleet service: %v", err) + } + + stdout, stderr, err = cluster.Fleetctl(m, "start", "--replace", tmpSyncService) + if err != nil { + t.Fatalf("Failed to replace unit: \nstdout: %s\nstderr: %s\nerr: %v", stdout, stderr, err) + } + + _, err = cluster.WaitForNActiveUnits(m, 1) + if err != nil { + t.Fatalf("Did not find 1 unit in cluster, unit replace failed: %v", err) + } + + // Wait for the sync file, if the sync file is not created then + // the previous unit failed, if it's created we continue. Here + // the new version of the unit is probably already running and + // the ExecStartPre is running at the same time, if it failed + // then we probably will catch it later when we check its status + tmpService := path.Base(tmpSyncService) + timeout, err := util.WaitForState( + func() bool { + _, err = cluster.MemberCommand(m, syncNew) + if err != nil { + return false + } + return true + }, + ) + if err != nil { + t.Fatalf("Failed to check if file %s exists within %v", tmpSyncFile, timeout) + } + + timeout, err = util.WaitForState( + func() bool { + stdout, _ = cluster.MemberCommand(m, "systemctl", "show", "--property=ActiveState", tmpService) + if strings.TrimSpace(stdout) != "ActiveState=active" { + return false + } + return true + }, + ) + if err != nil { + t.Fatalf("%s unit not reported as active within %v", tmpService, timeout) + } + + timeout, err = util.WaitForState( + func() bool { + stdout, _ = cluster.MemberCommand(m, "systemctl", "show", "--property=Result", tmpService) + if strings.TrimSpace(stdout) != "Result=success" { + return false + } + return true + }, + ) + if err != nil { + t.Fatalf("Result for %s unit not reported as success withing %v", tmpService, timeout) + } + + os.Remove(tmpSyncFile) + os.Remove(tmpSyncService) +} diff --git a/functional/util/util.go b/functional/util/util.go index 88c45a140..f21ca3291 100644 --- a/functional/util/util.go +++ b/functional/util/util.go @@ -177,3 +177,39 @@ func NewMachineID() string { // drop the standard separators to match systemd return strings.Replace(uuid.New(), "-", "", -1) } + +// CopyFile() +func CopyFile(newFile, oldFile string) error { + input, err := ioutil.ReadFile(oldFile) + if err != nil { + return err + } + err = ioutil.WriteFile(newFile, []byte(input), 0644) + if err != nil { + return err + } + return nil +} + +// GenNewFleetService() is a helper for generating a temporary fleet service +// that reads from oldFile, replaces oldVal with newVal, and stores the result +// to newFile. +func GenNewFleetService(newFile, oldFile, newVal, oldVal string) error { + input, err := ioutil.ReadFile(oldFile) + if err != nil { + return err + } + lines := strings.Split(string(input), "\n") + + for i, line := range lines { + if strings.Contains(line, oldVal) { + lines[i] = strings.Replace(line, oldVal, newVal, len(oldVal)) + } + } + output := strings.Join(lines, "\n") + err = ioutil.WriteFile(newFile, []byte(output), 0644) + if err != nil { + return err + } + return nil +} diff --git a/registry/job.go b/registry/job.go index ca16282eb..53935f8f4 100644 --- a/registry/job.go +++ b/registry/job.go @@ -326,14 +326,14 @@ func (r *EtcdRegistry) CreateUnit(u *job.Unit) (err error) { } opts := &etcd.SetOptions{ - PrevExist: etcd.PrevNoExist, + // Since we support replacing units, just ignore previous + // job keys if they exist, this allows us to update the + // job object key with a new unit. + PrevExist: etcd.PrevIgnore, } key := r.prefixed(jobPrefix, u.Name, "object") _, err = r.kAPI.Set(r.ctx(), key, val, opts) if err != nil { - if isEtcdError(err, etcd.ErrorCodeNodeExist) { - err = errors.New("job already exists") - } return } diff --git a/unit/unit.go b/unit/unit.go index 2da58a6bb..6c54c0b91 100644 --- a/unit/unit.go +++ b/unit/unit.go @@ -136,6 +136,16 @@ func (u *UnitFile) Hash() Hash { return Hash(sha1.Sum(u.Bytes())) } +// MatchUnitFiles compares two unitFiles +// Returns true if the units match, false otherwise. +func MatchUnitFiles(a *UnitFile, b *UnitFile) bool { + if a.Hash() == b.Hash() { + return true + } + + return false +} + // RecognizedUnitType determines whether or not the given unit name represents // a recognized unit type. func RecognizedUnitType(name string) bool {