Skip to content

Commit

Permalink
Allow drain retries even with timeout failures (#104)
Browse files Browse the repository at this point in the history
* fix unit test

Signed-off-by: sbadiger <[email protected]>
  • Loading branch information
shreyas-badiger authored May 4, 2023
1 parent eea6b4b commit d6d214f
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 34 deletions.
3 changes: 3 additions & 0 deletions cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var (
maxDrainConcurrency int64
drainTimeoutSeconds int
drainTimeoutUnknownSeconds int
drainRetryAttempts int
pollingIntervalSeconds int
maxTimeToProcessSeconds int64

Expand Down Expand Up @@ -79,6 +80,7 @@ var serveCmd = &cobra.Command{
DrainRetryIntervalSeconds: int64(drainRetryIntervalSeconds),
MaxDrainConcurrency: semaphore.NewWeighted(maxDrainConcurrency),
MaxTimeToProcessSeconds: int64(maxTimeToProcessSeconds),
DrainRetryAttempts: uint(drainRetryAttempts),
Region: region,
WithDeregister: deregisterTargetGroups,
}
Expand All @@ -100,6 +102,7 @@ func init() {
serveCmd.Flags().IntVar(&drainTimeoutSeconds, "drain-timeout", 300, "hard time limit for draining healthy nodes")
serveCmd.Flags().IntVar(&drainTimeoutUnknownSeconds, "drain-timeout-unknown", 30, "hard time limit for draining nodes that are in unknown state")
serveCmd.Flags().IntVar(&drainRetryIntervalSeconds, "drain-interval", 30, "interval in seconds for which to retry draining")
serveCmd.Flags().IntVar(&drainRetryAttempts, "drain-retries", 3, "number of times to retry the node drain operation")
serveCmd.Flags().IntVar(&pollingIntervalSeconds, "polling-interval", 10, "interval in seconds for which to poll SQS")
serveCmd.Flags().BoolVar(&deregisterTargetGroups, "with-deregister", true, "try to deregister deleting instance from target groups")
serveCmd.Flags().BoolVar(&refreshExpiredCredentials, "refresh-expired-credentials", false, "refreshes expired credentials (requires shared credentials file)")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
golang.org/x/crypto v0.0.0-20190927123631-a832865fa7ad // indirect
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 // indirect
golang.org/x/sys v0.0.0-20191010194322-b09406accb47 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.3.2 // indirect
golang.org/x/time v0.0.0-20190921001708-c4c64cad1fd0 // indirect
google.golang.org/appengine v1.6.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47 h1:/XfQ9z7ib8eEJX2hdgFTZJ/ntt0swNk5oYBziWeTCvY=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
Expand Down
1 change: 1 addition & 0 deletions pkg/service/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type ManagerContext struct {
DrainTimeoutUnknownSeconds int64
DrainTimeoutSeconds int64
DrainRetryIntervalSeconds int64
DrainRetryAttempts uint
PollingIntervalSeconds int64
WithDeregister bool
MaxDrainConcurrency *semaphore.Weighted
Expand Down
25 changes: 9 additions & 16 deletions pkg/service/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func isNodeStatusInCondition(node v1.Node, condition v1.ConditionStatus) bool {
return false
}

func drainNode(kubectlPath, nodeName string, timeout, retryInterval int64) error {
func drainNode(kubectlPath, nodeName string, timeout, retryInterval int64, retryAttempts uint) error {
drainArgs := []string{"drain", nodeName, "--ignore-daemonsets=true", "--delete-local-data=true", "--force", "--grace-period=-1"}
drainCommand := kubectlPath

Expand All @@ -58,7 +58,7 @@ func drainNode(kubectlPath, nodeName string, timeout, retryInterval int64) error
return nil
}

err := runCommandWithContext(drainCommand, drainArgs, timeout, retryInterval)
err := runCommandWithContext(drainCommand, drainArgs, timeout, retryInterval, retryAttempts)
if err != nil {
if err.Error() == "command execution timed out" {
log.Warnf("failed to drain node %v, drain command timed-out", nodeName)
Expand All @@ -70,33 +70,26 @@ func drainNode(kubectlPath, nodeName string, timeout, retryInterval int64) error
return nil
}

func runCommandWithContext(call string, args []string, timeoutSeconds, retryInterval int64) error {
// Create a new context and add a timeout to it
timeoutErr := fmt.Errorf("command execution timed out")
func runCommandWithContext(call string, args []string, timeoutSeconds, retryInterval int64, retryAttempts uint) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutSeconds)*time.Second)
defer cancel()
err := retry.Do(
func() error {
cmd := exec.CommandContext(ctx, call, args...)
out, err := cmd.CombinedOutput()
if ctx.Err() == context.DeadlineExceeded {
log.Error(timeoutErr)
return timeoutErr
}
_, err := cmd.CombinedOutput()
if err != nil {
log.Errorf("call failed with output: %s, error: %s", string(out), err)
return err
}
return nil
},
retry.RetryIf(func(err error) bool {
if err.Error() == timeoutErr.Error() {
return false
if err != nil {
log.Infoln("retrying drain")
return true
}
log.Infoln("retrying drain")
return true
return false
}),
retry.Attempts(3),
retry.Attempts(retryAttempts),
retry.Delay(time.Duration(retryInterval)*time.Second),
)
if err != nil {
Expand Down
22 changes: 13 additions & 9 deletions pkg/service/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,30 +158,34 @@ func Test_GetNodesByAnnotationKey(t *testing.T) {

func Test_DrainNodePositive(t *testing.T) {
t.Log("Test_DrainNodePositive: If drain process is successful, process should exit successfully")
err := drainNode(stubKubectlPathSuccess, "some-node", 10, 0)
err := drainNode(stubKubectlPathSuccess, "some-node", 10, 0, 3)
if err != nil {
t.Fatalf("drainNode: expected error not to have occured, %v", err)
}
}

func Test_DrainNodeNegative(t *testing.T) {
t.Log("Test_DrainNodeNegative: If drain process is unsuccessful, process should error")
err := drainNode(stubKubectlPathFail, "some-node", 10, 0)
err := drainNode(stubKubectlPathFail, "some-node", 10, 0, 3)
if err == nil {
t.Fatalf("drainNode: expected error to have occured, %v", err)
}
}

func Test_RunCommandWithContextTimeout(t *testing.T) {
t.Log("Test_RunCommandWithContextTimeout: should run a command with context successfully")
err := runCommandWithContext("/bin/sleep", []string{"10"}, 1, 0)
func Test_RunCommandWithContextWithoutTimeout(t *testing.T) {
t.Log("Test_RunCommandWithContextTimeout: should run a command with context successfully (without timeout)")
err := runCommandWithContext("/bin/sleep", []string{"5"}, 10, 0, 3)
if err != nil {
t.Fatalf("drainNode: expected error to not have occured, %v", err)
}
}

func Test_RunCommandWithContextWithTimeout(t *testing.T) {
t.Log("Test_RunCommandWithContextTimeout: should throw error (with timeout)")
err := runCommandWithContext("/bin/sleep", []string{"5"}, 1, 0, 3)
if err == nil {
t.Fatalf("drainNode: expected error to have occured, %v", err)
}
expectedErr := "All attempts fail:\n#1: command execution timed out"
if err.Error() != expectedErr {
t.Fatalf("drainNode: expected error message to be: %v, got: %v", expectedErr, err)
}
}

func Test_RunCommand(t *testing.T) {
Expand Down
18 changes: 10 additions & 8 deletions pkg/service/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (mgr *Manager) Start() {
log.Infof("node drain timeout seconds = %v", ctx.DrainTimeoutSeconds)
log.Infof("unknown node drain timeout seconds = %v", ctx.DrainTimeoutUnknownSeconds)
log.Infof("node drain retry interval seconds = %v", ctx.DrainRetryIntervalSeconds)
log.Infof("node drain retry attempts = %v", ctx.DrainRetryAttempts)
log.Infof("with alb deregister = %v", ctx.WithDeregister)

// start metrics server
Expand Down Expand Up @@ -233,13 +234,14 @@ func (mgr *Manager) newPoller() {

func (mgr *Manager) drainNodeTarget(event *LifecycleEvent) error {
var (
ctx = &mgr.context
kubeClient = mgr.authenticator.KubernetesClient
kubectlPath = mgr.context.KubectlLocalPath
metrics = mgr.metrics
drainTimeout = ctx.DrainTimeoutSeconds
retryInterval = ctx.DrainRetryIntervalSeconds
successMsg = fmt.Sprintf(EventMessageNodeDrainSucceeded, event.referencedNode.Name)
ctx = &mgr.context
kubeClient = mgr.authenticator.KubernetesClient
kubectlPath = mgr.context.KubectlLocalPath
metrics = mgr.metrics
drainTimeout = ctx.DrainTimeoutSeconds
drainRetryAttempts = ctx.DrainRetryAttempts
retryInterval = ctx.DrainRetryIntervalSeconds
successMsg = fmt.Sprintf(EventMessageNodeDrainSucceeded, event.referencedNode.Name)
)

log.Debugf("%v> acquired drain semaphore", event.EC2InstanceID)
Expand All @@ -257,7 +259,7 @@ func (mgr *Manager) drainNodeTarget(event *LifecycleEvent) error {
}

log.Infof("%v> draining node/%v", event.EC2InstanceID, event.referencedNode.Name)
err := drainNode(kubectlPath, event.referencedNode.Name, drainTimeout, retryInterval)
err := drainNode(kubectlPath, event.referencedNode.Name, drainTimeout, retryInterval, drainRetryAttempts)
if err != nil {
metrics.AddCounter(FailedNodeDrainTotalMetric, 1)
failMsg := fmt.Sprintf(EventMessageNodeDrainFailed, event.referencedNode.Name, err)
Expand Down
1 change: 1 addition & 0 deletions pkg/service/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func _newBasicContext() ManagerContext {
QueueName: "my-queue",
Region: "us-west-2",
DrainTimeoutSeconds: 1,
DrainRetryAttempts: 3,
PollingIntervalSeconds: 1,
MaxDrainConcurrency: semaphore.NewWeighted(32),
MaxTimeToProcessSeconds: 3600,
Expand Down

0 comments on commit d6d214f

Please sign in to comment.