diff --git a/pkg/openstack/loadbalancer.go b/pkg/openstack/loadbalancer.go index d31e41085a..c5402b4794 100644 --- a/pkg/openstack/loadbalancer.go +++ b/pkg/openstack/loadbalancer.go @@ -431,15 +431,6 @@ func getSecurityGroupName(service *corev1.Service) string { return securityGroupName } -func getSecurityGroupRules(client *gophercloud.ServiceClient, opts rules.ListOpts) ([]rules.SecGroupRule, error) { - mc := metrics.NewMetricContext("security_group_rule", "list") - page, err := rules.List(client, opts).AllPages() - if mc.ObserveRequest(err) != nil { - return nil, err - } - return rules.ExtractRules(page) -} - func getListenerProtocol(protocol corev1.Protocol, svcConf *serviceConfig) listeners.Protocol { // Make neutron-lbaas code work if svcConf != nil { @@ -2338,7 +2329,7 @@ func (lbaas *LbaasV2) ensureAndUpdateOctaviaSecurityGroup(clusterName string, ap cidrs = svcConf.allowedCIDR } - existingRules, err := getSecurityGroupRules(lbaas.network, rules.ListOpts{SecGroupID: lbSecGroupID}) + existingRules, err := openstackutil.GetSecurityGroupRules(lbaas.network, rules.ListOpts{SecGroupID: lbSecGroupID}) if err != nil { return fmt.Errorf( "failed to find security group rules in %s: %v", lbSecGroupID, err) diff --git a/pkg/openstack/routes.go b/pkg/openstack/routes.go index 716ad6a172..e02e24d9f5 100644 --- a/pkg/openstack/routes.go +++ b/pkg/openstack/routes.go @@ -18,27 +18,33 @@ package openstack import ( "context" + "fmt" "net" "sync" "github.com/gophercloud/gophercloud" "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/layer3/extraroutes" "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/layer3/routers" + "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/security/groups" + "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/security/rules" "github.com/gophercloud/gophercloud/openstack/networking/v2/ports" + secgroups "github.com/gophercloud/utils/openstack/networking/v2/extensions/security/groups" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" cloudprovider "k8s.io/cloud-provider" "k8s.io/cloud-provider-openstack/pkg/metrics" - "k8s.io/cloud-provider-openstack/pkg/util/errors" + cpoerrors "k8s.io/cloud-provider-openstack/pkg/util/errors" + openstackutil "k8s.io/cloud-provider-openstack/pkg/util/openstack" "k8s.io/klog/v2" ) // Routes implements the cloudprovider.Routes for OpenStack clouds type Routes struct { - network *gophercloud.ServiceClient - os *OpenStack + network *gophercloud.ServiceClient + os *OpenStack + nodeSGId string // router's private network IDs networkIDs []string // whether Neutron supports "extraroute-atomic" extension @@ -53,38 +59,111 @@ var _ cloudprovider.Routes = &Routes{} // NewRoutes creates a new instance of Routes func NewRoutes(os *OpenStack, network *gophercloud.ServiceClient, atomicRoutes bool) (cloudprovider.Routes, error) { if os.routeOpts.RouterID == "" { - return nil, errors.ErrNoRouterID + return nil, cpoerrors.ErrNoRouterID } - return &Routes{ + routes := Routes{ network: network, os: os, atomicRoutes: atomicRoutes, - }, nil + } + + err := routes.getOrCreateNodeSecurityGroup(os.routeOpts.RouterID) + if err != nil { + return nil, err + } + + return &routes, nil +} + +func (r *Routes) getOrCreateNodeSecurityGroup(router_id string) error { + sgName := fmt.Sprintf("k8s-node-sg-%s", router_id) + sgId, err := secgroups.IDFromName(r.network, sgName) + if err != nil { + if cpoerrors.IsNotFound(err) { + mc := metrics.NewMetricContext("security_group", "create") + group, err := groups.Create(r.network, groups.CreateOpts{Name: sgName}).Extract() + if mc.ObserveRequest(err) != nil { + return err + } + sgId = group.ID + } else { + return err + } + } + r.nodeSGId = sgId + return nil } // ListRoutes lists all managed routes that belong to the specified clusterName func (r *Routes) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) { klog.V(4).Infof("ListRoutes(%v)", clusterName) - if r.os.nodeInformerHasSynced == nil || !r.os.nodeInformerHasSynced() { - return nil, errors.ErrNoNodeInformer + return nil, cpoerrors.ErrNoNodeInformer } - nodes, err := r.os.nodeInformer.Lister().List(labels.Everything()) if err != nil { return nil, err } - mc := metrics.NewMetricContext("router", "get") router, err := routers.Get(r.network, r.os.routeOpts.RouterID).Extract() if mc.ObserveRequest(err) != nil { return nil, err } - routes := make([]*cloudprovider.Route, 0, len(router.Routes)) + + // detect router's private network ID for further VM ports filtering + r.networkIDs, err = getRouterNetworkIDs(r.network, r.os.routeOpts.RouterID) + if err != nil { + return nil, err + } + + var sgRuleForPodCidrFound, sgRuleForNodeAddress, nodePortBindSecurityGroup bool for _, item := range router.Routes { nodeName, foundNode := getNodeNameByAddr(item.NextHop, nodes) + if foundNode { + nodePortBindSecurityGroup = false + sgRuleForPodCidrFound = false + sgRuleForNodeAddress = false + // get the node port that the route next hop addr belong to + port, err := getPortByIP(r.network, item.NextHop, r.networkIDs) + if err != nil { + return nil, err + } + + // check if the node security group bind to the port + for _, sg := range port.SecurityGroups { + if sg == r.nodeSGId { + nodePortBindSecurityGroup = true + break + } + } + + // check whether the related ingress security group rule is existing + ip, _, _ := net.ParseCIDR(item.DestinationCIDR) + isCIDRv6 := ip.To4() == nil + nodeAddr := getAddrByNodeName(nodeName, isCIDRv6, nodes) + nodeAddrCidr := fmt.Sprintf("%s/32", nodeAddr) + rules, err := openstackutil.GetSecurityGroupRules(r.network, rules.ListOpts{SecGroupID: r.nodeSGId, Direction: string(rules.DirIngress)}) + if err != nil { + return nil, err + } + for _, rule := range rules { + if item.DestinationCIDR == rule.RemoteIPPrefix { + sgRuleForPodCidrFound = true + } + if rule.RemoteGroupID == r.nodeSGId || rule.RemoteIPPrefix == nodeAddrCidr { + sgRuleForNodeAddress = true + } + if sgRuleForPodCidrFound && sgRuleForNodeAddress { + break + } + } + + if !nodePortBindSecurityGroup || !sgRuleForPodCidrFound || !sgRuleForNodeAddress { + break + } + } route := cloudprovider.Route{ Name: item.DestinationCIDR, TargetNode: nodeName, //contains the nexthop address if node name was not found @@ -94,12 +173,6 @@ func (r *Routes) ListRoutes(ctx context.Context, clusterName string) ([]*cloudpr routes = append(routes, &route) } - // detect router's private network ID for further VM ports filtering - r.networkIDs, err = getRouterNetworkIDs(r.network, r.os.routeOpts.RouterID) - if err != nil { - return nil, err - } - return routes, nil } @@ -264,6 +337,64 @@ func updateAllowedAddressPairs(network *gophercloud.ServiceClient, port *ports.P return unwinder, nil } +func updateSecurityGroup(network *gophercloud.ServiceClient, port *ports.Port, sgs []string) (func(), error) { + origSgs := port.SecurityGroups + mc := metrics.NewMetricContext("port", "update") + _, err := ports.Update(network, port.ID, ports.UpdateOpts{ + SecurityGroups: &sgs, + }).Extract() + if mc.ObserveRequest(err) != nil { + return nil, err + } + + unwinder := func() { + klog.V(4).Infof("Reverting security-groups change to port %v", port.ID) + mc := metrics.NewMetricContext("port", "update") + _, err := ports.Update(network, port.ID, ports.UpdateOpts{ + SecurityGroups: &origSgs, + }).Extract() + if mc.ObserveRequest(err) != nil { + klog.Warningf("Unable to reset port's security-groups during error unwind: %v", err) + } + } + + return unwinder, nil +} + +func createSecurityGroupRule(network *gophercloud.ServiceClient, rule rules.CreateOpts) (func(), error) { + mc := metrics.NewMetricContext("security_group)rule", "create") + newRule, err := rules.Create(network, rule).Extract() + if mc.ObserveRequest(err) != nil { + return nil, err + } + unwinder := func() { + klog.V(4).Infof("Reverting security-group-rule create %v", newRule.ID) + mc := metrics.NewMetricContext("security_group_rule", "delete") + err := rules.Delete(network, newRule.ID).ExtractErr() + if mc.ObserveRequest(err) != nil { + klog.Warningf("Unable to revert security-group-rule create error unwind: %v", err) + } + } + return unwinder, nil +} + +func deleteSecurityGroupRule(network *gophercloud.ServiceClient, rule *rules.SecGroupRule) (func(), error) { + mc := metrics.NewMetricContext("security-group-rule", "delete") + err := rules.Delete(network, rule.ID).ExtractErr() + if mc.ObserveRequest(err) != nil { + return nil, err + } + unwinder := func() { + klog.V(4).Infof("Reverting security_group_rule delete %v", rule) + mc := metrics.NewMetricContext("security-group-rule", "create") + _, err := rules.Create(network, rules.CreateOpts{SecGroupID: rule.ID, RemoteIPPrefix: rule.RemoteIPPrefix}).Extract() + if mc.ObserveRequest(err) != nil { + klog.Warningf("Unable to revert security-group-rule delete error unwind: %v", err) + } + } + return unwinder, nil +} + // CreateRoute creates the described managed route func (r *Routes) CreateRoute(ctx context.Context, clusterName string, nameHint string, route *cloudprovider.Route) error { ip, _, _ := net.ParseCIDR(route.DestinationCIDR) @@ -273,9 +404,10 @@ func (r *Routes) CreateRoute(ctx context.Context, clusterName string, nameHint s if err != nil { return err } + addr := getAddrByNodeName(route.TargetNode, isCIDRv6, nodes) if addr == "" { - return errors.ErrNoAddressFound + return cpoerrors.ErrNoAddressFound } klog.V(4).Infof("CreateRoute(%v, %v, %v)", clusterName, nameHint, route) @@ -284,68 +416,139 @@ func (r *Routes) CreateRoute(ctx context.Context, clusterName string, nameHint s klog.V(4).Infof("Using nexthop %v for node %v", addr, route.TargetNode) - if !r.atomicRoutes { - // classical logic - r.Lock() - defer r.Unlock() + mc := metrics.NewMetricContext("router", "get") + router, err := routers.Get(r.network, r.os.routeOpts.RouterID).Extract() + if mc.ObserveRequest(err) != nil { + return err + } - mc := metrics.NewMetricContext("router", "get") - router, err := routers.Get(r.network, r.os.routeOpts.RouterID).Extract() - if mc.ObserveRequest(err) != nil { - return err + routeFound := false + for _, item := range router.Routes { + if item.DestinationCIDR == route.DestinationCIDR && item.NextHop == addr { + routeFound = true + break } + } - routes := router.Routes + if !routeFound { + if !r.atomicRoutes { + // classical logic + r.Lock() + defer r.Unlock() - for _, item := range routes { - if item.DestinationCIDR == route.DestinationCIDR && item.NextHop == addr { - klog.V(4).Infof("Skipping existing route: %v", route) - return nil + routes := append(router.Routes, routers.Route{ + DestinationCIDR: route.DestinationCIDR, + NextHop: addr, + }) + + unwind, err := updateRoutes(r.network, router, routes) + if err != nil { + return err } - } - routes = append(routes, routers.Route{ - DestinationCIDR: route.DestinationCIDR, - NextHop: addr, - }) + defer onFailure.call(unwind) + } else { + // atomic route update + route := []routers.Route{{ + DestinationCIDR: route.DestinationCIDR, + NextHop: addr, + }} + unwind, err := addRoute(r.network, r.os.routeOpts.RouterID, route) + if err != nil { + return err + } - unwind, err := updateRoutes(r.network, router, routes) - if err != nil { - return err + defer onFailure.call(unwind) } + } - defer onFailure.call(unwind) - } else { - // atomic route update - route := []routers.Route{{ - DestinationCIDR: route.DestinationCIDR, - NextHop: addr, - }} - unwind, err := addRoute(r.network, r.os.routeOpts.RouterID, route) + // get the port of addr on target node. + port, err := getPortByIP(r.network, addr, r.networkIDs) + if err != nil { + return err + } + + // var sgRuleForPodCidrFound, sgRuleForNodeAddress, nodePortBindSecurityGroup bool + nodePortBindSecurityGroup := false + for _, sg := range port.SecurityGroups { + if sg == r.nodeSGId { + nodePortBindSecurityGroup = true + break + } + } + if !nodePortBindSecurityGroup { + newSgs := append(port.SecurityGroups, r.nodeSGId) + unwind, err := updateSecurityGroup(r.network, port, newSgs) if err != nil { return err } - defer onFailure.call(unwind) } - // get the port of addr on target node. - port, err := getPortByIP(r.network, addr, r.networkIDs) + origRules, err := openstackutil.GetSecurityGroupRules(r.network, rules.ListOpts{SecGroupID: r.nodeSGId, Direction: string(rules.DirIngress)}) if err != nil { return err } + sgRuleForPodCidrFound := false + sgRuleForNodeAddress := false + nodeAddrCidr := fmt.Sprintf("%s/32", addr) + for _, rule := range origRules { + if rule.RemoteIPPrefix == route.DestinationCIDR { + sgRuleForPodCidrFound = true + } + if rule.RemoteGroupID == r.nodeSGId || rule.RemoteIPPrefix == nodeAddrCidr { + sgRuleForNodeAddress = true + } + if sgRuleForPodCidrFound && sgRuleForNodeAddress { + break + } + } + if !sgRuleForPodCidrFound { + etherType := rules.EtherType4 + if isCIDRv6 { + etherType = rules.EtherType6 + } + unwind, err := createSecurityGroupRule( + r.network, + rules.CreateOpts{ + SecGroupID: r.nodeSGId, + RemoteIPPrefix: route.DestinationCIDR, + Direction: rules.DirIngress, + EtherType: etherType}) + if err != nil { + return err + } + defer onFailure.call(unwind) + } + if !sgRuleForNodeAddress { + etherType := rules.EtherType4 + if isCIDRv6 { + etherType = rules.EtherType6 + } + unwind, err := createSecurityGroupRule( + r.network, + rules.CreateOpts{ + SecGroupID: r.nodeSGId, + RemoteIPPrefix: nodeAddrCidr, + Direction: rules.DirIngress, + EtherType: etherType}) + if err != nil { + return err + } + defer onFailure.call(unwind) + } - found := false + aapFound := false for _, item := range port.AllowedAddressPairs { if item.IPAddress == route.DestinationCIDR { klog.V(4).Infof("Found existing allowed-address-pair: %v", item) - found = true + aapFound = true break } } - - if !found { - newPairs := append(port.AllowedAddressPairs, ports.AddressPair{ + var newPairs []ports.AddressPair + if !aapFound { + newPairs = append(port.AllowedAddressPairs, ports.AddressPair{ IPAddress: route.DestinationCIDR, }) unwind, err := updateAllowedAddressPairs(r.network, port, newPairs) @@ -378,7 +581,7 @@ func (r *Routes) DeleteRoute(ctx context.Context, clusterName string, route *clo } addr = getAddrByNodeName(route.TargetNode, isCIDRv6, nodes) if addr == "" { - return errors.ErrNoAddressFound + return cpoerrors.ErrNoAddressFound } } @@ -444,17 +647,16 @@ func (r *Routes) DeleteRoute(ctx context.Context, clusterName string, route *clo } addrPairs := port.AllowedAddressPairs - index := -1 + addrPairIndex := -1 for i, item := range addrPairs { if item.IPAddress == route.DestinationCIDR { - index = i + addrPairIndex = i break } } - - if index != -1 { + if addrPairIndex != -1 { // Delete element `index` - addrPairs[index] = addrPairs[len(addrPairs)-1] + addrPairs[addrPairIndex] = addrPairs[len(addrPairs)-1] addrPairs = addrPairs[:len(addrPairs)-1] unwind, err := updateAllowedAddressPairs(r.network, port, addrPairs) @@ -464,6 +666,42 @@ func (r *Routes) DeleteRoute(ctx context.Context, clusterName string, route *clo defer onFailure.call(unwind) } + sgs := port.SecurityGroups + sgIndex := -1 + for i, item := range sgs { + if item == r.nodeSGId { + sgIndex = i + } + } + if sgIndex != -1 { + // Delete element `index` + sgs[sgIndex] = sgs[len(sgs)-1] + sgs = sgs[:len(sgs)-1] + unwind, err := updateSecurityGroup(r.network, port, sgs) + if err != nil { + return err + } + defer onFailure.call(unwind) + } + + origRules, err := openstackutil.GetSecurityGroupRules(r.network, rules.ListOpts{SecGroupID: r.nodeSGId, Direction: string(rules.DirIngress)}) + if err != nil { + return err + } + var staleRule *rules.SecGroupRule + for _, rule := range origRules { + if rule.RemoteIPPrefix == route.DestinationCIDR { + staleRule = &rule + } + } + if staleRule != nil { + unwind, err := deleteSecurityGroupRule(r.network, staleRule) + if err != nil { + return err + } + defer onFailure.call(unwind) + } + klog.V(4).Infof("Route deleted: %v", route) onFailure.disarm() return nil @@ -494,5 +732,5 @@ func getPortByIP(network *gophercloud.ServiceClient, addr string, networkIDs []s return &ports[0], nil } - return nil, errors.ErrNotFound + return nil, cpoerrors.ErrNotFound } diff --git a/pkg/util/openstack/security_group.go b/pkg/util/openstack/security_group.go new file mode 100644 index 0000000000..0e31eb9edd --- /dev/null +++ b/pkg/util/openstack/security_group.go @@ -0,0 +1,32 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package openstack + +import ( + "github.com/gophercloud/gophercloud" + "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/security/rules" + "k8s.io/cloud-provider-openstack/pkg/metrics" +) + +func GetSecurityGroupRules(client *gophercloud.ServiceClient, opts rules.ListOpts) ([]rules.SecGroupRule, error) { + mc := metrics.NewMetricContext("security_group_rule", "list") + page, err := rules.List(client, opts).AllPages() + if mc.ObserveRequest(err) != nil { + return nil, err + } + return rules.ExtractRules(page) +}