-
Notifications
You must be signed in to change notification settings - Fork 276
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Service LB SKU support(reconcile path) #7138
base: master
Are you sure you want to change the base?
Changes from all commits
7cd70f4
1dbe725
1e09069
e248832
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ import ( | |
"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2022-07-01/network" | ||
|
||
v1 "k8s.io/api/core/v1" | ||
discovery_v1 "k8s.io/api/discovery/v1" | ||
cloudprovider "k8s.io/cloud-provider" | ||
"k8s.io/klog/v2" | ||
utilnet "k8s.io/utils/net" | ||
|
@@ -911,3 +912,203 @@ func removeNodeIPAddressesFromBackendPool( | |
|
||
return changed | ||
} | ||
|
||
type backendPoolTypePodIP struct { | ||
*Cloud | ||
} | ||
|
||
func newBackendPoolTypePodIP(c *Cloud) BackendPool { | ||
return &backendPoolTypePodIP{c} | ||
} | ||
|
||
func (bpi *backendPoolTypePodIP) CleanupVMSetFromBackendPoolByCondition(_ *network.LoadBalancer, _ *v1.Service, _ []*v1.Node, _ string, _ func(string) bool) (*network.LoadBalancer, error) { | ||
return nil, errors.New("CleanupVMSetFromBackendPoolByCondition is not implemented for pod IP backend pool") | ||
} | ||
|
||
func (bpi *backendPoolTypePodIP) EnsureHostsInPool(service *v1.Service, _ []*v1.Node, _, _, clusterName, lbName string, backendPool network.BackendAddressPool) error { | ||
isIPv6 := isBackendPoolIPv6(ptr.Deref(backendPool.Name, "")) | ||
|
||
var ( | ||
changed bool | ||
err error | ||
podIPsToBeAdded []string | ||
endpointSliceName string | ||
endpointSliceNames [] string | ||
numOfAdd int | ||
) | ||
|
||
endpointSliceList, err := bpi.getEndpointSliceListForService(service) | ||
|
||
if err != nil { | ||
klog.Errorf("bpi.EnsureHostsInPool: failed to get endpoint slice list for service %q, error: %s", service.Name, err.Error()) | ||
return err | ||
} | ||
|
||
lbBackendPoolName := bpi.getBackendPoolNameForService(service, clusterName, isIPv6) | ||
|
||
/* Remove all addresses from the backend pool and add the addresses from all the | ||
endpoint-slices pertaining to a service.*/ | ||
if strings.EqualFold(ptr.Deref(backendPool.Name, ""), lbBackendPoolName) && | ||
backendPool.BackendAddressPoolPropertiesFormat != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we escape early to avoid nesting? Also, should we handle the error case that "if" checks here? |
||
if backendPool.LoadBalancerBackendAddresses == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need these checks? |
||
lbBackendPoolAddresses := make([]network.LoadBalancerBackendAddress, 0) | ||
backendPool.LoadBalancerBackendAddresses = &lbBackendPoolAddresses | ||
} else { | ||
removeNodeIPAddressesFromBackendPool(backendPool, []string{}, true, false) | ||
} | ||
|
||
for _, ES := range endpointSliceList { | ||
|
||
if ES.AddressType == discovery_v1.AddressTypeIPv6 && !isIPv6 { | ||
continue | ||
} | ||
|
||
if ES.AddressType == discovery_v1.AddressTypeIPv4 && isIPv6 { | ||
continue | ||
} | ||
|
||
for _, endpoint := range ES.Endpoints { | ||
|
||
if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready { | ||
// Skip endpoints that are not ready | ||
continue | ||
} | ||
|
||
for _, address := range endpoint.Addresses { | ||
klog.V(6).Infof("bpi.EnsureHostsInPool: adding ip address %s", address) | ||
podIPsToBeAdded = append(podIPsToBeAdded, address) | ||
endpointsliceName = strings.ToLower(fmt.Sprintf("%s/%s", ES.Namespace, ES.Name)) | ||
endpointSliceNames = append(endpointSliceNames,endpointsliceName) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this be added for every address? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A new property is used in the Address for the slice/ipgroup name if we use the old createOrUpdateBackendPool API. I think we don't need to populate this field if we the updateBackendIpGroup API. |
||
numOfAdd++ | ||
} | ||
} | ||
} | ||
|
||
changed = bpi.addPodIPAddressesToBackendPool(&backendPool, podIPsToBeAdded,endpointSliceNames) | ||
} | ||
|
||
if changed { | ||
klog.V(2).Infof("bpi.EnsureHostsInPool: updating backend pool %s of load balancer %s to add %d pods", lbBackendPoolName, lbName, numOfAdd) | ||
if err := bpi.CreateOrUpdateLBBackendPool(lbName, backendPool); err != nil { | ||
return fmt.Errorf("bpi.EnsureHostsInPool: failed to update backend pool %s: %w", lbBackendPoolName, err) | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (bpi *backendPoolTypePodIP) GetBackendPrivateIPs(clusterName string, service *v1.Service, lb *network.LoadBalancer) ([]string, []string) { | ||
serviceName := getServiceName(service) | ||
|
||
lbBackendPoolNames := bpi.getBackendPoolNamesForService(service, clusterName) | ||
|
||
if lb.LoadBalancerPropertiesFormat == nil || lb.LoadBalancerPropertiesFormat.BackendAddressPools == nil { | ||
return nil, nil | ||
} | ||
|
||
backendPrivateIPv4s, backendPrivateIPv6s := utilsets.NewString(), utilsets.NewString() | ||
for _, bp := range *lb.BackendAddressPools { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we reduce nesting a little in this func? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sure |
||
found, _ := isLBBackendPoolsExisting(lbBackendPoolNames, bp.Name) | ||
if found { | ||
klog.V(10).Infof("bpi.GetBackendPrivateIPs for service (%s): found wanted backendpool %s", serviceName, ptr.Deref(bp.Name, "")) | ||
if bp.BackendAddressPoolPropertiesFormat != nil && bp.LoadBalancerBackendAddresses != nil { | ||
for _, backendAddress := range *bp.LoadBalancerBackendAddresses { | ||
ipAddress := backendAddress.IPAddress | ||
if ipAddress != nil { | ||
klog.V(2).Infof("bpi.GetBackendPrivateIPs for service (%s): lb backendpool - found private IP %q", serviceName, *ipAddress) | ||
if utilnet.IsIPv4String(*ipAddress) { | ||
backendPrivateIPv4s.Insert(*ipAddress) | ||
} else if utilnet.IsIPv6String(*ipAddress) { | ||
backendPrivateIPv6s.Insert(*ipAddress) | ||
} | ||
} else { | ||
klog.V(4).Infof("bpi.GetBackendPrivateIPs for service (%s): lb backendpool - found null private IP", serviceName) | ||
} | ||
} | ||
} | ||
} else { | ||
klog.V(10).Infof("bpi.GetBackendPrivateIPs for service (%s): found unmanaged backendpool %s", serviceName, ptr.Deref(bp.Name, "")) | ||
} | ||
} | ||
return backendPrivateIPv4s.UnsortedList(), backendPrivateIPv6s.UnsortedList() | ||
} | ||
|
||
func (bpi *backendPoolTypePodIP) ReconcileBackendPools(clusterName string, service *v1.Service, lb *network.LoadBalancer) (bool, bool, *network.LoadBalancer, error) { | ||
var newBackendPools []network.BackendAddressPool | ||
if lb.BackendAddressPools != nil { | ||
newBackendPools = *lb.BackendAddressPools | ||
} | ||
|
||
var backendPoolsUpdated bool | ||
foundBackendPools := map[bool]bool{} | ||
serviceName := getServiceName(service) | ||
|
||
lbBackendPoolNames := bpi.getBackendPoolNamesForService(service, clusterName) | ||
// bp is never preconfigured in case of pods | ||
isBackendPoolPreConfigured := false | ||
|
||
for i := len(newBackendPools) - 1; i >= 0; i-- { | ||
bp := newBackendPools[i] | ||
found, isIPv6 := isLBBackendPoolsExisting(lbBackendPoolNames, bp.Name) | ||
if found { | ||
klog.V(10).Infof("bpi.ReconcileBackendPools for service (%s): found wanted backendpool. Not adding anything", serviceName) | ||
foundBackendPools[isIPv6] = true | ||
} else { | ||
klog.V(10).Infof("bpi.ReconcileBackendPools for service (%s): found unmanaged backendpool %s", serviceName, *bp.Name) | ||
} | ||
} | ||
|
||
for _, ipFamily := range service.Spec.IPFamilies { | ||
if foundBackendPools[ipFamily == v1.IPv6Protocol] { | ||
continue | ||
} | ||
isBackendPoolPreConfigured = newBackendPool(lb, isBackendPoolPreConfigured, | ||
bpi.PreConfiguredBackendPoolLoadBalancerTypes, serviceName, | ||
lbBackendPoolNames[ipFamily == v1.IPv6Protocol]) | ||
backendPoolsUpdated = true | ||
} | ||
|
||
return isBackendPoolPreConfigured, backendPoolsUpdated, lb, nil | ||
} | ||
|
||
func (az *Cloud) addPodIPAddressesToBackendPool(backendPool *network.BackendAddressPool, podIPAddresses []string, endpointSliceNames []string) bool { | ||
//TBD:(Kartick) Do we need to populate vnet Id as POD IPs are from overlay. Check... | ||
vnetID := az.getVnetResourceID() | ||
if backendPool.BackendAddressPoolPropertiesFormat != nil { | ||
if backendPool.VirtualNetwork == nil || | ||
backendPool.VirtualNetwork.ID == nil { | ||
backendPool.VirtualNetwork = &network.SubResource{ | ||
ID: &vnetID, | ||
} | ||
} | ||
} else { | ||
backendPool.BackendAddressPoolPropertiesFormat = &network.BackendAddressPoolPropertiesFormat{ | ||
VirtualNetwork: &network.SubResource{ | ||
ID: &vnetID, | ||
}, | ||
} | ||
} | ||
|
||
if backendPool.LoadBalancerBackendAddresses == nil { | ||
lbBackendPoolAddresses := make([]network.LoadBalancerBackendAddress, 0) | ||
backendPool.LoadBalancerBackendAddresses = &lbBackendPoolAddresses | ||
} | ||
|
||
var changed bool | ||
addresses := *backendPool.LoadBalancerBackendAddresses | ||
for _, ipAddress := range podIPAddresses { | ||
if !hasIPAddressInBackendPool(backendPool, ipAddress) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we do a duplicate check earlier instead of scanning the pool with each add? |
||
klog.V(4).Infof("bi.addPodIPAddressesToBackendPool: adding %s to the backend pool %s", ipAddress, ptr.Deref(backendPool.Name, "")) | ||
//TBD:(Kartick) Populate the slice_name later... | ||
addresses = append(addresses, network.LoadBalancerBackendAddress{ | ||
Name: ptr.To(ipAddress), | ||
LoadBalancerBackendAddressPropertiesFormat: &network.LoadBalancerBackendAddressPropertiesFormat{ | ||
IPAddress: ptr.To(ipAddress), | ||
}, | ||
}) | ||
changed = true | ||
} | ||
} | ||
backendPool.LoadBalancerBackendAddresses = &addresses | ||
return changed | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -409,7 +409,7 @@ func getLocalServiceBackendPoolName(serviceName string, ipv6 bool) string { | |
// getBackendPoolNameForService determine the expected backend pool name | ||
// by checking the external traffic policy of the service. | ||
func (az *Cloud) getBackendPoolNameForService(service *v1.Service, clusterName string, ipv6 bool) string { | ||
if !isLocalService(service) || !az.useMultipleStandardLoadBalancers() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Didn't we say that we should ignore local/not local for service sku? Now the name will vary based on that setting |
||
if !isLocalService(service) || (!az.useMultipleStandardLoadBalancers() && !az.useServiceLoadBalancer()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we reverse this if? I think it will be much simpler to read. |
||
return getBackendPoolName(clusterName, ipv6) | ||
} | ||
return getLocalServiceBackendPoolName(getServiceName(service), ipv6) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can service name be longer than allowed backend pool name? Should we use uuid instead? |
||
|
@@ -623,3 +623,23 @@ func (az *Cloud) reconcileIPsInLocalServiceBackendPoolsAsync( | |
} | ||
} | ||
} | ||
|
||
func (az *Cloud) getEndpointSliceListForService(service *v1.Service) ([]*discovery_v1.EndpointSlice, error) { | ||
|
||
var ( | ||
esList []*discovery_v1.EndpointSlice | ||
) | ||
|
||
//Retrieving only from the cache to avoid expensive listing from k8 server as Informer | ||
//code path would listen to updates to k8 api-server and store in the cache. | ||
az.endpointSlicesCache.Range(func(key, value interface{}) bool { | ||
endpointSlice := value.(*discovery_v1.EndpointSlice) | ||
if strings.EqualFold(getServiceNameOfEndpointSlice(endpointSlice), service.Name) && | ||
strings.EqualFold(endpointSlice.Namespace, service.Namespace) { | ||
esList = append(esList, endpointSlice) | ||
} | ||
return true | ||
}) | ||
|
||
return esList, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What will happen if we hit this? Shouldn't we error out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think better to error out as we are overridding the customer intent.