Skip to content

Commit

Permalink
Merge pull request #533 from vmware-tanzu/fallback-namespaces
Browse files Browse the repository at this point in the history
Add resource namespaces saved during deploy to fallbackAllowedNamespaces
  • Loading branch information
praveenrewar authored Sep 19, 2022
2 parents 094843d + 9288eb6 commit 308ce26
Show file tree
Hide file tree
Showing 11 changed files with 476 additions and 69 deletions.
8 changes: 7 additions & 1 deletion pkg/kapp/cmd/app/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,13 @@ func (o *DeleteOptions) existingResources(app ctlapp.App,
return nil, false, err
}

existingResources, err := supportObjs.IdentifiedResources.List(labelSelector, nil, ctlres.IdentifiedResourcesListOpts{})
meta, err := app.Meta()
if err != nil {
return nil, false, err
}

existingResources, err := supportObjs.IdentifiedResources.List(labelSelector, nil, ctlres.IdentifiedResourcesListOpts{
ResourceNamespaces: meta.LastChange.Namespaces})
if err != nil {
return nil, false, err
}
Expand Down
18 changes: 12 additions & 6 deletions pkg/kapp/cmd/app/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,13 @@ func (o *DeployOptions) Run() error {
return err
}

meta, err := app.Meta()
if err != nil {
return err
}

existingResources, existingPodRs, err := o.existingResources(
newResources, labeledResources, resourceFilter, supportObjs.Apps, usedGKs)
newResources, labeledResources, resourceFilter, supportObjs.Apps, usedGKs, append(meta.LastChange.Namespaces, nsNames...))
if err != nil {
return err
}
Expand Down Expand Up @@ -204,7 +209,7 @@ func (o *DeployOptions) Run() error {
if o.DeployFlags.Logs {
cancelLogsCh := make(chan struct{})
defer func() { close(cancelLogsCh) }()
go o.showLogs(supportObjs.CoreClient, supportObjs.IdentifiedResources, existingPodRs, labelSelector, cancelLogsCh)
go o.showLogs(supportObjs.CoreClient, supportObjs.IdentifiedResources, existingPodRs, labelSelector, cancelLogsCh, append(meta.LastChange.Namespaces, nsNames...))
}

defer func() {
Expand Down Expand Up @@ -347,7 +352,7 @@ func (o *DeployOptions) newResourcesFromFiles() ([]ctlres.Resource, error) {

func (o *DeployOptions) existingResources(newResources []ctlres.Resource,
labeledResources *ctlres.LabeledResources, resourceFilter ctlres.ResourceFilter,
apps ctlapp.Apps, usedGKs []schema.GroupKind) ([]ctlres.Resource, []ctlres.Resource, error) {
apps ctlapp.Apps, usedGKs []schema.GroupKind, resourceNamespaces []string) ([]ctlres.Resource, []ctlres.Resource, error) {

labelErrorResolutionFunc := func(key string, val string) string {
items, _ := apps.List(nil)
Expand All @@ -371,7 +376,8 @@ func (o *DeployOptions) existingResources(newResources []ctlres.Resource,

//Scope resource searching to UsedGKs
IdentifiedResourcesListOpts: ctlres.IdentifiedResourcesListOpts{
GKsScope: usedGKs,
GKsScope: usedGKs,
ResourceNamespaces: resourceNamespaces,
},
}

Expand Down Expand Up @@ -486,7 +492,7 @@ const (

func (o *DeployOptions) showLogs(
coreClient kubernetes.Interface, identifiedResources ctlres.IdentifiedResources,
existingPodRs []ctlres.Resource, labelSelector labels.Selector, cancelCh chan struct{}) {
existingPodRs []ctlres.Resource, labelSelector labels.Selector, cancelCh chan struct{}, resourceNamespaces []string) {

existingPodsByUID := map[string]struct{}{}

Expand Down Expand Up @@ -520,7 +526,7 @@ func (o *DeployOptions) showLogs(

podWatcher := ctlres.FilteringPodWatcher{
podMatcherFunc,
identifiedResources.PodResources(labelSelector),
identifiedResources.PodResources(labelSelector, resourceNamespaces),
}

contFilterFunc := func(pod corev1.Pod) []string {
Expand Down
8 changes: 7 additions & 1 deletion pkg/kapp/cmd/app/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,13 @@ func (o *InspectOptions) Run() error {
return err
}

resources, err := supportObjs.IdentifiedResources.List(labelSelector, nil, resources.IdentifiedResourcesListOpts{})
meta, err := app.Meta()
if err != nil {
return err
}

resources, err := supportObjs.IdentifiedResources.List(labelSelector, nil, resources.IdentifiedResourcesListOpts{
ResourceNamespaces: meta.LastChange.Namespaces})
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kapp/cmd/app/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (o *LogsOptions) Run() error {
}
return true
},
supportObjs.IdentifiedResources.PodResources(labelSelector),
supportObjs.IdentifiedResources.PodResources(labelSelector, nil),
}

contFilter := func(pod corev1.Pod) []string {
Expand Down
2 changes: 2 additions & 0 deletions pkg/kapp/resources/identified_resources_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
type IdentifiedResourcesListOpts struct {
IgnoreCachedResTypes bool
GKsScope []schema.GroupKind
ResourceNamespaces []string
}

func (r IdentifiedResources) List(labelSelector labels.Selector, resRefs []ResourceRef, opts IdentifiedResourcesListOpts) ([]Resource, error) {
Expand Down Expand Up @@ -49,6 +50,7 @@ func (r IdentifiedResources) List(labelSelector labels.Selector, resRefs []Resou
ListOpts: &metav1.ListOptions{
LabelSelector: labelSelector.String(),
},
ResourceNamespaces: opts.ResourceNamespaces,
}

resources, err := r.resources.All(resTypes, allOpts)
Expand Down
36 changes: 20 additions & 16 deletions pkg/kapp/resources/identified_resources_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ package resources

import (
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
)

func (r IdentifiedResources) PodResources(labelSelector labels.Selector) UniquePodWatcher {
return UniquePodWatcher{labelSelector, r.fallbackAllowedNamespaces, r.coreClient}
func (r IdentifiedResources) PodResources(labelSelector labels.Selector, resourceNamespaces []string) UniquePodWatcher {
return UniquePodWatcher{labelSelector, uniqAndValidNamespaces(append(r.fallbackAllowedNamespaces, resourceNamespaces...)), r.coreClient}
}

type PodWatcherI interface {
Expand All @@ -34,32 +36,34 @@ func (w UniquePodWatcher) Watch(podsToWatchCh chan corev1.Pod, cancelCh chan str
go func() {
// Watch Pods in all namespaces first and fallback to the
// fallbackAllowedNamespaces if lack of permission
namespace := ""
for {
namespaces := []string{""}
namespaces = append(namespaces, w.fallbackAllowedNamespaces...)
var forbiddenNamespaces []string

for _, namespace := range namespaces {
podWatcher := NewPodWatcher(
w.coreClient.CoreV1().Pods(namespace),
metav1.ListOptions{LabelSelector: w.labelSelector.String()},
)

err := podWatcher.Watch(nonUniquePodsToWatchCh, cancelCh)
if err == nil {
break
}
if errors.IsForbidden(err) && namespace == "" {
// The '-n' flag or default state namespace can specify only 1 namespace, so there
// should be at most 1 item in fallbackAllowedNamespaces
if len(w.fallbackAllowedNamespaces) > 0 {
namespace = w.fallbackAllowedNamespaces[0]
if namespace == "" {
break
}
if namespace == "" {
break
}
} else {
continue
}
if !errors.IsForbidden(err) {
fmt.Printf("Pod watching error: %s\n", err) // TODO
break
}
if namespace != "" {
forbiddenNamespaces = append(forbiddenNamespaces, fmt.Sprintf(`"%s"`, namespace))
}
}

if len(forbiddenNamespaces) > 0 {
fmt.Printf(`Pod watching error: pods is forbidden: User cannot list resource "pods" in API group "" in the namespace(s) %s`, strings.Join(forbiddenNamespaces, ", "))
}
close(nonUniquePodsToWatchCh)
}()

Expand Down
89 changes: 55 additions & 34 deletions pkg/kapp/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,8 @@ func (c *ResourcesImpl) All(resTypes []ResourceType, opts AllOpts) ([]Resource,
opts.ListOpts = &metav1.ListOptions{}
}

nsScope := "" // all namespaces by default
nsScopeLimited := c.opts.ScopeToFallbackAllowedNamespaces && len(c.opts.FallbackAllowedNamespaces) == 1

// Eagerly use single fallback namespace to avoid making all-namespaces request
// just to see it fail, and fallback to making namespace-scoped request
if nsScopeLimited {
nsScope = c.opts.FallbackAllowedNamespaces[0]
c.logger.Info("Scoping listings to single namespace: %s", nsScope)
}
// Populate FallbackAllowedNamespace with resource namespaces stored during deploy
c.opts.FallbackAllowedNamespaces = uniqAndValidNamespaces(append(c.opts.FallbackAllowedNamespaces, opts.ResourceNamespaces...))

unstructItemsCh := make(chan unstructItems, len(resTypes))
fatalErrsCh := make(chan error, len(resTypes))
Expand All @@ -124,16 +117,23 @@ func (c *ResourcesImpl) All(resTypes []ResourceType, opts AllOpts) ([]Resource,

client := c.mutedDynamicClient.Resource(resType.GroupVersionResource)

err = util.Retry2(time.Second, 5*time.Second, c.isServerRescaleErr, func() error {
if resType.Namespaced() {
list, err = client.Namespace(nsScope).List(context.TODO(), *opts.ListOpts)
} else {
list, err = client.List(context.TODO(), *opts.ListOpts)
// If resource is cluster scoped or request is not scoped to fallback
// allowed namespaces manually, then scope list to all namespaces
if !c.opts.ScopeToFallbackAllowedNamespaces || !resType.Namespaced() {
err = util.Retry2(time.Second, 5*time.Second, c.isServerRescaleErr, func() error {
if resType.Namespaced() {
list, err = client.Namespace("").List(context.TODO(), *opts.ListOpts)
} else {
list, err = client.List(context.TODO(), *opts.ListOpts)
}
return err
})

if err == nil {
unstructItemsCh <- unstructItems{resType, list.Items}
return
}
return err
})

if err != nil {
if !errors.IsForbidden(err) {
// Ignore certain GVs due to failing API backing
if c.resourceTypes.CanIgnoreFailingGroupVersion(resType.GroupVersion()) {
Expand All @@ -143,27 +143,24 @@ func (c *ResourcesImpl) All(resTypes []ResourceType, opts AllOpts) ([]Resource,
}
return
}
// At this point err==Forbidden...

// In case ns scope is limited already, we will not gain anything
// by trying to run namespace scoped lists for allowed namespaced
// (ie since it's would be same request that just failed)
if !resType.Namespaced() || nsScopeLimited {
if !resType.Namespaced() {
c.logger.Debug("Skipping forbidden group version: %#v", resType.GroupVersionResource)
return
}
}

// TODO improve perf somehow
list, err = c.allForNamespaces(client, opts.ListOpts)
if err != nil {
// Ignore certain GVs due to failing API backing
if c.resourceTypes.CanIgnoreFailingGroupVersion(resType.GroupVersion()) {
c.logger.Info("Ignoring group version: %#v", resType.GroupVersionResource)
} else {
fatalErrsCh <- fmt.Errorf("Listing %#v, namespaced: %t: %w", resType.GroupVersionResource, resType.Namespaced(), err)
}
return
// At this point err==Forbidden...
// or requests are scoped to fallback allowed namespaces manually
list, err = c.allForNamespaces(client, opts.ListOpts)
if err != nil {
// Ignore certain GVs due to failing API backing
if c.resourceTypes.CanIgnoreFailingGroupVersion(resType.GroupVersion()) {
c.logger.Info("Ignoring group version: %#v", resType.GroupVersionResource)
} else {
fatalErrsCh <- fmt.Errorf("Listing %#v, namespaced: %t: %w", resType.GroupVersionResource, resType.Namespaced(), err)
}
return
}

unstructItemsCh <- unstructItems{resType, list.Items}
Expand Down Expand Up @@ -207,8 +204,13 @@ func (c *ResourcesImpl) allForNamespaces(client dynamic.NamespaceableResourceInt

go func() {
defer itemsDone.Done()
var resList *unstructured.UnstructuredList
var err error

resList, err := client.Namespace(ns).List(context.TODO(), *listOpts)
err = util.Retry2(time.Second, 5*time.Second, c.isServerRescaleErr, func() error {
resList, err = client.Namespace(ns).List(context.TODO(), *listOpts)
return err
})
if err != nil {
if !errors.IsForbidden(err) {
fatalErrsCh <- err
Expand Down Expand Up @@ -538,6 +540,10 @@ func (c *ResourcesImpl) assumedAllowedNamespaces() ([]string, error) {
return *c.assumedAllowedNamespacesMemo, nil
}

if c.opts.ScopeToFallbackAllowedNamespaces {
return c.opts.FallbackAllowedNamespaces, nil
}

nsList, err := c.coreClient.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{})
if err != nil {
if errors.IsForbidden(err) {
Expand Down Expand Up @@ -593,8 +599,23 @@ func (c *ResourcesImpl) isEtcdRetryableError(err error) bool {
return etcdserverRetryableErrCheck.MatchString(err.Error())
}

func uniqAndValidNamespaces(in []string) []string {
var out []string
if len(in) > 0 {
uniqNamespaces := map[string]struct{}{}
for _, ns := range in {
if _, exists := uniqNamespaces[ns]; !exists && ns != "(cluster)" {
out = append(out, ns)
uniqNamespaces[ns] = struct{}{}
}
}
}
return out
}

type AllOpts struct {
ListOpts *metav1.ListOptions
ListOpts *metav1.ListOptions
ResourceNamespaces []string
}

type resourceStatusErr struct {
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/cluster_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewPresentClusterResource(kind, name, ns string, kubectl Kubectl) ClusterRe
args = append(args, "--show-managed-fields")
}

out, _ := kubectl.RunWithOpts(args, RunOpts{})
out, _ := kubectl.RunWithOpts(args, RunOpts{NoNamespace: true})
return ClusterResource{ctlres.MustNewResourceFromBytes([]byte(out))}
}

Expand All @@ -60,7 +60,7 @@ func RemoveClusterResource(t *testing.T, kind, name, ns string, kubectl Kubectl)
}

func PatchClusterResource(kind, name, ns, patch string, kubectl Kubectl) {
kubectl.Run([]string{"patch", kind, name, "--type=json", "--patch", patch, "-n", ns})
kubectl.RunWithOpts([]string{"patch", kind, name, "--type=json", "--patch", patch, "-n", ns}, RunOpts{NoNamespace: true})
}

func ClusterResourceExists(kind, name string, kubectl Kubectl) (bool, error) {
Expand Down
Loading

0 comments on commit 308ce26

Please sign in to comment.