Skip to content

Commit

Permalink
Client (mesh egress) discovery for Jobs and Numaflow (vertices and mo…
Browse files Browse the repository at this point in the history
…novertices) (#339)

### Description
What does this change do and why?
This change introduces client discovery where there is no ingress but
only mesh egress required for types k8s Jobs, numaflow Vertices and
Monovertices.

Signed-off-by: Anil Attuluri <[email protected]>
  • Loading branch information
aattuluri authored Oct 9, 2024
1 parent 80bbc9c commit fd81dfe
Show file tree
Hide file tree
Showing 48 changed files with 3,357 additions and 746 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,8 @@ gen-yaml:
kustomize build ./install/sample/overlays/rollout-canary > ./out/yaml/sample-greeting-rollout-canary.yaml
kustomize build ./install/sample/overlays/rollout-bluegreen > ./out/yaml/sample-greeting-rollout-bluegreen.yaml
kustomize build ./install/sample/overlays/remote > ./out/yaml/remotecluster_sample.yaml
kustomize build ./install/sample/overlays/job > ./out/yaml/job_sample.yaml
kustomize build ./install/sample/overlays/numaflow > ./out/yaml/numaflow_sample.yaml
cp ./install/sample/proxy.yaml ./out/yaml/proxy.yaml
cp ./install/sample/sample_dep.yaml ./out/yaml/sample_dep.yaml
cp ./install/sample/depProxyExample.yaml ./out/yaml/depProxyExample.yaml
Expand Down
4 changes: 4 additions & 0 deletions admiral/cmd/admiral/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ func GetRootCmd(args []string) *cobra.Command {
rootCmd.PersistentFlags().StringSliceVar(&params.IngressVSExportToNamespaces, "ingress_vs_export_to_namespaces", []string{"istio-system"}, "List of namespaces where the ingress VS should be exported")
rootCmd.PersistentFlags().StringVar(&params.IngressLBPolicy, "ingress_lb_policy", "round_robin", "loadbalancer policy for ingress destination rule (round_robin/random/passthrough/least_request)")

rootCmd.PersistentFlags().BoolVar(&params.EnableClientDiscovery, "enable_client_discovery", true, "Enable/Disable Client (mesh egress) Discovery")
rootCmd.PersistentFlags().StringArrayVar(&params.ClientDiscoveryClustersForJobs, "client_discovery_clusters_for_jobs", []string{}, "List of clusters for client discovery for k8s jobs")
rootCmd.PersistentFlags().StringArrayVar(&params.DiscoveryClustersForNumaflow, "client_discovery_clusters_for_numaflow", []string{}, "List of clusters for client discovery for numaflow types")

return rootCmd
}

Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/client/loader/client_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
argo "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned"
admiralapi "github.com/istio-ecosystem/admiral-api/pkg/client/clientset/versioned"
admiral "github.com/istio-ecosystem/admiral/admiral/pkg/client/clientset/versioned"
numaflow "github.com/numaproj/numaflow/pkg/client/clientset/versioned"
istio "istio.io/client-go/pkg/clientset/versioned"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -26,4 +27,7 @@ type ClientLoader interface {

LoadKubeClientFromPath(path string) (kubernetes.Interface, error)
LoadKubeClientFromConfig(config *rest.Config) (kubernetes.Interface, error)

LoadNumaflowClientFromPath(path string) (numaflow.Interface, error)
LoadNumaflowClientFromConfig(config *rest.Config) (numaflow.Interface, error)
}
17 changes: 17 additions & 0 deletions admiral/pkg/client/loader/fake_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
admiralfake "github.com/istio-ecosystem/admiral/admiral/pkg/client/clientset/versioned/fake"
istio "istio.io/client-go/pkg/clientset/versioned"
istiofake "istio.io/client-go/pkg/clientset/versioned/fake"
numaflow "github.com/numaproj/numaflow/pkg/client/clientset/versioned"
numaflowfake "github.com/numaproj/numaflow/pkg/client/clientset/versioned/fake"
"k8s.io/client-go/kubernetes"
kubefake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
Expand All @@ -22,13 +24,15 @@ var FakeAdmiralApiClient admiralapi.Interface = admiralapifake.NewSimpleClientse
var FakeIstioClient istio.Interface = istiofake.NewSimpleClientset()
var FakeKubeClient kubernetes.Interface = kubefake.NewSimpleClientset()
var FakeArgoClient argo.Interface = argofake.NewSimpleClientset()
var FakeNumaflowClient numaflow.Interface = numaflowfake.NewSimpleClientset()

// fake clients for dependent clusters
var FakeAdmiralClientMap map[string]admiral.Interface = make(map[string]admiral.Interface)
var FakeAdmiralApiClientMap map[string]admiralapi.Interface = make(map[string]admiralapi.Interface)
var FakeIstioClientMap map[string]istio.Interface = make(map[string]istio.Interface)
var FakeKubeClientMap map[string]kubernetes.Interface = make(map[string]kubernetes.Interface)
var FakeArgoClientMap map[string]argo.Interface = make(map[string]argo.Interface)
var FakeNumaflowClientMap map[string]numaflow.Interface = make(map[string]numaflow.Interface)

type FakeClientLoader struct{}

Expand Down Expand Up @@ -91,6 +95,19 @@ func (loader *FakeClientLoader) LoadArgoClientFromConfig(config *rest.Config) (a
return argoClient, nil
}

func (loader *FakeClientLoader) LoadNumaflowClientFromPath(path string) (numaflow.Interface, error) {
return FakeNumaflowClient, nil
}

func (loader *FakeClientLoader) LoadNumaflowClientFromConfig(config *rest.Config) (numaflow.Interface, error) {
numaflowClient, ok := FakeNumaflowClientMap[config.Host]
if !ok {
numaflowClient = numaflowfake.NewSimpleClientset()
FakeNumaflowClientMap[config.Host] = numaflowClient
}
return numaflowClient, nil
}

func (loader *FakeClientLoader) LoadKubeClientFromPath(path string) (kubernetes.Interface, error) {
return FakeKubeClient, nil
}
Expand Down
14 changes: 14 additions & 0 deletions admiral/pkg/client/loader/kube_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
argo "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned"
admiralapi "github.com/istio-ecosystem/admiral-api/pkg/client/clientset/versioned"
admiral "github.com/istio-ecosystem/admiral/admiral/pkg/client/clientset/versioned"
numaflow "github.com/numaproj/numaflow/pkg/client/clientset/versioned"
log "github.com/sirupsen/logrus"
istio "istio.io/client-go/pkg/clientset/versioned"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -85,6 +86,19 @@ func (loader *KubeClientLoader) LoadKubeClientFromConfig(config *rest.Config) (k
return kubernetes.NewForConfig(config)
}

func (loader *KubeClientLoader) LoadNumaflowClientFromPath(kubeConfigPath string) (numaflow.Interface, error) {
config, err := getConfig(kubeConfigPath)
if err != nil || config == nil {
return nil, err
}

return loader.LoadNumaflowClientFromConfig(config)
}

func (loader *KubeClientLoader) LoadNumaflowClientFromConfig(config *rest.Config) (numaflow.Interface, error) {
return numaflow.NewForConfig(config)
}

func getConfig(kubeConfigPath string) (*rest.Config, error) {
log.Infof("getting kubeconfig from: %#v", kubeConfigPath)
// create the config from the path
Expand Down
86 changes: 86 additions & 0 deletions admiral/pkg/clusters/clientdiscovery_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package clusters

import (
"context"
"fmt"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
commonUtil "github.com/istio-ecosystem/admiral/admiral/pkg/util"
log "github.com/sirupsen/logrus"
)

type ClientDiscoveryHandler struct {
RemoteRegistry *RemoteRegistry
ClusterID string
}

func (cdh *ClientDiscoveryHandler) Added(ctx context.Context, obj *common.K8sObject) error {
err := HandleEventForClientDiscovery(ctx, admiral.Add, obj, cdh.RemoteRegistry, cdh.ClusterID)
if err != nil {
return fmt.Errorf(LogErrFormat, common.Add, common.JobResourceType, obj.Name, cdh.ClusterID, err)
}
return err
}

func HandleEventForClientDiscovery(ctx context.Context, event admiral.EventType, obj *common.K8sObject,
remoteRegistry *RemoteRegistry, clusterName string) error {
log.Infof(LogFormat, event, obj.Type, obj.Name, clusterName, common.ReceivedStatus)
globalIdentifier := common.GetGlobalIdentifier(obj.Annotations, obj.Labels)
originalIdentifier := common.GetOriginalIdentifier(obj.Annotations, obj.Labels)
if len(globalIdentifier) == 0 {
log.Infof(LogFormat, event, obj.Type, obj.Name, clusterName, "Skipped as '"+common.GetWorkloadIdentifier()+" was not found', namespace="+obj.Namespace)
return nil
}
ctxLogger := common.GetCtxLogger(ctx, globalIdentifier, "")

ctx = context.WithValue(ctx, "clusterName", clusterName)
ctx = context.WithValue(ctx, "eventResourceType", obj.Type)

if remoteRegistry.AdmiralCache != nil {

UpdateIdentityClusterCache(remoteRegistry, globalIdentifier, clusterName)

if common.EnableSWAwareNSCaches() {
if remoteRegistry.AdmiralCache.IdentityClusterNamespaceCache != nil {
remoteRegistry.AdmiralCache.IdentityClusterNamespaceCache.Put(globalIdentifier, clusterName, obj.Namespace, obj.Namespace)
}
if remoteRegistry.AdmiralCache.PartitionIdentityCache != nil && len(common.GetIdentityPartition(obj.Annotations, obj.Labels)) > 0 {
remoteRegistry.AdmiralCache.PartitionIdentityCache.Put(globalIdentifier, originalIdentifier)
}
}
} else {
log.Warnf(LogFormatAdv, "Process", obj.Type, obj.Name, obj.Namespace, clusterName, "Skipping client discovery as Admiral cache is not initialized for identity="+globalIdentifier)
return fmt.Errorf(common.CtxLogFormat, event, obj.Name, obj.Namespace, clusterName, "processing skipped as Admiral cache is not initialized for identity="+globalIdentifier)
}

if commonUtil.IsAdmiralReadOnly() {
ctxLogger.Infof(common.CtxLogFormat, event, "", "", "", "processing skipped as Admiral is in Read-only mode")
return nil
}

if IsCacheWarmupTime(remoteRegistry) {
ctxLogger.Infof(common.CtxLogFormat, event, "", "", "", "processing skipped during cache warm up state")
return fmt.Errorf(common.CtxLogFormat, event, obj.Name, obj.Namespace, clusterName, "processing skipped during cache warm up state for env="+" identity="+globalIdentifier)
}

//if we have a deployment/rollout in this namespace skip processing to save some cycles
if DeploymentOrRolloutExistsInNamespace(remoteRegistry, globalIdentifier, clusterName, obj.Namespace) {
log.Infof(LogFormatAdv, "Process", obj.Type, obj.Name, obj.Namespace, clusterName, "Skipping client discovery as Deployment/Rollout already present in namespace for client="+globalIdentifier)
return nil
}

//write SEs required for this client
depRecord := remoteRegistry.DependencyController.Cache.Get(globalIdentifier)

if depRecord == nil {
log.Warnf(LogFormatAdv, "Process", obj.Type, obj.Name, obj.Namespace, clusterName, "Skipping client discovery as no dependency record found for client="+globalIdentifier)
return nil
}
err := remoteRegistry.DependencyController.DepHandler.Added(ctx, depRecord)

if err != nil {
return fmt.Errorf(LogFormatAdv, "Process", obj.Type, obj.Name, obj.Namespace, clusterName, "Error processing client discovery")
}

return nil
}
179 changes: 179 additions & 0 deletions admiral/pkg/clusters/clientdiscovery_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package clusters

import (
"context"
"github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/model"
"github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1alpha1"
"github.com/stretchr/testify/assert"
"testing"
"time"

argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/istio-ecosystem/admiral/admiral/pkg/client/loader"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
"github.com/istio-ecosystem/admiral/admiral/pkg/test"
coreV1 "k8s.io/api/core/v1"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
)

func TestClientDiscoveryHandler_Added(t *testing.T) {
//Struct of test case info. Name is required.
const (
namespace = "namespace"
namespace_no_deps = "namespace-no-deps"
namespaceForRollout = "rollout-namespace"
cluster1 = "cluster1"
cluster2 = "cluster2"
cluster3 = "cluster3"
identity1 = "identity1"
identity2 = "identity2"
identity3 = "identity3"
identity4 = "identity4"
)
var (
stop = make(chan struct{})
config = rest.Config{
Host: "localhost",
}
matchLabel = map[string]string{
"app": "test",
}
labelSelector = metaV1.LabelSelector{
MatchLabels: matchLabel,
}
dep = v1alpha1.Dependency{
Spec: model.Dependency{
Source: identity1,
Destinations: []string{identity2},
},
}
rollout = argo.Rollout{
Spec: argo.RolloutSpec{
Selector: &labelSelector,
Strategy: argo.RolloutStrategy{
BlueGreen: &argo.BlueGreenStrategy{},
},
Template: coreV1.PodTemplateSpec{
ObjectMeta: metaV1.ObjectMeta{Annotations: map[string]string{
"sidecar.istio.io/inject": "true",
"identity": identity1,
}, Labels: map[string]string{
"env": "stage",
}},
},
},
ObjectMeta: metaV1.ObjectMeta{
Namespace: namespaceForRollout,
},
}
objRolloutNamespace = &common.K8sObject{
Namespace: namespaceForRollout,
Name: "obj-rollout-namespace",
Annotations: map[string]string{"identity": identity1},
}
objValid = &common.K8sObject{
Namespace: namespace,
Name: "obj",
Annotations: map[string]string{"identity": identity1},
}
objValidWithNoDeps = &common.K8sObject{
Namespace: namespace_no_deps,
Name: "obj-no-deps",
Annotations: map[string]string{"identity": identity4},
}
)

setupForHandlerTests(true)

depController, err := admiral.NewDependencyController(stop, &test.MockDependencyHandler{}, loader.FakeKubeconfigPath, "", time.Second*time.Duration(300), loader.GetFakeClientLoader())
if err != nil {
t.Fatalf("failed to initialize dependency controller, err: %v", err)
}
r, err := admiral.NewRolloutsController(stop, &test.MockRolloutHandler{}, &config, time.Second*time.Duration(300), loader.GetFakeClientLoader())
if err != nil {
t.Fatalf("failed to initialize rollout controller, err: %v", err)
}
d, err := admiral.NewDeploymentController(stop, &test.MockDeploymentHandler{}, &config, time.Second*time.Duration(300), loader.GetFakeClientLoader())
if err != nil {
t.Fatalf("failed to initialize rollout controller, err: %v", err)
}

rcCluster1 := &RemoteController{
RolloutController: r,
DeploymentController: d,
}

rcCluster2 := &RemoteController{
RolloutController: r,
DeploymentController: d,
}

ctx := context.Background()

remoteRegistry := NewRemoteRegistry(context.TODO(), common.AdmiralParams{})
remoteRegistry.DependencyController = depController
remoteRegistry.remoteControllers[cluster1] = rcCluster1
remoteRegistry.remoteControllers[cluster2] = rcCluster2

remoteRegistry.AdmiralCache = &AdmiralCache{
IdentityClusterCache: common.NewMapOfMaps(),
IdentityClusterNamespaceCache: common.NewMapOfMapOfMaps(),
PartitionIdentityCache: common.NewMap(),
}

err = r.Added(ctx, &rollout)
assert.NoError(t, err)
rolloutFromCache := r.Cache.Get(identity1, "stage")
assert.NotNil(t, rolloutFromCache)
err = depController.Added(ctx, &dep)
assert.NoError(t, err)
depFromCache := depController.Cache.Get(identity1)
assert.NotNil(t, depFromCache)

testCases := []struct {
name string
obj *common.K8sObject
rr *RemoteRegistry
clusterName string
expectedError error
}{
{
name: "No global identifier on obj results in no errors",
obj: &common.K8sObject{Annotations: map[string]string{}, Labels: map[string]string{}},
rr: remoteRegistry,
clusterName: cluster1,
expectedError: nil,
}, {
name: "Rollout with same identifier present in the namespace should " +
"return without processing and no errors",
obj: objRolloutNamespace,
rr: remoteRegistry,
clusterName: cluster1,
expectedError: nil,
}, {
name: "Valid client with no dependency record returns without processing",
obj: objValidWithNoDeps,
rr: remoteRegistry,
clusterName: cluster1,
expectedError: nil,
}, {
name: "Valid client with valid dependency record results in writing all dependencies endpoints to this cluster",
obj: objValid,
rr: remoteRegistry,
clusterName: cluster1,
expectedError: nil,
},
}

//Run the test for every provided case
for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
result := HandleEventForClientDiscovery(ctx, admiral.Add, c.obj, remoteRegistry, c.clusterName)
if c.expectedError == nil && result != nil {
t.Fatalf("Expected error %v got %v", c.expectedError, result)
}
})
}
}
Loading

0 comments on commit fd81dfe

Please sign in to comment.