Skip to content

Commit

Permalink
Apply as many changes as possible with errors at end (#532)
Browse files Browse the repository at this point in the history
- Changes that depend on failing changes shouldn't be applied
- exit-early-on-apply-error flag can be used to exit as soon as an error is encountered while applying changes (default true)
- exit-early-on-wait-error flag can be used to exit as soon as an error is encountered while waiting for changes (default true)
  • Loading branch information
praveenrewar authored Sep 19, 2022
1 parent 52a5483 commit 094843d
Show file tree
Hide file tree
Showing 7 changed files with 317 additions and 23 deletions.
25 changes: 16 additions & 9 deletions pkg/kapp/clusterapply/applying_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ type ApplyingChanges struct {
applied map[*ctldgraph.Change]struct{}
clusterChangeFactory ClusterChangeFactory
ui UI
exitOnError bool
}

func NewApplyingChanges(numTotal int, opts ApplyingChangesOpts, clusterChangeFactory ClusterChangeFactory, ui UI) *ApplyingChanges {
return &ApplyingChanges{numTotal, opts, map[*ctldgraph.Change]struct{}{}, clusterChangeFactory, ui}
func NewApplyingChanges(numTotal int, opts ApplyingChangesOpts, clusterChangeFactory ClusterChangeFactory, ui UI, exitOnError bool) *ApplyingChanges {
return &ApplyingChanges{numTotal, opts, map[*ctldgraph.Change]struct{}{}, clusterChangeFactory, ui, exitOnError}
}

type applyResult struct {
Expand All @@ -37,14 +38,16 @@ type applyResult struct {
Err error
}

func (c *ApplyingChanges) Apply(allChanges []*ctldgraph.Change) ([]WaitingChange, error) {
func (c *ApplyingChanges) Apply(allChanges []*ctldgraph.Change) ([]WaitingChange, []string, error) {
startTime := time.Now()

var unsuccessfulChangeDesc []string

for {
nonAppliedChanges := c.nonAppliedChanges(allChanges)
if len(nonAppliedChanges) == 0 {
// Do not print applying message if no changes
return nil, nil
return nil, unsuccessfulChangeDesc, nil
}

c.ui.NotifySection("applying %d changes %s", len(nonAppliedChanges), c.stats())
Expand Down Expand Up @@ -88,22 +91,26 @@ func (c *ApplyingChanges) Apply(allChanges []*ctldgraph.Change) ([]WaitingChange

if result.Err != nil {
lastErr = result.Err
if result.Retryable {
continue
if !result.Retryable {
if c.exitOnError {
return nil, nil, result.Err
}
unsuccessfulChangeDesc = append(unsuccessfulChangeDesc, result.Err.Error())
c.markApplied(result.Change)
}
return nil, result.Err
continue
}

c.markApplied(result.Change)
appliedChanges = append(appliedChanges, WaitingChange{result.Change, result.ClusterChange, time.Now()})
}

if len(appliedChanges) > 0 {
return appliedChanges, nil
return appliedChanges, unsuccessfulChangeDesc, nil
}

if time.Now().Sub(startTime) > c.opts.Timeout {
return nil, fmt.Errorf("Timed out waiting after %s: Last error: %w", c.opts.Timeout, lastErr)
return nil, unsuccessfulChangeDesc, fmt.Errorf("Timed out waiting after %s: Last error: %s", c.opts.Timeout, lastErr)
}

time.Sleep(c.opts.CheckInterval)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kapp/clusterapply/cluster_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (c *ClusterChange) applyErr(err error) error {
}
}

return fmt.Errorf("Applying %s: %s%s", c.ApplyDescription(),
return fmt.Errorf("%s: %s%s", c.ApplyDescription(),
uierrs.NewSemiStructuredError(err), hintMsg)
}

Expand Down
27 changes: 23 additions & 4 deletions pkg/kapp/clusterapply/cluster_change_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package clusterapply

import (
"fmt"
"strings"

uierrs "github.com/cppforlife/go-cli-ui/errors"
ctlconf "github.com/vmware-tanzu/carvel-kapp/pkg/kapp/config"
ctldiff "github.com/vmware-tanzu/carvel-kapp/pkg/kapp/diff"
ctldgraph "github.com/vmware-tanzu/carvel-kapp/pkg/kapp/diffgraph"
Expand All @@ -15,6 +17,9 @@ import (
type ClusterChangeSetOpts struct {
ApplyingChangesOpts
WaitingChangesOpts

ExitEarlyOnApplyError bool
ExitEarlyOnWaitError bool
}

type ClusterChangeSet struct {
Expand Down Expand Up @@ -95,18 +100,30 @@ func (c ClusterChangeSet) Apply(changesGraph *ctldgraph.ChangeGraph) error {

blockedChanges := ctldgraph.NewBlockedChanges(changesGraph)
applyingChanges := NewApplyingChanges(
expectedNumChanges, c.opts.ApplyingChangesOpts, c.clusterChangeFactory, c.ui)
waitingChanges := NewWaitingChanges(expectedNumChanges, c.opts.WaitingChangesOpts, c.ui)
expectedNumChanges, c.opts.ApplyingChangesOpts, c.clusterChangeFactory, c.ui, c.opts.ExitEarlyOnApplyError)
waitingChanges := NewWaitingChanges(expectedNumChanges, c.opts.WaitingChangesOpts, c.ui, c.opts.ExitEarlyOnWaitError)

var unsuccessfulChanges []string

for {
appliedChanges, err := applyingChanges.Apply(blockedChanges.Unblocked())
appliedChanges, unsuccessfulChangeDesc, err := applyingChanges.Apply(blockedChanges.Unblocked())
if err != nil {
return err
}

unsuccessfulChanges = append(unsuccessfulChanges, unsuccessfulChangeDesc...)

waitingChanges.Track(appliedChanges)

if waitingChanges.IsEmpty() {
if len(unsuccessfulChanges) == 1 {
return fmt.Errorf("%s", unsuccessfulChanges[0])
}

if len(unsuccessfulChanges) > 0 {
return uierrs.NewSemiStructuredError(fmt.Errorf("[%s]", strings.Join(unsuccessfulChanges, ", ")))
}

err := applyingChanges.Complete()
if err != nil {
c.ui.Notify([]string{fmt.Sprintf("Blocked changes:\n%s\n", blockedChanges.WhyBlocked(blockedChanges.Blocked()))})
Expand All @@ -116,11 +133,13 @@ func (c ClusterChangeSet) Apply(changesGraph *ctldgraph.ChangeGraph) error {
return waitingChanges.Complete()
}

doneChanges, err := waitingChanges.WaitForAny()
doneChanges, unsuccessfulChangeDesc, err := waitingChanges.WaitForAny()
if err != nil {
return err
}

unsuccessfulChanges = append(unsuccessfulChanges, unsuccessfulChangeDesc...)

for _, change := range doneChanges {
blockedChanges.Unblock(change.Graph)
}
Expand Down
27 changes: 19 additions & 8 deletions pkg/kapp/clusterapply/waiting_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type WaitingChanges struct {
trackedChanges []WaitingChange
opts WaitingChangesOpts
ui UI
exitOnError bool
}

type WaitingChange struct {
Expand All @@ -35,8 +36,8 @@ type WaitingChange struct {
startTime time.Time
}

func NewWaitingChanges(numTotal int, opts WaitingChangesOpts, ui UI) *WaitingChanges {
return &WaitingChanges{numTotal, 0, nil, opts, ui}
func NewWaitingChanges(numTotal int, opts WaitingChangesOpts, ui UI, exitOnError bool) *WaitingChanges {
return &WaitingChanges{numTotal, 0, nil, opts, ui, exitOnError}
}

func (c *WaitingChanges) Track(changes []WaitingChange) {
Expand All @@ -54,7 +55,7 @@ type waitResult struct {
Err error
}

func (c *WaitingChanges) WaitForAny() ([]WaitingChange, error) {
func (c *WaitingChanges) WaitForAny() ([]WaitingChange, []string, error) {
startTime := time.Now()

for {
Expand Down Expand Up @@ -83,6 +84,7 @@ func (c *WaitingChanges) WaitForAny() ([]WaitingChange, error) {

var newInProgressChanges []WaitingChange
var doneChanges []WaitingChange
var unsuccessfulChangeDesc []string

for i := 0; i < len(c.trackedChanges); i++ {
result := <-waitCh
Expand All @@ -92,7 +94,12 @@ func (c *WaitingChanges) WaitForAny() ([]WaitingChange, error) {
c.ui.Notify(descMsgs)

if err != nil {
return nil, fmt.Errorf("%s: Errored: %w", desc, err)
err = fmt.Errorf("%s: Errored: %w", desc, err)
if c.exitOnError {
return nil, nil, err
}
unsuccessfulChangeDesc = append(unsuccessfulChangeDesc, err.Error())
continue
}
if state.Done {
c.numWaited++
Expand All @@ -111,7 +118,11 @@ func (c *WaitingChanges) WaitForAny() ([]WaitingChange, error) {
if len(state.Message) > 0 {
msg += " (" + state.Message + ")"
}
return nil, fmt.Errorf("%s: Finished unsuccessfully%s", desc, msg)
err := fmt.Errorf("%s: Finished unsuccessfully%s", desc, msg)
if c.exitOnError {
return nil, nil, err
}
unsuccessfulChangeDesc = append(unsuccessfulChangeDesc, err.Error())

case state.Done && state.Successful:
doneChanges = append(doneChanges, change)
Expand All @@ -120,16 +131,16 @@ func (c *WaitingChanges) WaitForAny() ([]WaitingChange, error) {

c.trackedChanges = newInProgressChanges

if len(c.trackedChanges) == 0 || len(doneChanges) > 0 {
return doneChanges, nil
if len(c.trackedChanges) == 0 || len(doneChanges) > 0 || len(unsuccessfulChangeDesc) > 0 {
return doneChanges, unsuccessfulChangeDesc, nil
}

if time.Now().Sub(startTime) > c.opts.Timeout {
var trackedResourcesDesc []string
for _, change := range c.trackedChanges {
trackedResourcesDesc = append(trackedResourcesDesc, change.Cluster.Resource().Description())
}
return nil, uierrs.NewSemiStructuredError(fmt.Errorf("Timed out waiting after %s for resources: [%s]", c.opts.Timeout, strings.Join(trackedResourcesDesc, ", ")))
return nil, unsuccessfulChangeDesc, uierrs.NewSemiStructuredError(fmt.Errorf("Timed out waiting after %s for resources: [%s]", c.opts.Timeout, strings.Join(trackedResourcesDesc, ", ")))
}

time.Sleep(c.opts.CheckInterval)
Expand Down
4 changes: 4 additions & 0 deletions pkg/kapp/cmd/app/apply_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ func (s *ApplyFlags) SetWithDefaults(prefix string, defaults ApplyFlags, cmd *co
cmd.Flags().StringVar(&s.AddOrUpdateChangeOpts.DefaultUpdateStrategy, prefix+"apply-default-update-strategy",
defaults.AddOrUpdateChangeOpts.DefaultUpdateStrategy, "Change default update strategy")

cmd.Flags().BoolVar(&s.ExitEarlyOnApplyError, prefix+"exit-early-on-apply-error", true, "Exit quickly on apply failure")

cmd.Flags().BoolVar(&s.Wait, prefix+"wait", defaults.Wait, "Set to wait for changes to be applied")
cmd.Flags().BoolVar(&s.WaitIgnored, prefix+"wait-ignored", defaults.WaitIgnored, "Set to wait for ignored changes to be applied")

Expand All @@ -63,6 +65,8 @@ func (s *ApplyFlags) SetWithDefaults(prefix string, defaults ApplyFlags, cmd *co
5, "Maximum number of concurrent wait operations")

cmd.Flags().BoolVar(&s.ExitStatus, prefix+"apply-exit-status", false, "Return specific exit status based on number of changes")

cmd.Flags().BoolVar(&s.ExitEarlyOnWaitError, prefix+"exit-early-on-wait-error", true, "Exit quickly on wait failure")
}

func mustParseDuration(str string) time.Duration {
Expand Down
Loading

0 comments on commit 094843d

Please sign in to comment.