diff --git a/README.md b/README.md index bc8b8cab..695c99d9 100644 --- a/README.md +++ b/README.md @@ -137,6 +137,8 @@ Overview of configuration which can be set via Ingress annotations. |`zalando.org/aws-load-balancer-ssl-policy`|`string`|`ELBSecurityPolicy-2016-08`| |`zalando.org/aws-load-balancer-type`| `nlb` \| `alb`|`alb`| |`zalando.org/aws-load-balancer-http2`| `true` \| `false`|`true`| +|[`zalando.org/aws-nlb-cascade-http-to-alb`](#cascading-to-an-alb)| `true` \| `false`|`false`| +|[`zalando.org/aws-nlb-extra-listeners`](#extra-listen-ports)|`string`|N/A| |`zalando.org/aws-waf-web-acl-id` | `string` | N/A | |`kubernetes.io/ingress.class`|`string`|N/A| @@ -664,6 +666,37 @@ In *AWS CNI Mode* (`target-access-mode=AWSCNI`) the controller actively manages | `AWSCNI` | `false` | `true` | PodIP != HostIP: limited scaling and host bound | | `AWSCNI` | `false` | `false` | free scaling, pod VPC CNI IP used | +## Advanced Options for NLBs + +### Extra Listen Ports + +Some real world scenarios may require non-standard TCP or UDP ingresses. The `zalando.org/aws-nlb-extra-listeners` +annotation allows you to specify a list of additional listeners to add to your NLB. The value of the annotation should +be a valid JSON string of the following format. + +```json +[ + { + "protocol": "TCP", + "listenport": 22, + "targetport": 2222, + "podlabel": "application=ssh-service" + } +] +``` + +The `podlabel` value is used to register targets in the target group associated with the listener. This depends on the +AWS CNI Mode feature, where individual pods receive accessible IP addresses. The value is used to identify pods running +in the same namespace as the ingress that will receive traffic from the load balancer. + +### Cascading to an ALB + +If your usage of an NLB requires both HTTP(S) and extra listen ports, but you already have an ALB managed by +kube-ingress-aws-controller, you may wish to route the HTTP(S) traffic through the ALB for consistency. Some +applications may even depend on ALB features, for example the way SSL is offloaded or how redirects are handled. +The `zalando.org/aws-nlb-cascade-http-to-alb` allows the NLB to use TCP listeners on standard ports to forward +HTTP(S) traffic to any ALBs discovered and managed by kube-ingress-aws-controller. + ## Trying it out The Ingress Controller's responsibility is limited to managing load balancers, as described above. To have a fully diff --git a/aws/adapter.go b/aws/adapter.go index c9d52dda..6221f582 100644 --- a/aws/adapter.go +++ b/aws/adapter.go @@ -75,13 +75,24 @@ type Adapter struct { denyInternalRespContentType string denyInternalRespStatusCode int TargetCNI *TargetCNIconfig + CascadeCh chan []TargetGroupWithLabels } type TargetCNIconfig struct { Enabled bool - TargetGroupCh chan []string + TargetGroupCh chan []TargetGroupWithLabels } +type TargetGroupWithLabels struct { + ARN string + PodNamespace string + PodLabel string +} +type CNIEndpoint struct { + IPAddress string + Namespace string + Podlabel string +} type manifest struct { securityGroup *securityGroupDetails instance *instanceDetails @@ -235,7 +246,7 @@ func NewAdapter(clusterID, newControllerID, vpcID string, debug, disableInstrume customFilter: DefaultCustomFilter, TargetCNI: &TargetCNIconfig{ Enabled: false, - TargetGroupCh: make(chan []string, 10), + TargetGroupCh: make(chan []TargetGroupWithLabels, 10), }, } @@ -592,8 +603,33 @@ func (a *Adapter) UpdateTargetGroupsAndAutoScalingGroups(stacks []*Stack, proble a.TargetCNI.TargetGroupCh <- targetTypesARNs[elbv2.TargetTypeEnumIp] } + // run through any target groups with ALB targets and register all ALBs + for _, tg := range targetTypesARNs[elbv2.TargetTypeEnumAlb] { + albARNs := make([]string, 0, len(stacks)) + for _, stack := range stacks { + if stack.LoadBalancerType == LoadBalancerTypeApplication { + albARNs = append(albARNs, stack.loadbalancerARN) + } + } + registeredTargets, err := a.getRegisteredTargets(tg.ARN) + if err != nil { + problems.Add("failed to get existing targets: %w", err) + } + if err := a.registerAndDeregister(albARNs, registeredTargets, tg.ARN); err != nil { + problems.Add("failed to update target registration %w", err) + } + } + // remove the IP TGs from the list keeping all other TGs including problematic #127 and nonexistent #436 - targetGroupARNs := difference(allTargetGroupARNs, targetTypesARNs[elbv2.TargetTypeEnumIp]) + var targetGroupARNs []string + for targetType, tgList := range targetTypesARNs { + if targetType == elbv2.TargetTypeEnumIp || targetType == elbv2.TargetTypeEnumAlb { + continue + } + for _, tg := range tgList { + targetGroupARNs = append(targetGroupARNs, tg.ARN) + } + } ownerTags := map[string]string{ clusterIDTagPrefix + a.ClusterID(): resourceLifecycleOwned, @@ -639,7 +675,7 @@ func (a *Adapter) UpdateTargetGroupsAndAutoScalingGroups(stacks []*Stack, proble // All the required resources (listeners and target group) are created in a // transactional fashion. // Failure to create the stack causes it to be deleted automatically. -func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool) (string, error) { +func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool, extraListeners []ExtraListener, cascade bool) (string, error) { certARNs := make(map[string]time.Time, len(certificateARNs)) for _, arn := range certificateARNs { certARNs[arn] = time.Time{} @@ -673,7 +709,7 @@ func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, o nlbHealthyThresholdCount: a.nlbHealthyThresholdCount, targetPort: a.targetPort, targetHTTPS: a.targetHTTPS, - httpDisabled: a.httpDisabled(loadBalancerType), + httpDisabled: a.httpDisabled(loadBalancerType, cascade), httpTargetPort: a.httpTargetPort(loadBalancerType), timeoutInMinutes: uint(a.creationTimeout.Minutes()), stackTerminationProtection: a.stackTerminationProtection, @@ -690,6 +726,8 @@ func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, o httpRedirectToHTTPS: a.httpRedirectToHTTPS, nlbCrossZone: a.nlbCrossZone, http2: http2, + extraListeners: extraListeners, + cascade: cascade, tags: a.stackTags, internalDomains: a.internalDomains, targetAccessModeCNI: a.TargetCNI.Enabled, @@ -704,7 +742,7 @@ func (a *Adapter) CreateStack(certificateARNs []string, scheme, securityGroup, o return createStack(a.cloudformation, spec) } -func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.Time, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool) (string, error) { +func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time.Time, scheme, securityGroup, owner, sslPolicy, ipAddressType, wafWebACLID string, cwAlarms CloudWatchAlarmList, loadBalancerType string, http2 bool, extraListeners []ExtraListener, cascade bool) (string, error) { if _, ok := SSLPolicies[sslPolicy]; !ok { return "", fmt.Errorf("invalid SSLPolicy '%s' defined", sslPolicy) } @@ -729,7 +767,7 @@ func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time. nlbHealthyThresholdCount: a.nlbHealthyThresholdCount, targetPort: a.targetPort, targetHTTPS: a.targetHTTPS, - httpDisabled: a.httpDisabled(loadBalancerType), + httpDisabled: a.httpDisabled(loadBalancerType, cascade), httpTargetPort: a.httpTargetPort(loadBalancerType), timeoutInMinutes: uint(a.creationTimeout.Minutes()), stackTerminationProtection: a.stackTerminationProtection, @@ -746,6 +784,8 @@ func (a *Adapter) UpdateStack(stackName string, certificateARNs map[string]time. httpRedirectToHTTPS: a.httpRedirectToHTTPS, nlbCrossZone: a.nlbCrossZone, http2: http2, + extraListeners: extraListeners, + cascade: cascade, tags: a.stackTags, internalDomains: a.internalDomains, targetAccessModeCNI: a.TargetCNI.Enabled, @@ -769,8 +809,8 @@ func (a *Adapter) httpTargetPort(loadBalancerType string) uint { return a.targetPort } -func (a *Adapter) httpDisabled(loadBalancerType string) bool { - if loadBalancerType == LoadBalancerTypeNetwork { +func (a *Adapter) httpDisabled(loadBalancerType string, cascade bool) bool { + if loadBalancerType == LoadBalancerTypeNetwork && !cascade { return !a.nlbHTTPEnabled } return false @@ -1039,36 +1079,56 @@ func nonTargetedASGs(ownedASGs, targetedASGs map[string]*autoScalingGroupDetails return nonTargetedASGs } +func (a *Adapter) getRegisteredTargets(tgARN string) ([]string, error) { + tgh, err := a.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: &tgARN}) + if err != nil { + log.Errorf("unable to describe target health %v", err) + return []string{}, err + } + registeredTargets := make([]string, len(tgh.TargetHealthDescriptions)) + for i, target := range tgh.TargetHealthDescriptions { + registeredTargets[i] = *target.Target.Id + } + return registeredTargets, nil +} + +func (a *Adapter) registerAndDeregister(new []string, old []string, tgARN string) error { + toRegister := difference(new, old) + if len(toRegister) > 0 { + log.Info("Registering CNI targets: ", toRegister) + err := registerTargetsOnTargetGroups(a.elbv2, []string{tgARN}, toRegister) + if err != nil { + return err + } + } + toDeregister := difference(old, new) + if len(toDeregister) > 0 { + log.Info("Deregistering CNI targets: ", toDeregister) + err := deregisterTargetsOnTargetGroups(a.elbv2, []string{tgARN}, toDeregister) + if err != nil { + return err + } + } + return nil +} + // SetTargetsOnCNITargetGroups implements desired state for CNI target groups // by polling the current list of targets thus creating a diff of what needs to be added and removed. -func (a *Adapter) SetTargetsOnCNITargetGroups(endpoints, cniTargetGroupARNs []string) error { - log.Debugf("setting targets on CNI target groups: '%v'", cniTargetGroupARNs) - for _, targetGroupARN := range cniTargetGroupARNs { - tgh, err := a.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: &targetGroupARN}) +func (a *Adapter) SetTargetsOnCNITargetGroups(endpoints []CNIEndpoint, cniTargetGroups []TargetGroupWithLabels) error { + log.Debugf("setting targets on CNI target groups: '%v'", cniTargetGroups) + for _, targetGroup := range cniTargetGroups { + registeredTargets, err := a.getRegisteredTargets(targetGroup.ARN) if err != nil { - log.Errorf("unable to describe target health %v", err) - // continue for processing of the rest of the target groups continue } - registeredInstances := make([]string, len(tgh.TargetHealthDescriptions)) - for i, target := range tgh.TargetHealthDescriptions { - registeredInstances[i] = *target.Target.Id - } - toRegister := difference(endpoints, registeredInstances) - if len(toRegister) > 0 { - log.Info("Registering CNI targets: ", toRegister) - err := registerTargetsOnTargetGroups(a.elbv2, []string{targetGroupARN}, toRegister) - if err != nil { - return err + var matchingEndpoints []string + for _, endpoint := range endpoints { + if endpoint.Podlabel == targetGroup.PodLabel && endpoint.Namespace == targetGroup.PodNamespace { + matchingEndpoints = append(matchingEndpoints, endpoint.IPAddress) } } - toDeregister := difference(registeredInstances, endpoints) - if len(toDeregister) > 0 { - log.Info("Deregistering CNI targets: ", toDeregister) - err := deregisterTargetsOnTargetGroups(a.elbv2, []string{targetGroupARN}, toDeregister) - if err != nil { - return err - } + if err := a.registerAndDeregister(matchingEndpoints, registeredTargets, targetGroup.ARN); err != nil { + return err } } return nil diff --git a/aws/adapter_test.go b/aws/adapter_test.go index e095b4cd..0840f39f 100644 --- a/aws/adapter_test.go +++ b/aws/adapter_test.go @@ -949,7 +949,7 @@ func TestWithxlbHealthyThresholdCount(t *testing.T) { } func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) { - tgARNs := []string{"asg1"} + tgARNs := []TargetGroupWithLabels{{ARN: "asg1"}} thOut := elbv2.DescribeTargetHealthOutput{TargetHealthDescriptions: []*elbv2.TargetHealthDescription{}} m := &mockElbv2Client{ outputs: elbv2MockOutputs{ @@ -961,7 +961,7 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) { a := &Adapter{elbv2: m, TargetCNI: &TargetCNIconfig{}} t.Run("adding a new endpoint", func(t *testing.T) { - require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1"}, tgARNs)) + require.NoError(t, a.SetTargetsOnCNITargetGroups([]CNIEndpoint{{IPAddress: "1.1.1.1"}}, tgARNs)) require.Equal(t, []*elbv2.RegisterTargetsInput{{ TargetGroupArn: aws.String("asg1"), Targets: []*elbv2.TargetDescription{{Id: aws.String("1.1.1.1")}}, @@ -975,7 +975,8 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) { } m.rtinputs, m.dtinputs = nil, nil - require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}, tgARNs)) + require.NoError(t, a.SetTargetsOnCNITargetGroups( + []CNIEndpoint{{IPAddress: "1.1.1.1"}, {IPAddress: "2.2.2.2"}, {IPAddress: "3.3.3.3"}}, tgARNs)) require.Equal(t, []*elbv2.TargetDescription{ {Id: aws.String("2.2.2.2")}, {Id: aws.String("3.3.3.3")}, @@ -991,7 +992,7 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) { }} m.rtinputs, m.dtinputs = nil, nil - require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "3.3.3.3"}, tgARNs)) + require.NoError(t, a.SetTargetsOnCNITargetGroups([]CNIEndpoint{{IPAddress: "1.1.1.1"}, {IPAddress: "3.3.3.3"}}, tgARNs)) require.Equal(t, []*elbv2.RegisterTargetsInput(nil), m.rtinputs) require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("2.2.2.2")}}, m.dtinputs[0].Targets) }) @@ -1004,7 +1005,8 @@ func TestAdapter_SetTargetsOnCNITargetGroups(t *testing.T) { }} m.rtinputs, m.dtinputs = nil, nil - require.NoError(t, a.SetTargetsOnCNITargetGroups([]string{"1.1.1.1", "2.2.2.2", "3.3.3.3"}, tgARNs)) + require.NoError(t, a.SetTargetsOnCNITargetGroups( + []CNIEndpoint{{IPAddress: "1.1.1.1"}, {IPAddress: "2.2.2.2"}, {IPAddress: "3.3.3.3"}}, tgARNs)) require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("3.3.3.3")}}, m.rtinputs[0].Targets) require.Equal(t, []*elbv2.TargetDescription{{Id: aws.String("4.4.4.4")}}, m.dtinputs[0].Targets) }) diff --git a/aws/asg.go b/aws/asg.go index 794e0131..1a440bf9 100644 --- a/aws/asg.go +++ b/aws/asg.go @@ -261,8 +261,8 @@ func describeTargetGroups(elbv2svc elbv2iface.ELBV2API) (map[string]struct{}, er } // map the target group slice into specific types such as instance, ip, etc -func categorizeTargetTypeInstance(elbv2svc elbv2iface.ELBV2API, allTGARNs []string) (map[string][]string, error) { - targetTypes := make(map[string][]string) +func categorizeTargetTypeInstance(elbv2svc elbv2iface.ELBV2API, allTGARNs []string) (map[string][]TargetGroupWithLabels, error) { + targetTypes := make(map[string][]TargetGroupWithLabels) err := elbv2svc.DescribeTargetGroupsPagesWithContext(context.TODO(), &elbv2.DescribeTargetGroupsInput{}, func(resp *elbv2.DescribeTargetGroupsOutput, lastPage bool) bool { for _, tg := range resp.TargetGroups { @@ -270,7 +270,30 @@ func categorizeTargetTypeInstance(elbv2svc elbv2iface.ELBV2API, allTGARNs []stri if v != aws.StringValue(tg.TargetGroupArn) { continue } - targetTypes[aws.StringValue(tg.TargetType)] = append(targetTypes[aws.StringValue(tg.TargetType)], aws.StringValue(tg.TargetGroupArn)) + var podlabel, podnamespace string + log.Debugf("Looking for tags on %s", aws.StringValue(tg.TargetGroupArn)) + out, err := elbv2svc.DescribeTags(&elbv2.DescribeTagsInput{ResourceArns: []*string{tg.TargetGroupArn}}) + if err != nil { + log.Errorf("cannot describe tags on target group: %v", err) + } else { + for _, desc := range out.TagDescriptions { + for _, tag := range desc.Tags { + switch aws.StringValue(tag.Key) { + case podLabelTag: + podlabel = aws.StringValue(tag.Value) + case podNamespaceTag: + podnamespace = aws.StringValue(tag.Value) + } + } + } + } + log.Debugf("Adding tg with label: '%s' in namespace: '%s'", podlabel, podnamespace) + targetTypes[aws.StringValue(tg.TargetType)] = append( + targetTypes[aws.StringValue(tg.TargetType)], + TargetGroupWithLabels{ + ARN: aws.StringValue(tg.TargetGroupArn), + PodLabel: podlabel, + PodNamespace: podnamespace}) } } return true diff --git a/aws/asg_test.go b/aws/asg_test.go index 87e825b3..00354b61 100644 --- a/aws/asg_test.go +++ b/aws/asg_test.go @@ -682,29 +682,29 @@ func TestProcessChunked(t *testing.T) { func Test_categorizeTargetTypeInstance(t *testing.T) { for _, test := range []struct { name string - targetGroups map[string][]string + targetGroups map[string][]TargetGroupWithLabels }{ { name: "one from any type", - targetGroups: map[string][]string{ - elbv2.TargetTypeEnumInstance: {"instancy"}, - elbv2.TargetTypeEnumAlb: {"albly"}, - elbv2.TargetTypeEnumIp: {"ipvy"}, - elbv2.TargetTypeEnumLambda: {"lambada"}, + targetGroups: map[string][]TargetGroupWithLabels{ + elbv2.TargetTypeEnumInstance: {{ARN: "instancy"}}, + elbv2.TargetTypeEnumAlb: {{ARN: "albly"}}, + elbv2.TargetTypeEnumIp: {{ARN: "ipvy"}}, + elbv2.TargetTypeEnumLambda: {{ARN: "lambada"}}, }, }, { name: "one type many target groups", - targetGroups: map[string][]string{ - elbv2.TargetTypeEnumInstance: {"instancy", "foo", "void", "bar", "blank"}, + targetGroups: map[string][]TargetGroupWithLabels{ + elbv2.TargetTypeEnumInstance: {{ARN: "instancy"}, {ARN: "foo"}, {ARN: "void"}, {ARN: "bar"}, {ARN: "blank"}}, }, }, { name: "several types many target groups", - targetGroups: map[string][]string{ - elbv2.TargetTypeEnumInstance: {"instancy", "foo", "void", "bar", "blank"}, - elbv2.TargetTypeEnumAlb: {"albly", "alblily"}, - elbv2.TargetTypeEnumIp: {"ipvy"}, + targetGroups: map[string][]TargetGroupWithLabels{ + elbv2.TargetTypeEnumInstance: {{ARN: "instancy"}, {ARN: "foo"}, {ARN: "void"}, {ARN: "bar"}, {ARN: "blank"}}, + elbv2.TargetTypeEnumAlb: {{ARN: "albly"}, {ARN: "alblily"}}, + elbv2.TargetTypeEnumIp: {{ARN: "ipvy"}}, }, }, } { @@ -713,17 +713,20 @@ func Test_categorizeTargetTypeInstance(t *testing.T) { tgResponse := []*elbv2.TargetGroup{} for k, v := range test.targetGroups { for _, i := range v { - tg = append(tg, i) - tgResponse = append(tgResponse, &elbv2.TargetGroup{TargetGroupArn: aws.String(i), TargetType: aws.String(k)}) + tg = append(tg, i.ARN) + tgResponse = append(tgResponse, &elbv2.TargetGroup{TargetGroupArn: aws.String(i.ARN), TargetType: aws.String(k)}) } } - mockElbv2Svc := &mockElbv2Client{outputs: elbv2MockOutputs{describeTargetGroups: R(&elbv2.DescribeTargetGroupsOutput{TargetGroups: tgResponse}, nil)}} + mockElbv2Svc := &mockElbv2Client{outputs: elbv2MockOutputs{ + describeTargetGroups: R(&elbv2.DescribeTargetGroupsOutput{TargetGroups: tgResponse}, nil), + describeTags: R(&elbv2.DescribeTagsOutput{TagDescriptions: []*elbv2.TagDescription{}}, nil), + }} got, err := categorizeTargetTypeInstance(mockElbv2Svc, tg) assert.NoError(t, err) for k, v := range test.targetGroups { assert.Len(t, got[k], len(v)) - assert.Equal(t, got[k], v) + assert.Equal(t, v, got[k]) } }) } diff --git a/aws/cf.go b/aws/cf.go index d04a37fe..28519746 100644 --- a/aws/cf.go +++ b/aws/cf.go @@ -1,6 +1,8 @@ package aws import ( + "encoding/base64" + "encoding/json" "fmt" "strings" "time" @@ -15,6 +17,9 @@ const ( certificateARNTagPrefix = "ingress:certificate-arn/" ingressOwnerTag = "ingress:owner" cwAlarmConfigHashTag = "cloudwatch:alarm-config-hash" + extraListenersTag = "ingress:extra-listeners" + podLabelTag = "ingress:podlabel" + podNamespaceTag = "ingress:podnamespace" ) // Stack is a simple wrapper around a CloudFormation Stack. @@ -29,12 +34,24 @@ type Stack struct { IpAddressType string LoadBalancerType string HTTP2 bool + ExtraListeners []ExtraListener + Cascade bool OwnerIngress string CWAlarmConfigHash string TargetGroupARNs []string WAFWebACLID string CertificateARNs map[string]time.Time tags map[string]string + loadbalancerARN string +} + +type ExtraListener struct { + ListenProtocol string `json:"protocol"` + ListenPort int64 `json:"listenport"` + TargetPort int64 `json:"targetport"` + PodLabel string `json:"podlabel,omitempty"` + Namespace string + cascade bool } // IsComplete returns true if the stack status is a complete state. @@ -107,12 +124,15 @@ func (o stackOutput) dnsName() string { return o[outputLoadBalancerDNSName] } +func (o stackOutput) lbARN() string { + return o[outputLoadBalancerARN] +} + func (o stackOutput) targetGroupARNs() (arns []string) { - if arn, ok := o[outputTargetGroupARN]; ok { - arns = append(arns, arn) - } - if arn, ok := o[outputHTTPTargetGroupARN]; ok { - arns = append(arns, arn) + for k, v := range o { + if strings.Contains(k, "TargetGroupARN") { + arns = append(arns, v) + } } return } @@ -130,6 +150,7 @@ func convertStackParameters(parameters []*cloudformation.Parameter) map[string]s const ( // The following constants should be part of the Output section of the CloudFormation template outputLoadBalancerDNSName = "LoadBalancerDNSName" + outputLoadBalancerARN = "LoadBalancerARN" outputTargetGroupARN = "TargetGroupARN" outputHTTPTargetGroupARN = "HTTPTargetGroupARN" @@ -149,6 +170,7 @@ const ( parameterLoadBalancerTypeParameter = "Type" parameterLoadBalancerWAFWebACLIDParameter = "LoadBalancerWAFWebACLIDParameter" parameterHTTP2Parameter = "HTTP2" + parameterCascadeParameter = "Cascade" ) type stackSpec struct { @@ -183,6 +205,8 @@ type stackSpec struct { cwAlarms CloudWatchAlarmList httpRedirectToHTTPS bool nlbCrossZone bool + extraListeners []ExtraListener + cascade bool http2 bool denyInternalDomains bool denyInternalDomainsResponse denyResp @@ -230,6 +254,7 @@ func createStack(svc cloudformationiface.CloudFormationAPI, spec *stackSpec) (st cfParam(parameterIpAddressTypeParameter, spec.ipAddressType), cfParam(parameterLoadBalancerTypeParameter, spec.loadbalancerType), cfParam(parameterHTTP2Parameter, fmt.Sprintf("%t", spec.http2)), + cfParam(parameterCascadeParameter, fmt.Sprintf("%t", spec.cascade)), }, Tags: tagMapToCloudformationTags(tags), TemplateBody: aws.String(template), @@ -272,6 +297,11 @@ func createStack(svc cloudformationiface.CloudFormationAPI, spec *stackSpec) (st params.Tags = append(params.Tags, cfTag(cwAlarmConfigHashTag, spec.cwAlarms.Hash())) } + if len(spec.extraListeners) > 0 { + listeners, _ := json.Marshal(spec.extraListeners) + params.Tags = append(params.Tags, cfTag(extraListenersTag, base64.StdEncoding.EncodeToString(listeners))) + } + resp, err := svc.CreateStack(params) if err != nil { return spec.name, err @@ -305,6 +335,7 @@ func updateStack(svc cloudformationiface.CloudFormationAPI, spec *stackSpec) (st cfParam(parameterIpAddressTypeParameter, spec.ipAddressType), cfParam(parameterLoadBalancerTypeParameter, spec.loadbalancerType), cfParam(parameterHTTP2Parameter, fmt.Sprintf("%t", spec.http2)), + cfParam(parameterCascadeParameter, fmt.Sprintf("%t", spec.cascade)), }, Tags: tagMapToCloudformationTags(tags), TemplateBody: aws.String(template), @@ -345,6 +376,11 @@ func updateStack(svc cloudformationiface.CloudFormationAPI, spec *stackSpec) (st params.Tags = append(params.Tags, cfTag(cwAlarmConfigHashTag, spec.cwAlarms.Hash())) } + if len(spec.extraListeners) > 0 { + listeners, _ := json.Marshal(spec.extraListeners) + params.Tags = append(params.Tags, cfTag(extraListenersTag, base64.StdEncoding.EncodeToString(listeners))) + } + if spec.stackTerminationProtection { params := &cloudformation.UpdateTerminationProtectionInput{ StackName: aws.String(spec.name), @@ -456,6 +492,7 @@ func mapToManagedStack(stack *cloudformation.Stack) *Stack { certificateARNs := make(map[string]time.Time, len(tags)) ownerIngress := "" + var extraListeners []ExtraListener for key, value := range tags { if strings.HasPrefix(key, certificateARNTagPrefix) { arn := strings.TrimPrefix(key, certificateARNTagPrefix) @@ -475,6 +512,11 @@ func mapToManagedStack(stack *cloudformation.Stack) *Stack { if key == ingressOwnerTag { ownerIngress = value } + + if key == extraListenersTag { + decodedListeners, _ := base64.StdEncoding.DecodeString(value) + json.Unmarshal(decodedListeners, &extraListeners) + } } http2 := true @@ -482,6 +524,11 @@ func mapToManagedStack(stack *cloudformation.Stack) *Stack { http2 = false } + cascade := false + if parameters[parameterCascadeParameter] == "true" { + cascade = true + } + return &Stack{ Name: aws.StringValue(stack.StackName), DNSName: outputs.dnsName(), @@ -499,6 +546,9 @@ func mapToManagedStack(stack *cloudformation.Stack) *Stack { statusReason: aws.StringValue(stack.StackStatusReason), CWAlarmConfigHash: tags[cwAlarmConfigHashTag], WAFWebACLID: parameters[parameterLoadBalancerWAFWebACLIDParameter], + ExtraListeners: extraListeners, + Cascade: cascade, + loadbalancerARN: outputs.lbARN(), } } diff --git a/aws/cf_template.go b/aws/cf_template.go index 75a2b7b2..1ddaed35 100644 --- a/aws/cf_template.go +++ b/aws/cf_template.go @@ -104,6 +104,11 @@ func generateTemplate(spec *stackSpec) (string, error) { Description: "H2 Enabled", Default: "true", }, + parameterCascadeParameter: &cloudformation.Parameter{ + Type: "String", + Description: "Cascade to ALB Enabled", + Default: "false", + }, } if spec.wafWebAclId != "" { @@ -120,28 +125,41 @@ func generateTemplate(spec *stackSpec) (string, error) { Description: "DNS name for the LoadBalancer", Value: cloudformation.GetAtt("LB", "DNSName").String(), }, + outputLoadBalancerARN: &cloudformation.Output{ + Description: "ARN of the LoadBalancer", + Value: cloudformation.Ref("LB").String(), + }, outputTargetGroupARN: &cloudformation.Output{ - Description: "The ARN of the TargetGroup", + Description: "The ARN of the main TargetGroup", Value: cloudformation.Ref(httpsTargetGroupName).String(), }, } - template.AddResource(httpsTargetGroupName, newTargetGroup(spec, parameterTargetGroupTargetPortParameter)) + listener := ExtraListener{} + if spec.cascade { + listener = ExtraListener{ListenProtocol: "TCP", ListenPort: 443, TargetPort: 443, cascade: true} + } + template.AddResource(httpsTargetGroupName, newTargetGroup(spec, parameterTargetGroupTargetPortParameter, listener)) if !spec.httpDisabled { // Use the same target group for HTTP Listener or create another one if needed httpTargetGroupName := httpsTargetGroupName - if spec.httpTargetPort != spec.targetPort { + if spec.httpTargetPort != spec.targetPort || spec.cascade { httpTargetGroupName = "TGHTTP" - template.Parameters[parameterTargetGroupHTTPTargetPortParameter] = &cloudformation.Parameter{ - Type: "Number", - Description: "The HTTP target port", + listener := ExtraListener{} + if spec.cascade { + listener = ExtraListener{ListenProtocol: "TCP", ListenPort: 80, TargetPort: 80, cascade: true} + } else { + template.Parameters[parameterTargetGroupHTTPTargetPortParameter] = &cloudformation.Parameter{ + Type: "Number", + Description: "The HTTP target port", + } } template.Outputs[outputHTTPTargetGroupARN] = &cloudformation.Output{ Description: "The ARN of the HTTP TargetGroup", Value: cloudformation.Ref(httpTargetGroupName).String(), } - template.AddResource(httpTargetGroupName, newTargetGroup(spec, parameterTargetGroupHTTPTargetPortParameter)) + template.AddResource(httpTargetGroupName, newTargetGroup(spec, parameterTargetGroupHTTPTargetPortParameter, listener)) } // Add an HTTP Listener resource @@ -204,7 +222,7 @@ func generateTemplate(spec *stackSpec) (string, error) { } } - if len(spec.certificateARNs) > 0 { + if len(spec.certificateARNs) > 0 && !spec.cascade { // Sort the certificate names so we have a stable order. certificateARNs := make([]string, 0, len(spec.certificateARNs)) for certARN := range spec.certificateARNs { @@ -244,7 +262,7 @@ func generateTemplate(spec *stackSpec) (string, error) { ), ) } - } else if spec.loadbalancerType == LoadBalancerTypeNetwork { + } else if spec.loadbalancerType == LoadBalancerTypeNetwork && !spec.cascade { template.AddResource("HTTPSListener", &cloudformation.ElasticLoadBalancingV2Listener{ DefaultActions: &cloudformation.ElasticLoadBalancingV2ListenerActionList{ { @@ -282,6 +300,41 @@ func generateTemplate(spec *stackSpec) (string, error) { } + if spec.loadbalancerType == LoadBalancerTypeNetwork { + if spec.cascade { + template.AddResource("HTTPSListener", &cloudformation.ElasticLoadBalancingV2Listener{ + DefaultActions: &cloudformation.ElasticLoadBalancingV2ListenerActionList{ + { + Type: cloudformation.String("forward"), + TargetGroupArn: cloudformation.Ref(httpsTargetGroupName).String(), + }, + }, + LoadBalancerArn: cloudformation.Ref("LB").String(), + Port: cloudformation.Integer(443), + Protocol: cloudformation.String("TCP"), + }) + } + for idx, listener := range spec.extraListeners { + tgName := fmt.Sprintf("ExtraTG%d", idx) + template.Outputs[fmt.Sprintf("%sTargetGroupARN", tgName)] = &cloudformation.Output{ + Description: fmt.Sprintf("The ARN of the %s TargetGroup", tgName), + Value: cloudformation.Ref(tgName).String(), + } + template.AddResource(tgName, newTargetGroup(spec, parameterTargetGroupHTTPTargetPortParameter, listener)) + template.AddResource(fmt.Sprintf("ExtraListener%d", idx), &cloudformation.ElasticLoadBalancingV2Listener{ + DefaultActions: &cloudformation.ElasticLoadBalancingV2ListenerActionList{ + { + Type: cloudformation.String("forward"), + TargetGroupArn: cloudformation.Ref(tgName).String(), + }, + }, + LoadBalancerArn: cloudformation.Ref("LB").String(), + Port: cloudformation.Integer(listener.ListenPort), + Protocol: cloudformation.String(listener.ListenProtocol), + }) + } + } + // Build up the LoadBalancerAttributes list, as there is no way to make attributes conditional in the template lbAttrList := make(cloudformation.ElasticLoadBalancingV2LoadBalancerLoadBalancerAttributeList, 0, 4) @@ -444,17 +497,42 @@ func generateDenyInternalTrafficRule(listenerName string, rulePriority int64, in } } -func newTargetGroup(spec *stackSpec, targetPortParameter string) *cloudformation.ElasticLoadBalancingV2TargetGroup { +func newTargetGroup(spec *stackSpec, targetPortParameter string, listener ExtraListener) *cloudformation.ElasticLoadBalancingV2TargetGroup { targetType := elbv2.TargetTypeEnumInstance if spec.targetAccessModeCNI { targetType = elbv2.TargetTypeEnumIp } protocol := "HTTP" healthCheckProtocol := "HTTP" + port := cloudformation.Ref(targetPortParameter).Integer() + healthCheckPort := cloudformation.Ref(parameterTargetGroupHealthCheckPortParameter).String() healthyThresholdCount, unhealthyThresholdCount := spec.albHealthyThresholdCount, spec.albUnhealthyThresholdCount + + tgAttributes := &cloudformation.ElasticLoadBalancingV2TargetGroupTargetGroupAttributeList{ + { + Key: cloudformation.String("deregistration_delay.timeout_seconds"), + Value: cloudformation.String(fmt.Sprintf("%d", spec.deregistrationDelayTimeoutSeconds)), + }, + } + if spec.loadbalancerType == LoadBalancerTypeNetwork { protocol = "TCP" - healthCheckProtocol = "HTTP" + if listener.ListenProtocol != "" { + protocol, healthCheckProtocol = listener.ListenProtocol, listener.ListenProtocol + port = cloudformation.Integer(listener.TargetPort) + healthCheckPort = cloudformation.String(fmt.Sprintf("%d", listener.TargetPort)) + if listener.cascade { + targetType = elbv2.TargetTypeEnumAlb + tgAttributes = &cloudformation.ElasticLoadBalancingV2TargetGroupTargetGroupAttributeList{} + switch listener.TargetPort { + case 443: + healthCheckProtocol = "HTTPS" + case 80: + healthCheckProtocol = "HTTP" + } + } + } + // For NLBs the healthy and unhealthy threshold count value must be equal healthyThresholdCount, unhealthyThresholdCount = spec.nlbHealthyThresholdCount, spec.nlbHealthyThresholdCount } else if spec.targetHTTPS { @@ -462,25 +540,42 @@ func newTargetGroup(spec *stackSpec, targetPortParameter string) *cloudformation healthCheckProtocol = "HTTPS" } + interval := cloudformation.Ref(parameterTargetGroupHealthCheckIntervalParameter).Integer() + healthCheckPath := cloudformation.Ref(parameterTargetGroupHealthCheckPathParameter).String() + if protocol == "TCP" || healthCheckProtocol == "TCP" { + interval = cloudformation.Integer(10) + if healthCheckProtocol == "TCP" { + healthCheckPath = cloudformation.Ref("AWS::NoValue").String() + } + } + targetGroup := &cloudformation.ElasticLoadBalancingV2TargetGroup{ - TargetGroupAttributes: &cloudformation.ElasticLoadBalancingV2TargetGroupTargetGroupAttributeList{ - { - Key: cloudformation.String("deregistration_delay.timeout_seconds"), - Value: cloudformation.String(fmt.Sprintf("%d", spec.deregistrationDelayTimeoutSeconds)), - }, - }, - HealthCheckIntervalSeconds: cloudformation.Ref(parameterTargetGroupHealthCheckIntervalParameter).Integer(), - HealthCheckPath: cloudformation.Ref(parameterTargetGroupHealthCheckPathParameter).String(), - HealthCheckPort: cloudformation.Ref(parameterTargetGroupHealthCheckPortParameter).String(), + TargetGroupAttributes: tgAttributes, + HealthCheckIntervalSeconds: interval, + HealthCheckPath: healthCheckPath, + HealthCheckPort: healthCheckPort, HealthCheckProtocol: cloudformation.String(healthCheckProtocol), HealthyThresholdCount: cloudformation.Integer(int64(healthyThresholdCount)), UnhealthyThresholdCount: cloudformation.Integer(int64(unhealthyThresholdCount)), - Port: cloudformation.Ref(targetPortParameter).Integer(), + Port: port, Protocol: cloudformation.String(protocol), TargetType: cloudformation.String(targetType), VPCID: cloudformation.Ref(parameterTargetGroupVPCIDParameter).String(), } + if listener.PodLabel != "" { + targetGroup.Tags = &cloudformation.TagList{ + { + Key: cloudformation.String(podLabelTag), + Value: cloudformation.String(listener.PodLabel), + }, + { + Key: cloudformation.String(podNamespaceTag), + Value: cloudformation.String(listener.Namespace), + }, + } + } + // custom target group healthcheck only supported when the target group protocol is != TCP if protocol != "TCP" { targetGroup.HealthCheckTimeoutSeconds = cloudformation.Ref(parameterTargetGroupHealthCheckTimeoutParameter).Integer() diff --git a/kubernetes/adapter.go b/kubernetes/adapter.go index 9263982d..8dee1e1c 100644 --- a/kubernetes/adapter.go +++ b/kubernetes/adapter.go @@ -1,6 +1,7 @@ package kubernetes import ( + "encoding/json" "errors" "fmt" "strings" @@ -23,6 +24,7 @@ type Adapter struct { ingressDefaultLoadBalancerType string clusterLocalDomain string routeGroupSupport bool + extraCNIEndpoints []aws.CNIEndpoint } type IngressType string @@ -78,6 +80,8 @@ type Ingress struct { ClusterLocal bool CertificateARN string Hostname string + ExtraListeners []aws.ExtraListener + Cascade bool Scheme string SecurityGroup string SSLPolicy string @@ -210,6 +214,32 @@ func (a *Adapter) newIngress(typ IngressType, metadata kubeItemMetadata, host st wafWebAclId, hasWAF := annotations[ingressWAFWebACLIDAnnotation] + var extraListeners []aws.ExtraListener + rawlisteners, hasExtraListeners := annotations[ingressNLBExtraListenersAnnotation] + if hasExtraListeners { + if loadBalancerType != loadBalancerTypeNLB { + return nil, errors.New("extra listeners are only supported on NLBs") + } + if err := json.Unmarshal([]byte(rawlisteners), &extraListeners); err != nil { + return nil, fmt.Errorf("unable to parse aws-nlb-extra-listeners annotation: %v", err) + } + for idx, listener := range extraListeners { + if listener.ListenProtocol != "TCP" && listener.ListenProtocol != "UDP" && listener.ListenProtocol != "TCP_UDP" { + return nil, errors.New("only TCP, UDP, or TCP_UDP are allowed as protocols for extra listeners") + } + extraListeners[idx].Namespace = metadata.Namespace + a.extraCNIEndpoints = append(a.extraCNIEndpoints, aws.CNIEndpoint{Namespace: metadata.Namespace, Podlabel: listener.PodLabel}) + } + } + + cascade := true + if getAnnotationsString(annotations, ingressNLBCascadeAnnotation, "false") == "false" { + cascade = false + } + if cascade && loadBalancerType != loadBalancerTypeNLB { + return nil, errors.New("cascading is only supported on NLBs") + } + if (loadBalancerType == loadBalancerTypeNLB) && (hasSG || hasWAF) { if hasLB { return nil, errors.New("security group or WAF are not supported by NLB (configured by annotation)") @@ -251,6 +281,8 @@ func (a *Adapter) newIngress(typ IngressType, metadata kubeItemMetadata, host st LoadBalancerType: loadBalancerType, WAFWebACLID: wafWebAclId, HTTP2: http2, + ExtraListeners: extraListeners, + Cascade: cascade, }, nil } diff --git a/kubernetes/ingress.go b/kubernetes/ingress.go index e1da9fcb..1e63f370 100644 --- a/kubernetes/ingress.go +++ b/kubernetes/ingress.go @@ -61,21 +61,23 @@ type ingressLoadBalancer struct { const ( // ingressALBIPAddressType is used in external-dns, https://github.com/kubernetes-incubator/external-dns/pull/1079 - ingressALBIPAddressType = "alb.ingress.kubernetes.io/ip-address-type" - IngressAPIVersionExtensions = "extensions/v1beta1" - IngressAPIVersionNetworking = "networking.k8s.io/v1beta1" - IngressAPIVersionNetworkingV1 = "networking.k8s.io/v1" - ingressListResource = "/apis/%s/ingresses" - ingressPatchStatusResource = "/apis/%s/namespaces/%s/ingresses/%s/status" - ingressCertificateARNAnnotation = "zalando.org/aws-load-balancer-ssl-cert" - ingressSchemeAnnotation = "zalando.org/aws-load-balancer-scheme" - ingressSharedAnnotation = "zalando.org/aws-load-balancer-shared" - ingressSecurityGroupAnnotation = "zalando.org/aws-load-balancer-security-group" - ingressSSLPolicyAnnotation = "zalando.org/aws-load-balancer-ssl-policy" - ingressLoadBalancerTypeAnnotation = "zalando.org/aws-load-balancer-type" - ingressHTTP2Annotation = "zalando.org/aws-load-balancer-http2" - ingressWAFWebACLIDAnnotation = "zalando.org/aws-waf-web-acl-id" - ingressClassAnnotation = "kubernetes.io/ingress.class" + ingressALBIPAddressType = "alb.ingress.kubernetes.io/ip-address-type" + IngressAPIVersionExtensions = "extensions/v1beta1" + IngressAPIVersionNetworking = "networking.k8s.io/v1beta1" + IngressAPIVersionNetworkingV1 = "networking.k8s.io/v1" + ingressListResource = "/apis/%s/ingresses" + ingressPatchStatusResource = "/apis/%s/namespaces/%s/ingresses/%s/status" + ingressCertificateARNAnnotation = "zalando.org/aws-load-balancer-ssl-cert" + ingressSchemeAnnotation = "zalando.org/aws-load-balancer-scheme" + ingressSharedAnnotation = "zalando.org/aws-load-balancer-shared" + ingressSecurityGroupAnnotation = "zalando.org/aws-load-balancer-security-group" + ingressSSLPolicyAnnotation = "zalando.org/aws-load-balancer-ssl-policy" + ingressLoadBalancerTypeAnnotation = "zalando.org/aws-load-balancer-type" + ingressHTTP2Annotation = "zalando.org/aws-load-balancer-http2" + ingressWAFWebACLIDAnnotation = "zalando.org/aws-waf-web-acl-id" + ingressNLBExtraListenersAnnotation = "zalando.org/aws-nlb-extra-listeners" + ingressNLBCascadeAnnotation = "zalando.org/aws-nlb-cascade-http-to-alb" + ingressClassAnnotation = "kubernetes.io/ingress.class" ) func getAnnotationsString(annotations map[string]string, key string, defaultValue string) string { diff --git a/kubernetes/pods.go b/kubernetes/pods.go index a4f42c3a..04c50090 100644 --- a/kubernetes/pods.go +++ b/kubernetes/pods.go @@ -3,13 +3,12 @@ package kubernetes import ( "context" "fmt" - "sort" "sync" "time" log "github.com/sirupsen/logrus" + "github.com/zalando-incubator/kube-ingress-aws-controller/aws" corev1 "k8s.io/api/core/v1" - apisv1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" @@ -17,19 +16,34 @@ import ( const resyncInterval = 1 * time.Minute +func (a *Adapter) storeWatchedPods(pod *corev1.Pod, podEndpoints *sync.Map) { + for k, v := range pod.Labels { + selector := fmt.Sprintf("%s=%s", k, v) + if pod.Namespace == a.cniPodNamespace && selector == a.cniPodLabelSelector { + log.Infof("New discovered pod: %s IP: %s", pod.Name, pod.Status.PodIP) + podEndpoints.LoadOrStore(pod.Name, aws.CNIEndpoint{IPAddress: pod.Status.PodIP}) + } + for _, endpoint := range a.extraCNIEndpoints { + if endpoint.Namespace == pod.Namespace && endpoint.Podlabel == selector { + log.Infof("New discovered pod: %s IP: %s", pod.Name, pod.Status.PodIP) + podEndpoints.LoadOrStore(pod.Name, aws.CNIEndpoint{IPAddress: pod.Status.PodIP, Namespace: pod.Namespace, Podlabel: selector}) + } + } + } +} + // PodInformer is a event handler for Pod events registered to, that builds a local list of valid and relevant pods // and sends an event to the endpoint channel, triggering a resync of the targets. -func (a *Adapter) PodInformer(ctx context.Context, endpointChan chan<- []string) (err error) { +func (a *Adapter) PodInformer(ctx context.Context, endpointChan chan<- []aws.CNIEndpoint) (err error) { podEndpoints := sync.Map{} - log.Infof("Watching for Pods with labelselector %s in namespace %s", a.cniPodLabelSelector, a.cniPodNamespace) - factory := informers.NewSharedInformerFactoryWithOptions(a.clientset, resyncInterval, informers.WithNamespace(a.cniPodNamespace), - informers.WithTweakListOptions(func(options *apisv1.ListOptions) { options.LabelSelector = a.cniPodLabelSelector })) + // log.Infof("Watching for Pods with labelselector %s in namespace %s", a.cniPodLabelSelector, a.cniPodNamespace) + factory := informers.NewSharedInformerFactoryWithOptions(a.clientset, resyncInterval) informer := factory.Core().V1().Pods().Informer() factory.Start(ctx.Done()) if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { - return fmt.Errorf("Timed out waiting for caches to sync") + return fmt.Errorf("timed out waiting for caches to sync") } // list warms the pod cache and verifies whether pods for given specs can be found, preventing to fail silently @@ -39,12 +53,12 @@ func (a *Adapter) PodInformer(ctx context.Context, endpointChan chan<- []string) if err == nil && len(podList) > 0 { break } - log.Errorf("Error listing Pods with labelselector %s in namespace %s: %v", a.cniPodNamespace, a.cniPodLabelSelector, err) + log.Errorf("error listing Pods: %v", err) time.Sleep(resyncInterval) } for _, pod := range podList { if !isPodTerminating(pod) && isPodRunning(pod) { - podEndpoints.Store(pod.Name, pod.Status.PodIP) + a.storeWatchedPods(pod, &podEndpoints) } } queueEndpoints(&podEndpoints, endpointChan) @@ -68,11 +82,12 @@ func (a *Adapter) PodInformer(ctx context.Context, endpointChan chan<- []string) queueEndpoints(&podEndpoints, endpointChan) case isPodRunning(pod): - if _, isStored := podEndpoints.LoadOrStore(pod.Name, pod.Status.PodIP); isStored { + if _, isStored := podEndpoints.Load(pod.Name); isStored { return } - log.Infof("New discovered pod: %s IP: %s", pod.Name, pod.Status.PodIP) + a.storeWatchedPods(pod, &podEndpoints) queueEndpoints(&podEndpoints, endpointChan) + } }, }) @@ -80,13 +95,12 @@ func (a *Adapter) PodInformer(ctx context.Context, endpointChan chan<- []string) return nil } -func queueEndpoints(podEndpoints *sync.Map, endpointChan chan<- []string) { - podList := []string{} +func queueEndpoints(podEndpoints *sync.Map, endpointChan chan<- []aws.CNIEndpoint) { + podList := []aws.CNIEndpoint{} podEndpoints.Range(func(key, value interface{}) bool { - podList = append(podList, value.(string)) + podList = append(podList, value.(aws.CNIEndpoint)) return true }) - sort.StringSlice(podList).Sort() endpointChan <- podList } diff --git a/kubernetes/pods_test.go b/kubernetes/pods_test.go index a75aa3d2..ab75182b 100644 --- a/kubernetes/pods_test.go +++ b/kubernetes/pods_test.go @@ -12,6 +12,7 @@ import ( log "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" + "github.com/zalando-incubator/kube-ingress-aws-controller/aws" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -34,7 +35,7 @@ func kubeconfig() *kubernetes.Clientset { } func ExampleAdapter_PodInformer() { - epCh := make(chan []string, 10) + epCh := make(chan []aws.CNIEndpoint, 10) a := Adapter{ clientset: kubeconfig(), cniPodNamespace: "kube-system", @@ -63,7 +64,7 @@ func TestAdapter_PodInformer(t *testing.T) { client := fake.NewSimpleClientset() a.clientset = client - pods := make(chan []string, 10) + pods := make(chan []aws.CNIEndpoint, 10) t.Run("initial state of five ready pods, a terminating and pending one", func(t *testing.T) { for i := 1; i <= 5; i++ { diff --git a/worker.go b/worker.go index f7fceedd..d55098cd 100644 --- a/worker.go +++ b/worker.go @@ -36,6 +36,8 @@ type loadBalancer struct { certTTL time.Duration cwAlarms aws.CloudWatchAlarmList loadBalancerType string + extraListeners []aws.ExtraListener + cascade bool } const ( @@ -379,6 +381,8 @@ func getAllLoadBalancers(certs CertificatesFinder, certTTL time.Duration, stacks ipAddressType: stack.IpAddressType, loadBalancerType: stack.LoadBalancerType, http2: stack.HTTP2, + extraListeners: stack.ExtraListeners, + cascade: stack.Cascade, wafWebACLID: stack.WAFWebACLID, certTTL: certTTL, } @@ -472,6 +476,8 @@ func matchIngressesToLoadBalancers( loadBalancerType: ingress.LoadBalancerType, http2: ingress.HTTP2, wafWebACLID: ingress.WAFWebACLID, + extraListeners: ingress.ExtraListeners, + cascade: ingress.Cascade, }, ) } @@ -529,7 +535,7 @@ func createStack(awsAdapter *aws.Adapter, lb *loadBalancer, problems *problem.Li log.Infof("Creating stack for certificates %q / ingress %q", certificates, lb.ingresses) - stackId, err := awsAdapter.CreateStack(certificates, lb.scheme, lb.securityGroup, lb.Owner(), lb.sslPolicy, lb.ipAddressType, lb.wafWebACLID, lb.cwAlarms, lb.loadBalancerType, lb.http2) + stackId, err := awsAdapter.CreateStack(certificates, lb.scheme, lb.securityGroup, lb.Owner(), lb.sslPolicy, lb.ipAddressType, lb.wafWebACLID, lb.cwAlarms, lb.loadBalancerType, lb.http2, lb.extraListeners, lb.cascade) if err != nil { if isAlreadyExistsError(err) { lb.stack, err = awsAdapter.GetStack(stackId) @@ -549,7 +555,7 @@ func updateStack(awsAdapter *aws.Adapter, lb *loadBalancer, problems *problem.Li log.Infof("Updating %q stack for %d certificates / %d ingresses", lb.scheme, len(certificates), len(lb.ingresses)) - stackId, err := awsAdapter.UpdateStack(lb.stack.Name, certificates, lb.scheme, lb.securityGroup, lb.Owner(), lb.sslPolicy, lb.ipAddressType, lb.wafWebACLID, lb.cwAlarms, lb.loadBalancerType, lb.http2) + stackId, err := awsAdapter.UpdateStack(lb.stack.Name, certificates, lb.scheme, lb.securityGroup, lb.Owner(), lb.sslPolicy, lb.ipAddressType, lb.wafWebACLID, lb.cwAlarms, lb.loadBalancerType, lb.http2, lb.extraListeners, lb.cascade) if isNoUpdatesToBePerformedError(err) { log.Debugf("Stack(%q) is already up to date", certificates) } else if err != nil { @@ -664,19 +670,21 @@ func getCloudWatchAlarmsFromConfigMap(configMap *kubernetes.ConfigMap) aws.Cloud // cniEventHandler syncronizes the events from kubernetes and the status updates from the load balancer controller. // Events updates a rate limited. func cniEventHandler(ctx context.Context, targetCNIcfg *aws.TargetCNIconfig, - targetSetter func([]string, []string) error, informer func(context.Context, chan<- []string) error) { + targetSetter func([]aws.CNIEndpoint, []aws.TargetGroupWithLabels) error, informer func(context.Context, chan<- []aws.CNIEndpoint) error) { log.Infoln("Starting CNI event handler") rateLimiter := time.NewTicker(cniEventRateLimit) defer rateLimiter.Stop() - endpointCh := make(chan []string, 10) + endpointCh := make(chan []aws.CNIEndpoint, 10) go informer(ctx, endpointCh) - var cniTargetGroupARNs, endpoints []string + var cniTargetGroupARNs []aws.TargetGroupWithLabels + var endpoints []aws.CNIEndpoint for { select { case <-ctx.Done(): + log.Debugln("received a cancel signal, eventHandler exiting") return case cniTargetGroupARNs = <-targetCNIcfg.TargetGroupCh: log.Debugf("new message target groups: %v", cniTargetGroupARNs) diff --git a/worker_test.go b/worker_test.go index 87abccba..353b1241 100644 --- a/worker_test.go +++ b/worker_test.go @@ -1006,21 +1006,22 @@ func TestDoWorkPanicReturnsProblem(t *testing.T) { func Test_cniEventHandler(t *testing.T) { t.Run("handles messages from channels and calls update functions", func(t *testing.T) { - targetCNIcfg := &aws.TargetCNIconfig{TargetGroupCh: make(chan []string, 10)} - targetCNIcfg.TargetGroupCh <- []string{"bar", "baz"} - targetCNIcfg.TargetGroupCh <- []string{"foo"} // flush + targetCNIcfg := &aws.TargetCNIconfig{TargetGroupCh: make(chan []aws.TargetGroupWithLabels, 10)} + targetCNIcfg.TargetGroupCh <- []aws.TargetGroupWithLabels{{ARN: "bar"}, {ARN: "baz"}} + targetCNIcfg.TargetGroupCh <- []aws.TargetGroupWithLabels{{ARN: "foo"}} // flush mutex := &sync.Mutex{} - var targetSet, cniTGARNs []string - mockTargetSetter := func(endpoints, cniTargetGroupARNs []string) error { + var targetSet []aws.CNIEndpoint + var cniTGARNs []aws.TargetGroupWithLabels + mockTargetSetter := func(endpoints []aws.CNIEndpoint, cniTargetGroupARNs []aws.TargetGroupWithLabels) error { mutex.Lock() targetSet = endpoints cniTGARNs = cniTargetGroupARNs mutex.Unlock() return nil } - mockInformer := func(_ context.Context, c chan<- []string) error { - c <- []string{"4.3.2.1", "4.3.2.1"} - c <- []string{"1.2.3.4"} // flush + mockInformer := func(_ context.Context, c chan<- []aws.CNIEndpoint) error { + c <- []aws.CNIEndpoint{{IPAddress: "4.3.2.1"}, {IPAddress: "4.3.2.1"}} + c <- []aws.CNIEndpoint{{IPAddress: "1.2.3.4"}} // flush return nil } ctx, cl := context.WithCancel(context.Background()) @@ -1030,11 +1031,11 @@ func Test_cniEventHandler(t *testing.T) { require.Eventually(t, func() bool { mutex.Lock() defer mutex.Unlock() - return reflect.DeepEqual(targetSet, []string{"1.2.3.4"}) + return reflect.DeepEqual(targetSet, []aws.CNIEndpoint{{IPAddress: "1.2.3.4"}}) }, wait.ForeverTestTimeout, time.Millisecond*100) require.Eventually(t, func() bool { - return reflect.DeepEqual(cniTGARNs, []string{"foo"}) + return reflect.DeepEqual(cniTGARNs, []aws.TargetGroupWithLabels{{ARN: "foo"}}) }, wait.ForeverTestTimeout, time.Millisecond*100) }) }