draining retries
Eytan Avisror committed Oct 2, 2019
1 parent 3d27c38 commit d51ba87
Showing 21 changed files with 877 additions and 38 deletions.
43 changes: 40 additions & 3 deletions README.md
@@ -1,4 +1,5 @@
# lifecycle-manager

[![Build Status](https://travis-ci.org/keikoproj/lifecycle-manager.svg?branch=master)](https://travis-ci.org/keikoproj/lifecycle-manager)
[![codecov](https://codecov.io/gh/keikoproj/lifecycle-manager/branch/master/graph/badge.svg)](https://codecov.io/gh/keikoproj/lifecycle-manager)
[![Go Report Card](https://goreportcard.com/badge/github.com/keikoproj/lifecycle-manager)](https://goreportcard.com/report/github.com/keikoproj/lifecycle-manager)
@@ -14,16 +15,52 @@ lifecycle-manager uses lifecycle hooks from the autoscaling group (via SQS) to p

## Usage

1. Follow the [AWS docs](https://docs.aws.amazon.com/autoscaling/ec2/userguide/lifecycle-hooks.html#sqs-notifications) to create an SQS queue named `lifecycle-manager-queue`, a notification role, and a lifecycle hook on your autoscaling group pointing to the created queue (a sketch of the equivalent CLI commands follows these steps).

2. Deploy lifecycle-manager to your cluster:

```bash
kubectl create namespace lifecycle-manager

kubectl apply -f https://raw.githubusercontent.com/keikoproj/lifecycle-manager/master/examples/lifecycle-manager.yaml
```

3. Kill an instance in your scaling group and watch it being drained:

```bash
$ aws autoscaling terminate-instance-in-auto-scaling-group --instance-id i-0d3ba307bc6cebeda --region us-west-2 --no-should-decrement-desired-capacity
{
"Activity": {
"ActivityId": "5285b629-6a18-0a43-7c3c-f76bac8205f0",
"AutoScalingGroupName": "my-scaling-group",
"Description": "Terminating EC2 instance: i-0d3ba307bc6cebeda",
"Cause": "At 2019-10-02T02:44:11Z instance i-0d3ba307bc6cebeda was taken out of service in response to a user request.",
"StartTime": "2019-10-02T02:44:11.394Z",
"StatusCode": "InProgress",
"Progress": 0,
"Details": "{\"Subnet ID\":\"subnet-0bf9bc85fEXAMPLE\",\"Availability Zone\":\"us-west-2c\"}"
}
}

$ kubectl logs lifecycle-manager
time="2019-10-02T02:44:05Z" level=info msg="starting lifecycle-manager service v0.1.0"
time="2019-10-02T02:44:05Z" level=info msg="region = us-west-2"
time="2019-10-02T02:44:05Z" level=info msg="queue = https://sqs.us-west-2.amazonaws.com/00000EXAMPLE/lifecycle-manager-queue"
time="2019-10-02T02:44:05Z" level=info msg="polling interval seconds = 10"
time="2019-10-02T02:44:05Z" level=info msg="drain timeout seconds = 300"
time="2019-10-02T02:44:05Z" level=info msg="drain retry interval seconds = 30"
time="2019-10-02T02:44:05Z" level=info msg="spawning sqs poller"
time="2019-10-02T02:44:12Z" level=info msg="spawning event handler"
time="2019-10-02T02:44:12Z" level=info msg="hook heartbeat timeout interval is 60, will send heartbeat every 30 seconds"
time="2019-10-02T02:44:12Z" level=info msg="draining node ip-10-10-10-10.us-west-2.compute.internal"
time="2019-10-02T02:44:42Z" level=info msg="sending heartbeat for event with instance 'i-0d3ba307bc6cebeda' and sleeping for 30 seconds"
time="2019-10-02T02:44:45Z" level=info msg="completed drain for node 'ip-10-10-10-10.us-west-2.compute.internal'"
time="2019-10-02T02:44:45Z" level=info msg="setting lifecycle event as completed with result: 'CONTINUE'"
```
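
For step 1, the commands below are a minimal sketch of the required AWS setup rather than part of this repository: the queue name matches the one used above, while the scaling group name, account ID, role, region, and heartbeat timeout are placeholders to adjust for your environment (the notification role's trust and permissions policies are described in the linked AWS docs).

```bash
# Sketch of step 1 -- names, ARNs, account ID, and region are placeholders.
aws sqs create-queue --queue-name lifecycle-manager-queue --region us-west-2

# The notification role must allow the autoscaling service to publish to the queue;
# see the linked AWS docs for the exact trust and permissions policies.
aws autoscaling put-lifecycle-hook \
  --lifecycle-hook-name lifecycle-manager-hook \
  --auto-scaling-group-name my-scaling-group \
  --lifecycle-transition autoscaling:EC2_INSTANCE_TERMINATING \
  --notification-target-arn arn:aws:sqs:us-west-2:000000000000:lifecycle-manager-queue \
  --role-arn arn:aws:iam::000000000000:role/my-notification-role \
  --heartbeat-timeout 60 \
  --region us-west-2
```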

## Release History

* 0.1.0
* Release alpha version of lifecycle-manager
Please see [CHANGELOG.md](.github/CHANGELOG.md).

## ❤ Contributing ❤

17 changes: 10 additions & 7 deletions cmd/serve.go
@@ -24,8 +24,9 @@ var (
kubectlLocalPath string
nodeName string

drainTimeoutSeconds int
pollingIntervalSeconds int
drainRetryIntervalSeconds int
drainTimeoutSeconds int
pollingIntervalSeconds int
)

// serveCmd represents the serve command
@@ -46,11 +47,12 @@ var serveCmd = &cobra.Command{

// prepare runtime context
context := service.ManagerContext{
KubectlLocalPath: kubectlLocalPath,
QueueName: queueName,
DrainTimeoutSeconds: int64(drainTimeoutSeconds),
PollingIntervalSeconds: int64(pollingIntervalSeconds),
Region: region,
KubectlLocalPath: kubectlLocalPath,
QueueName: queueName,
DrainTimeoutSeconds: int64(drainTimeoutSeconds),
PollingIntervalSeconds: int64(pollingIntervalSeconds),
DrainRetryIntervalSeconds: int64(drainRetryIntervalSeconds),
Region: region,
}

s := service.New(auth, context)
@@ -65,6 +67,7 @@ func init() {
serveCmd.Flags().StringVar(&queueName, "queue-name", "", "the name of the SQS queue to consume lifecycle hooks from")
serveCmd.Flags().StringVar(&kubectlLocalPath, "kubectl-path", "/usr/local/bin/kubectl", "the path to kubectl binary")
serveCmd.Flags().IntVar(&drainTimeoutSeconds, "drain-timeout", 300, "hard time limit for drain")
serveCmd.Flags().IntVar(&drainRetryIntervalSeconds, "drain-interval", 30, "interval in seconds for which to retry draining")
serveCmd.Flags().IntVar(&pollingIntervalSeconds, "polling-interval", 10, "interval in seconds for which to poll SQS")
}
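
A hypothetical invocation showing the new flag alongside the existing ones is sketched below; the flag names and defaults come from the registrations above, while the binary path and the `serve` subcommand name are assumptions based on `cmd/serve.go` and the example manifest.

```bash
# Hypothetical invocation -- binary path and "serve" subcommand are assumed;
# flag names and defaults are taken from the registrations above.
/bin/lifecycle-manager serve \
  --queue-name lifecycle-manager-queue \
  --region us-west-2 \
  --kubectl-path /usr/local/bin/kubectl \
  --drain-timeout 300 \
  --drain-interval 30 \
  --polling-interval 10
```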

3 changes: 1 addition & 2 deletions examples/lifecycle-manager.yaml
@@ -44,7 +44,7 @@ apiVersion: apps/v1
kind: Deployment
metadata:
name: lifecycle-manager
namespace: kube-system
namespace: lifecycle-manager
labels:
app: lifecycle-manager
spec:
@@ -70,7 +70,6 @@ spec:
memory: 100Mi
command:
- /bin/lifecycle-manager
- --kubectl-path /usr/local/bin/kubectl
- --queue-name my-queue
- --region us-west-2
volumeMounts:
1 change: 1 addition & 0 deletions go.mod
@@ -3,6 +3,7 @@ module github.com/keikoproj/lifecycle-manager
go 1.12

require (
github.com/avast/retry-go v2.4.1+incompatible
github.com/aws/aws-sdk-go v1.25.0
github.com/evanphx/json-patch v4.5.0+incompatible // indirect
github.com/gogo/protobuf v1.3.0 // indirect
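
The new require entry, the go.sum lines, and the vendored retry-go files further down would typically be produced by fetching the module and re-vendoring; a sketch, assuming modules and vendoring are already in use in this repository:

```bash
# Fetch the retry library and refresh the vendor tree
# (module path from the require entry above; the resolved version may differ).
go get github.com/avast/retry-go
go mod vendor
```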
2 changes: 2 additions & 0 deletions go.sum
@@ -5,6 +5,8 @@ github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb0
github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbtSwDGJws/X0=
github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/avast/retry-go v2.4.1+incompatible h1:WMHc0mwoz20UVmBYK89mUB/KFRlxO0p+s+sgpmJMviY=
github.com/avast/retry-go v2.4.1+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/aws/aws-sdk-go v1.25.0 h1:MyXUdCesJLBvSSKYcaKeeEwxNUwUpG6/uqVYeH/Zzfo=
github.com/aws/aws-sdk-go v1.25.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
49 changes: 33 additions & 16 deletions pkg/service/nodes.go
@@ -7,6 +7,7 @@ import (
"strings"
"time"

"github.com/avast/retry-go"
"github.com/keikoproj/lifecycle-manager/pkg/log"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -34,12 +35,13 @@ func getNodeByInstance(k kubernetes.Interface, instanceID string) (v1.Node, bool
return foundNode, false
}

func drainNode(kubectlPath, nodeName string, timeout int64) error {
func drainNode(kubectlPath, nodeName string, timeout, retryInterval int64) error {
log.Infof("draining node %v", nodeName)

drainArgs := []string{"drain", nodeName, "--ignore-daemonsets=true", "--delete-local-data=true", "--force", "--grace-period=-1"}
drainCommand := kubectlPath

_, err := runCommandWithContext(drainCommand, drainArgs, timeout)
err := runCommandWithContext(drainCommand, drainArgs, timeout, retryInterval)
if err != nil {
if err.Error() == "command execution timed out" {
log.Warnf("failed to drain node %v, drain command timed-out", nodeName)
@@ -51,23 +53,38 @@ func drainNode(kubectlPath, nodeName string, timeout int64) error {
return nil
}

func runCommandWithContext(call string, args []string, timeoutSeconds int64) (string, error) {
func runCommandWithContext(call string, args []string, timeoutSeconds, retryInterval int64) error {
// Create a new context and add a timeout to it
timeoutErr := fmt.Errorf("command execution timed out")
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutSeconds)*time.Second)
defer cancel()

cmd := exec.CommandContext(ctx, call, args...)
out, err := cmd.CombinedOutput()

if ctx.Err() == context.DeadlineExceeded {
timeoutErr := fmt.Errorf("command execution timed out")
log.Error(timeoutErr)
return string(out), timeoutErr
}

err := retry.Do(
func() error {
cmd := exec.CommandContext(ctx, call, args...)
out, err := cmd.CombinedOutput()
if ctx.Err() == context.DeadlineExceeded {
log.Error(timeoutErr)
return timeoutErr
}
if err != nil {
log.Errorf("call failed with output: %s, error: %s", string(out), err)
return err
}
return nil
},
retry.RetryIf(func(err error) bool {
if err.Error() == timeoutErr.Error() {
return false
}
log.Infoln("retrying drain")
return true
}),
retry.Attempts(3),
retry.Delay(time.Duration(retryInterval)*time.Second),
)
if err != nil {
log.Errorf("call failed with output: %s, error: %s", string(out), err)
return string(out), err
return err
}
return string(out), nil

return nil
}
8 changes: 4 additions & 4 deletions pkg/service/nodes_test.go
@@ -70,27 +70,27 @@ func Test_GetNodeByInstanceNegative(t *testing.T) {

func Test_DrainNodePositive(t *testing.T) {
t.Log("Test_DrainNodePositive: If drain process is successful, process should exit successfully")
err := drainNode(stubKubectlPathSuccess, "some-node", 10)
err := drainNode(stubKubectlPathSuccess, "some-node", 10, 0)
if err != nil {
t.Fatalf("drainNode: expected error not to have occured, %v", err)
}
}

func Test_DrainNodeNegative(t *testing.T) {
t.Log("Test_DrainNodePositive: If drain process is successful, process should exit successfully")
err := drainNode(stubKubectlPathFail, "some-node", 10)
err := drainNode(stubKubectlPathFail, "some-node", 10, 0)
if err == nil {
t.Fatalf("drainNode: expected error to have occured, %v", err)
}
}

func Test_RunCommandWithContextTimeout(t *testing.T) {
t.Log("Test_DrainNodePositive: If drain process is successful, process should exit successfully")
_, err := runCommandWithContext("/bin/sleep", []string{"10"}, 1)
err := runCommandWithContext("/bin/sleep", []string{"10"}, 1, 0)
if err == nil {
t.Fatalf("drainNode: expected error to have occured, %v", err)
}
expectedErr := "command execution timed out"
expectedErr := "All attempts fail:\n#1: command execution timed out"
if err.Error() != expectedErr {
t.Fatalf("drainNode: expected error message to be: %v, got: %v", expectedErr, err)
}
3 changes: 2 additions & 1 deletion pkg/service/server.go
@@ -30,6 +30,7 @@ func (g *Manager) Start() {
log.Infof("queue = %v", url)
log.Infof("polling interval seconds = %v", ctx.PollingIntervalSeconds)
log.Infof("drain timeout seconds = %v", ctx.DrainTimeoutSeconds)
log.Infof("drain retry interval seconds = %v", ctx.DrainRetryIntervalSeconds)

// create a poller goroutine that reads from sqs and posts to channel
log.Info("spawning sqs poller")
@@ -118,7 +119,7 @@ func (g *Manager) handleEvent(event *LifecycleEvent) error {
go sendHeartbeat(asgClient, event, recommendedHeartbeatActionInterval)

// drain action
err := drainNode(kubectlPath, event.referencedNode.Name, ctx.DrainTimeoutSeconds)
err := drainNode(kubectlPath, event.referencedNode.Name, ctx.DrainTimeoutSeconds, ctx.DrainRetryIntervalSeconds)
if err != nil {
return err
}
11 changes: 6 additions & 5 deletions pkg/service/types.go
@@ -57,11 +57,12 @@ func (g *Manager) CompleteEvent(event LifecycleEvent) {
}

type ManagerContext struct {
KubectlLocalPath string
QueueName string
Region string
DrainTimeoutSeconds int64
PollingIntervalSeconds int64
KubectlLocalPath string
QueueName string
Region string
DrainTimeoutSeconds int64
DrainRetryIntervalSeconds int64
PollingIntervalSeconds int64
}

type LifecycleEvent struct {
21 changes: 21 additions & 0 deletions vendor/github.com/avast/retry-go/.gitignore


37 changes: 37 additions & 0 deletions vendor/github.com/avast/retry-go/.godocdown.tmpl


19 changes: 19 additions & 0 deletions vendor/github.com/avast/retry-go/.travis.yml


3 changes: 3 additions & 0 deletions vendor/github.com/avast/retry-go/Gopkg.toml


21 changes: 21 additions & 0 deletions vendor/github.com/avast/retry-go/LICENSE

