From 2244cee0ed4b7512126d40a77535c96fe3d380c9 Mon Sep 17 00:00:00 2001 From: David Lee Date: Thu, 18 Jul 2024 17:47:37 -0700 Subject: [PATCH] fix: #3146 Support fail open/close for Ready Tracker & surface errors swallowed by grp.Wait() (#3308) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: David-Jaeyoon-Lee Co-authored-by: Sertaç Özercan <852750+sozercan@users.noreply.github.com> Co-authored-by: Jaydipkumar Arvindbhai Gabani --- pkg/readiness/list.go | 5 + pkg/readiness/ready_tracker.go | 530 ++++++----- pkg/readiness/ready_tracker_unit_test.go | 1019 +++++++++++++++++++++- pkg/syncutil/concurrent_slice.go | 39 + pkg/syncutil/single_runner.go | 39 +- pkg/syncutil/single_runner_test.go | 12 +- 6 files changed, 1409 insertions(+), 235 deletions(-) create mode 100644 pkg/syncutil/concurrent_slice.go diff --git a/pkg/readiness/list.go b/pkg/readiness/list.go index 514c8692eba..5b929c99d8a 100644 --- a/pkg/readiness/list.go +++ b/pkg/readiness/list.go @@ -82,6 +82,11 @@ func retryAll(_ error) bool { return true } +// retryNone is a retryPredicate that will never retry an error. +func retryNone(_ error) bool { + return false +} + // retryUnlessUnregistered is a retryPredicate that retries all errors except // *NoResourceMatchError, *NoKindMatchError, e.g. a resource was not registered to // the RESTMapper. diff --git a/pkg/readiness/ready_tracker.go b/pkg/readiness/ready_tracker.go index 94c86cadeeb..4c806b888f9 100644 --- a/pkg/readiness/ready_tracker.go +++ b/pkg/readiness/ready_tracker.go @@ -17,6 +17,7 @@ package readiness import ( "context" + "flag" "fmt" "net/http" "sync" @@ -35,7 +36,6 @@ import ( "github.com/open-policy-agent/gatekeeper/v3/pkg/operations" "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" "github.com/pkg/errors" - "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -46,6 +46,8 @@ import ( var log = logf.Log.WithName("readiness-tracker") +var crashOnFailureFetchingExpectations = flag.Bool("crash-on-failure-fetching-expectations", false, "Unless set (defaults to false), Gatekeeper will ignore errors that occur when gathering expectations. This prevents bootstrapping errors from crashing Gatekeeper, at the cost of increasing the risk that Gatekeeper will under-enforce policy by serving before it has loaded all policies. Enabling this helps prevent under-enforcement at the risk of crashing during startup due to issues like network errors. Note that enabling this flag currently does not achieve the aforementioned effect, since expectation fetching is set to retry until success, so failures while fetching expectations currently do not occur.") + const ( constraintGroup = "constraints.gatekeeper.sh" statsPeriod = 1 * time.Second @@ -75,20 +77,23 @@ type Tracker struct { constraints *trackerMap data *trackerMap - ready chan struct{} - constraintTrackers *syncutil.SingleRunner - statsEnabled syncutil.SyncBool - mutationEnabled bool - externalDataEnabled bool - expansionEnabled bool + initialized chan struct{} + constraintTrackers *syncutil.SingleRunner + dataTrackers *syncutil.SingleRunner + statsEnabled syncutil.SyncBool + mutationEnabled bool + externalDataEnabled bool + expansionEnabled bool + crashOnFailure bool + trackListerPredicateOverride retryPredicate } // NewTracker creates a new Tracker and initializes the internal trackers.
func NewTracker(lister Lister, mutationEnabled, externalDataEnabled, expansionEnabled bool) *Tracker { - return newTracker(lister, mutationEnabled, externalDataEnabled, expansionEnabled, nil) + return newTracker(lister, mutationEnabled, externalDataEnabled, expansionEnabled, *crashOnFailureFetchingExpectations, nil, nil) } -func newTracker(lister Lister, mutationEnabled, externalDataEnabled, expansionEnabled bool, fn objDataFactory) *Tracker { +func newTracker(lister Lister, mutationEnabled, externalDataEnabled, expansionEnabled bool, crashOnFailure bool, trackListerPredicateOverride retryPredicate, fn objDataFactory) *Tracker { tracker := Tracker{ lister: lister, templates: newObjTracker(v1beta1.SchemeGroupVersion.WithKind("ConstraintTemplate"), fn), @@ -96,12 +101,15 @@ func newTracker(lister Lister, mutationEnabled, externalDataEnabled, expansionEn syncsets: newObjTracker(syncsetv1alpha1.GroupVersion.WithKind("SyncSet"), fn), constraints: newTrackerMap(fn), data: newTrackerMap(fn), - ready: make(chan struct{}), + initialized: make(chan struct{}), constraintTrackers: &syncutil.SingleRunner{}, + dataTrackers: &syncutil.SingleRunner{}, - mutationEnabled: mutationEnabled, - externalDataEnabled: externalDataEnabled, - expansionEnabled: expansionEnabled, + mutationEnabled: mutationEnabled, + externalDataEnabled: externalDataEnabled, + expansionEnabled: expansionEnabled, + crashOnFailure: crashOnFailure, + trackListerPredicateOverride: trackListerPredicateOverride, } if mutationEnabled { tracker.assignMetadata = newObjTracker(mutationv1.GroupVersion.WithKind("AssignMetadata"), fn) @@ -201,7 +209,7 @@ func (t *Tracker) DataGVKs() []schema.GroupVersionKind { func (t *Tracker) templateCleanup(ct *templates.ConstraintTemplate) { gvk := constraintGVK(ct) t.constraints.Remove(gvk) - <-t.ready // constraintTrackers are setup in Run() + <-t.initialized // constraintTrackers are setup in Run() t.constraintTrackers.Cancel(gvk.String()) } @@ -226,11 +234,16 @@ func (t *Tracker) TryCancelTemplate(ct *templates.ConstraintTemplate) { func (t *Tracker) CancelData(gvk schema.GroupVersionKind) { log.V(logging.DebugLevel).Info("cancel tracking for data", "gvk", gvk) t.data.Remove(gvk) + <-t.initialized + t.dataTrackers.Cancel(gvk.String()) } func (t *Tracker) TryCancelData(gvk schema.GroupVersionKind) { log.V(logging.DebugLevel).Info("try to cancel tracking for data", "gvk", gvk) - t.data.TryCancel(gvk) + if t.data.TryCancel(gvk) { + <-t.initialized + t.dataTrackers.Cancel(gvk.String()) + } } // Satisfied returns true if all tracked expectations have been satisfied. @@ -312,57 +325,76 @@ func (t *Tracker) Run(ctx context.Context) error { // Any failure in the errgroup will cancel goroutines in the group using gctx. // The odd one out is the statsPrinter which is meant to outlive the tracking // routines. - grp, gctx := errgroup.WithContext(ctx) - t.constraintTrackers = syncutil.RunnerWithContext(gctx) - close(t.ready) // The constraintTrackers SingleRunner is ready. + errChan := make(chan error) + wg := &sync.WaitGroup{} + t.constraintTrackers = syncutil.NewSingleRunner(errChan) + t.dataTrackers = syncutil.NewSingleRunner(errChan) + close(t.initialized) // The constraintTrackers and dataTrackers SingleRunners are ready. 
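+ // Each tracking routine below reports failures on errChan instead of returning an error.
+ // Run drains errChan until all trackers have finished: with crashOnFailure set it returns
+ // the first error received (fail closed); otherwise it logs each error and keeps waiting (fail open).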
if t.mutationEnabled { - grp.Go(func() error { - return t.trackAssignMetadata(gctx) - }) - grp.Go(func() error { - return t.trackAssign(gctx) - }) - grp.Go(func() error { - return t.trackModifySet(gctx) - }) - grp.Go(func() error { - return t.trackAssignImage(gctx) - }) + wg.Add(1) + go func() { + defer wg.Done() + t.trackAssignMetadata(ctx, errChan) + }() + wg.Add(1) + go func() { + defer wg.Done() + t.trackAssign(ctx, errChan) + }() + wg.Add(1) + go func() { + defer wg.Done() + t.trackModifySet(ctx, errChan) + }() + wg.Add(1) + go func() { + defer wg.Done() + t.trackAssignImage(ctx, errChan) + }() } + if t.externalDataEnabled { - grp.Go(func() error { - return t.trackExternalDataProvider(gctx) - }) + wg.Add(1) + go func() { + defer wg.Done() + t.trackExternalDataProvider(ctx, errChan) + }() } + if t.expansionEnabled { - grp.Go(func() error { - return t.trackExpansionTemplates(gctx) - }) + wg.Add(1) + go func() { + defer wg.Done() + t.trackExpansionTemplates(ctx, errChan) + }() } + if operations.HasValidationOperations() { - grp.Go(func() error { - return t.trackConstraintTemplates(gctx) - }) + wg.Add(1) + go func() { + defer wg.Done() + t.trackConstraintTemplates(ctx, errChan) + }() } - grp.Go(func() error { - return t.trackConfigAndSyncSets(gctx) - }) - grp.Go(func() error { - t.statsPrinter(ctx) - return nil - }) + wg.Add(1) + go func() { + defer wg.Done() + t.trackConfigAndSyncSets(ctx, errChan) + }() + + go t.statsPrinter(ctx) // start deleted object polling. Periodically collects // objects that are expected by the Tracker, but are deleted - grp.Go(func() error { + go func() { // wait before proceeding, hoping // that the tracker will be satisfied by then timer := time.NewTimer(2000 * time.Millisecond) select { case <-ctx.Done(): - return nil + return case <-timer.C: } @@ -370,21 +402,37 @@ func (t *Tracker) Run(ctx context.Context) error { for { select { case <-ctx.Done(): - return nil + return case <-ticker.C: if t.Satisfied() { log.Info("readiness satisfied, no further collection") ticker.Stop() - return nil + return } t.collectInvalidExpectations(ctx) } } - }) + }() - _ = grp.Wait() - _ = t.constraintTrackers.Wait() // Must appear after grp.Wait() - allows trackConstraintTemplates() time to schedule its sub-tasks. 
- return nil + go func() { + wg.Wait() + t.constraintTrackers.Wait() + t.dataTrackers.Wait() + close(errChan) + }() + + for { + err, ok := <-errChan + if !ok { + return nil + } + + if t.crashOnFailure { + return err + } + + log.Error(err, "listing expectations") + } } func (t *Tracker) Populated() bool { @@ -533,22 +581,30 @@ func (t *Tracker) collectInvalidExpectations(ctx context.Context) { } } -func (t *Tracker) trackAssignMetadata(ctx context.Context) error { +func (t *Tracker) trackAssignMetadata(ctx context.Context, errChan chan<- error) { + hadError := false defer func() { - t.assignMetadata.ExpectationsDone() - log.V(logging.DebugLevel).Info("AssignMetadata expectations populated") - - _ = t.constraintTrackers.Wait() + // If we are ignoring errors when tracking expecations, we need to set expectations to done to prevent readiness tracker never being satisfied + if !t.crashOnFailure || !hadError { + t.assignMetadata.ExpectationsDone() + log.V(logging.DebugLevel).Info("AssignMetadata expectations populated") + } }() if !t.mutationEnabled { - return nil + return } assignMetadataList := &mutationv1.AssignMetadataList{} - lister := retryLister(t.lister, retryAll) + listerRetryPredicate := retryAll + if t.trackListerPredicateOverride != nil { + listerRetryPredicate = t.trackListerPredicateOverride + } + lister := retryLister(t.lister, listerRetryPredicate) if err := lister.List(ctx, assignMetadataList); err != nil { - return fmt.Errorf("listing AssignMetadata: %w", err) + hadError = true + errChan <- fmt.Errorf("listing AssignMetadata: %w", err) + return } log.V(logging.DebugLevel).Info("setting expectations for AssignMetadata", "AssignMetadata Count", len(assignMetadataList.Items)) @@ -556,24 +612,32 @@ func (t *Tracker) trackAssignMetadata(ctx context.Context) error { log.V(logging.DebugLevel).Info("expecting AssignMetadata", "name", assignMetadataList.Items[index].GetName()) t.assignMetadata.Expect(&assignMetadataList.Items[index]) } - return nil } -func (t *Tracker) trackAssign(ctx context.Context) error { +func (t *Tracker) trackAssign(ctx context.Context, errChan chan<- error) { + hadError := false defer func() { - t.assign.ExpectationsDone() - log.V(logging.DebugLevel).Info("Assign expectations populated") - _ = t.constraintTrackers.Wait() + // If we are ignoring errors when tracking expecations, we need to set expectations to done to prevent readiness tracker never being satisfied + if !t.crashOnFailure || !hadError { + t.assign.ExpectationsDone() + log.V(logging.DebugLevel).Info("Assign expectations populated") + } }() if !t.mutationEnabled { - return nil + return } assignList := &mutationv1.AssignList{} - lister := retryLister(t.lister, retryAll) + listerRetryPredicate := retryAll + if t.trackListerPredicateOverride != nil { + listerRetryPredicate = t.trackListerPredicateOverride + } + lister := retryLister(t.lister, listerRetryPredicate) if err := lister.List(ctx, assignList); err != nil { - return fmt.Errorf("listing Assign: %w", err) + hadError = true + errChan <- fmt.Errorf("listing Assign: %w", err) + return } log.V(logging.DebugLevel).Info("setting expectations for Assign", "Assign Count", len(assignList.Items)) @@ -581,24 +645,32 @@ func (t *Tracker) trackAssign(ctx context.Context) error { log.V(logging.DebugLevel).Info("expecting Assign", "name", assignList.Items[index].GetName()) t.assign.Expect(&assignList.Items[index]) } - return nil } -func (t *Tracker) trackModifySet(ctx context.Context) error { +func (t *Tracker) trackModifySet(ctx context.Context, errChan chan<- 
error) { + hadError := false defer func() { - t.modifySet.ExpectationsDone() - log.V(logging.DebugLevel).Info("ModifySet expectations populated") - _ = t.constraintTrackers.Wait() + // If we are ignoring errors when tracking expecations, we need to set expectations to done to prevent readiness tracker never being satisfied + if !t.crashOnFailure || !hadError { + t.modifySet.ExpectationsDone() + log.V(logging.DebugLevel).Info("ModifySet expectations populated") + } }() if !t.mutationEnabled { - return nil + return } modifySetList := &mutationv1.ModifySetList{} - lister := retryLister(t.lister, retryAll) + listerRetryPredicate := retryAll + if t.trackListerPredicateOverride != nil { + listerRetryPredicate = t.trackListerPredicateOverride + } + lister := retryLister(t.lister, listerRetryPredicate) if err := lister.List(ctx, modifySetList); err != nil { - return fmt.Errorf("listing ModifySet: %w", err) + hadError = true + errChan <- fmt.Errorf("listing ModifySet: %w", err) + return } log.V(logging.DebugLevel).Info("setting expectations for ModifySet", "ModifySet Count", len(modifySetList.Items)) @@ -606,24 +678,32 @@ func (t *Tracker) trackModifySet(ctx context.Context) error { log.V(logging.DebugLevel).Info("expecting ModifySet", "name", modifySetList.Items[index].GetName()) t.modifySet.Expect(&modifySetList.Items[index]) } - return nil } -func (t *Tracker) trackAssignImage(ctx context.Context) error { +func (t *Tracker) trackAssignImage(ctx context.Context, errChan chan<- error) { + hadError := false defer func() { - t.assignImage.ExpectationsDone() - log.V(logging.DebugLevel).Info("AssignImage expectations populated") - _ = t.constraintTrackers.Wait() + // If we are ignoring errors when tracking expecations, we need to set expectations to done to prevent readiness tracker never being satisfied + if !t.crashOnFailure || !hadError { + t.assignImage.ExpectationsDone() + log.V(logging.DebugLevel).Info("AssignImage expectations populated") + } }() if !t.mutationEnabled { - return nil + return } assignImageList := &mutationsv1alpha1.AssignImageList{} - lister := retryLister(t.lister, retryAll) + listerRetryPredicate := retryAll + if t.trackListerPredicateOverride != nil { + listerRetryPredicate = t.trackListerPredicateOverride + } + lister := retryLister(t.lister, listerRetryPredicate) if err := lister.List(ctx, assignImageList); err != nil { - return fmt.Errorf("listing AssignImage: %w", err) + hadError = true + errChan <- fmt.Errorf("listing AssignImage: %w", err) + return } log.V(logging.DebugLevel).Info("setting expectations for AssignImage", "AssignImage Count", len(assignImageList.Items)) @@ -631,24 +711,32 @@ func (t *Tracker) trackAssignImage(ctx context.Context) error { log.V(logging.DebugLevel).Info("expecting AssignImage", "name", assignImageList.Items[index].GetName()) t.assignImage.Expect(&assignImageList.Items[index]) } - return nil } -func (t *Tracker) trackExpansionTemplates(ctx context.Context) error { +func (t *Tracker) trackExpansionTemplates(ctx context.Context, errChan chan<- error) { + hadError := false defer func() { - t.expansions.ExpectationsDone() - log.V(logging.DebugLevel).Info("ExpansionTemplate expectations populated") - _ = t.constraintTrackers.Wait() + // If we are ignoring errors when tracking expecations, we need to set expectations to done to prevent readiness tracker never being satisfied + if !t.crashOnFailure || !hadError { + t.expansions.ExpectationsDone() + log.V(logging.DebugLevel).Info("ExpansionTemplate expectations populated") + } }() if 
!t.expansionEnabled { - return nil + return } expansionList := &expansionv1alpha1.ExpansionTemplateList{} - lister := retryLister(t.lister, retryAll) + listerRetryPredicate := retryAll + if t.trackListerPredicateOverride != nil { + listerRetryPredicate = t.trackListerPredicateOverride + } + lister := retryLister(t.lister, listerRetryPredicate) if err := lister.List(ctx, expansionList); err != nil { - return fmt.Errorf("listing ExpansionTemplate: %w", err) + hadError = true + errChan <- fmt.Errorf("listing ExpansionTemplates: %w", err) + return } log.V(logging.DebugLevel).Info("setting expectations for ExpansionTemplate", "ExpansionTemplate Count", len(expansionList.Items)) @@ -656,24 +744,32 @@ func (t *Tracker) trackExpansionTemplates(ctx context.Context) error { log.V(logging.DebugLevel).Info("expecting ExpansionTemplate", "name", expansionList.Items[index].GetName()) t.expansions.Expect(&expansionList.Items[index]) } - return nil } -func (t *Tracker) trackExternalDataProvider(ctx context.Context) error { +func (t *Tracker) trackExternalDataProvider(ctx context.Context, errChan chan<- error) { + hadError := false defer func() { - t.externalDataProvider.ExpectationsDone() - log.V(logging.DebugLevel).Info("Provider expectations populated") - _ = t.constraintTrackers.Wait() + // If we are ignoring errors when tracking expecations, we need to set expectations to done to prevent readiness tracker never being satisfied + if !t.crashOnFailure || !hadError { + t.externalDataProvider.ExpectationsDone() + log.V(logging.DebugLevel).Info("Provider expectations populated") + } }() if !t.externalDataEnabled { - return nil + return } providerList := &externaldatav1beta1.ProviderList{} - lister := retryLister(t.lister, retryAll) + listerRetryPredicate := retryAll + if t.trackListerPredicateOverride != nil { + listerRetryPredicate = t.trackListerPredicateOverride + } + lister := retryLister(t.lister, listerRetryPredicate) if err := lister.List(ctx, providerList); err != nil { - return fmt.Errorf("listing Provider: %w", err) + hadError = true + errChan <- fmt.Errorf("listing Provider: %w", err) + return } log.V(logging.DebugLevel).Info("setting expectations for Provider", "Provider Count", len(providerList.Items)) @@ -681,21 +777,28 @@ func (t *Tracker) trackExternalDataProvider(ctx context.Context) error { log.V(logging.DebugLevel).Info("expecting Provider", "name", providerList.Items[index].GetName()) t.externalDataProvider.Expect(&providerList.Items[index]) } - return nil } -func (t *Tracker) trackConstraintTemplates(ctx context.Context) error { +func (t *Tracker) trackConstraintTemplates(ctx context.Context, errChan chan<- error) { + hadError := false defer func() { - t.templates.ExpectationsDone() - log.V(logging.DebugLevel).Info("template expectations populated") - - _ = t.constraintTrackers.Wait() + // If we are ignoring errors when tracking expecations, we need to set expectations to done to prevent readiness tracker never being satisfied + if !t.crashOnFailure || !hadError { + t.templates.ExpectationsDone() + log.V(logging.DebugLevel).Info("template expectations populated") + } }() templates := &v1beta1.ConstraintTemplateList{} - lister := retryLister(t.lister, retryAll) + listerRetryPredicate := retryAll + if t.trackListerPredicateOverride != nil { + listerRetryPredicate = t.trackListerPredicateOverride + } + lister := retryLister(t.lister, listerRetryPredicate) if err := lister.List(ctx, templates); err != nil { - return fmt.Errorf("listing templates: %w", err) + hadError = true + errChan <- 
fmt.Errorf("listing templates: %w", err) + return } log.V(logging.DebugLevel).Info("setting expectations for templates", "templateCount", len(templates.Items)) @@ -721,75 +824,79 @@ func (t *Tracker) trackConstraintTemplates(ctx context.Context) error { handled[gvk] = true // Set an expectation for this constraint type ot := t.constraints.Get(gvk) - t.constraintTrackers.Go(ctx, gvk.String(), func(ctx context.Context) error { - err := t.trackConstraints(ctx, gvk, ot) - if err != nil { - log.Error(err, "aborted trackConstraints", "gvk", gvk) - } - return nil // do not return an error, this would abort other constraint trackers! - }) + t.constraintTrackers.Go(ctx, gvk.String(), t.makeConstraintTrackerFor(gvk, ot)) } - return nil } // trackConfigAndSyncSets sets expectations for cached data as specified by the singleton Config resource. // and any SyncSet resources present on the cluster. // Works best effort and fails-open if a resource cannot be fetched or does not exist. -func (t *Tracker) trackConfigAndSyncSets(ctx context.Context) error { +func (t *Tracker) trackConfigAndSyncSets(ctx context.Context, errChan chan<- error) { + hadErr := false defer func() { - t.config.ExpectationsDone() - log.V(logging.DebugLevel).Info("config expectations populated") + // If we are ignoring errors when tracking expecations, we need to set expectations to done to prevent readiness tracker never being satisfied + if !t.crashOnFailure || !hadErr { + t.config.ExpectationsDone() + log.V(logging.DebugLevel).Info("config expectations populated") - t.syncsets.ExpectationsDone() - log.V(logging.DebugLevel).Info("syncset expectations populated") + t.syncsets.ExpectationsDone() + log.V(logging.DebugLevel).Info("syncset expectations populated") + } }() dataGVKs := make(map[schema.GroupVersionKind]struct{}) cfg, err := t.getConfigResource(ctx) if err != nil { - return fmt.Errorf("fetching config resource: %w", err) - } - if cfg == nil { - log.Info("config resource not found - skipping for readiness") + hadErr = true + errChan <- err } else { - if !cfg.GetDeletionTimestamp().IsZero() { - log.Info("config resource is being deleted - skipping for readiness") + if cfg == nil { + log.Info("config resource not found - skipping for readiness") } else { - t.config.Expect(cfg) - log.V(logging.DebugLevel).Info("setting expectations for config", "configCount", 1) - - for _, entry := range cfg.Spec.Sync.SyncOnly { - dataGVKs[entry.ToGroupVersionKind()] = struct{}{} + if !cfg.GetDeletionTimestamp().IsZero() { + log.Info("config resource is being deleted - skipping for readiness") + } else { + t.config.Expect(cfg) + log.V(logging.DebugLevel).Info("setting expectations for config", "configCount", 1) + + for _, entry := range cfg.Spec.Sync.SyncOnly { + dataGVKs[entry.ToGroupVersionKind()] = struct{}{} + } } } } // Without validation operations, there is no reason to wait for referential data when deciding readiness. 
if !operations.HasValidationOperations() { - return nil + return } syncsets := &syncsetv1alpha1.SyncSetList{} - lister := retryLister(t.lister, retryAll) - if err := lister.List(ctx, syncsets); err != nil { - return fmt.Errorf("fetching syncset resources: %w", err) + listerRetryPredicate := retryAll + if t.trackListerPredicateOverride != nil { + listerRetryPredicate = t.trackListerPredicateOverride } + lister := retryLister(t.lister, listerRetryPredicate) + if err := lister.List(ctx, syncsets); err != nil { + hadErr = true + errChan <- fmt.Errorf("listing syncsets: %w", err) + } else { + log.V(logging.DebugLevel).Info("setting expectations for syncsets", "syncsetCount", len(syncsets.Items)) + for i := range syncsets.Items { + syncset := syncsets.Items[i] - log.V(logging.DebugLevel).Info("setting expectations for syncsets", "syncsetCount", len(syncsets.Items)) - for i := range syncsets.Items { - syncset := syncsets.Items[i] + t.syncsets.Expect(&syncset) + log.V(logging.DebugLevel).Info("expecting syncset", "name", syncset.GetName(), "namespace", syncset.GetNamespace()) - t.syncsets.Expect(&syncset) - log.V(logging.DebugLevel).Info("expecting syncset", "name", syncset.GetName(), "namespace", syncset.GetNamespace()) + for i := range syncset.Spec.GVKs { + gvk := syncset.Spec.GVKs[i].ToGroupVersionKind() + if _, ok := dataGVKs[gvk]; ok { + log.Info("duplicate GVK to sync", "gvk", gvk) + } - for i := range syncset.Spec.GVKs { - gvk := syncset.Spec.GVKs[i].ToGroupVersionKind() - if _, ok := dataGVKs[gvk]; ok { - log.Info("duplicate GVK to sync", "gvk", gvk) + dataGVKs[gvk] = struct{}{} } - - dataGVKs[gvk] = struct{}{} } } @@ -800,24 +907,21 @@ func (t *Tracker) trackConfigAndSyncSets(ctx context.Context) error { // Set expectations for individual cached resources dt := t.ForData(gvkCpy) - go func() { - err := t.trackData(ctx, gvkCpy, dt) - if err != nil { - log.Error(err, "aborted trackData", "gvk", gvkCpy) - } - }() + t.dataTrackers.Go(ctx, gvk.String(), t.makeDataTrackerFor(gvkCpy, dt)) } - - return nil } // getConfigResource returns the Config singleton if present. // Returns a nil reference if it is not found. func (t *Tracker) getConfigResource(ctx context.Context) (*configv1alpha1.Config, error) { lst := &configv1alpha1.ConfigList{} - lister := retryLister(t.lister, nil) + listerRetryPredicate := retryAll + if t.trackListerPredicateOverride != nil { + listerRetryPredicate = t.trackListerPredicateOverride + } + lister := retryLister(t.lister, listerRetryPredicate) if err := lister.List(ctx, lst); err != nil { - return nil, fmt.Errorf("listing config: %w", err) + return nil, fmt.Errorf("listing configs: %w", err) } for i := range lst.Items { @@ -833,60 +937,82 @@ func (t *Tracker) getConfigResource(ctx context.Context) (*configv1alpha1.Config return nil, nil } -// trackData sets expectations for all cached data expected by Gatekeeper. +// makeDataTrackerFor returns a function that sets expectations for all cached data expected by Gatekeeper. // If the provided gvk is registered, blocks until data can be listed or context is canceled. // Invalid GVKs (not registered to the RESTMapper) will fail-open. 
-func (t *Tracker) trackData(ctx context.Context, gvk schema.GroupVersionKind, dt Expectations) error { - defer func() { - dt.ExpectationsDone() - log.V(logging.DebugLevel).Info("data expectations populated", "gvk", gvk) - }() +func (t *Tracker) makeDataTrackerFor(gvk schema.GroupVersionKind, dt Expectations) func(context.Context, chan<- error) { + return func(ctx context.Context, errChan chan<- error) { + hadError := false + defer func() { + // If we are ignoring errors when tracking expectations, we need to set expectations to done to prevent readiness tracker never being satisfied + if !t.crashOnFailure || !hadError { + dt.ExpectationsDone() + log.V(logging.DebugLevel).Info("data expectations populated", "gvk", gvk) + } + }() - // List individual resources and expect observations of each in the sync controller. - u := &unstructured.UnstructuredList{} - u.SetGroupVersionKind(schema.GroupVersionKind{ - Group: gvk.Group, - Version: gvk.Version, - Kind: gvk.Kind + "List", - }) - // NoKindMatchError is non-recoverable, otherwise we'll retry. - lister := retryLister(t.lister, retryUnlessUnregistered) - err := lister.List(ctx, u) - if err != nil { - log.Error(err, "listing data", "gvk", gvk) - return err - } + // List individual resources and expect observations of each in the sync controller. + u := &unstructured.UnstructuredList{} + u.SetGroupVersionKind(schema.GroupVersionKind{ + Group: gvk.Group, + Version: gvk.Version, + Kind: gvk.Kind + "List", + }) + // NoKindMatchError is non-recoverable, otherwise we'll retry. + listerRetryPredicate := retryUnlessUnregistered + if t.trackListerPredicateOverride != nil { + listerRetryPredicate = t.trackListerPredicateOverride + } + lister := retryLister(t.lister, listerRetryPredicate) + err := lister.List(ctx, u) + if err != nil { + hadError = true + log.Error(err, "aborted trackData", "gvk", gvk) + errChan <- fmt.Errorf("listing data: %w", err) + return + } - for i := range u.Items { - item := &u.Items[i] - dt.Expect(item) - log.V(logging.DebugLevel).Info("expecting data", "gvk", item.GroupVersionKind(), "namespace", item.GetNamespace(), "name", item.GetName()) + for i := range u.Items { + item := &u.Items[i] + dt.Expect(item) + log.V(logging.DebugLevel).Info("expecting data", "gvk", item.GroupVersionKind(), "namespace", item.GetNamespace(), "name", item.GetName()) + } } - return nil } -// trackConstraints sets expectations for all constraints managed by a template. +// makeConstraintTrackerFor returns a function that sets expectations for all constraints managed by a template. // Blocks until constraints can be listed or context is canceled. 
-func (t *Tracker) trackConstraints(ctx context.Context, gvk schema.GroupVersionKind, constraints Expectations) error { - defer func() { - constraints.ExpectationsDone() - log.V(logging.DebugLevel).Info("constraint expectations populated", "gvk", gvk) - }() +func (t *Tracker) makeConstraintTrackerFor(gvk schema.GroupVersionKind, constraints Expectations) func(context.Context, chan<- error) { + return func(ctx context.Context, errChan chan<- error) { + hadError := false + defer func() { + // If we are ignoring errors when tracking expectations, we need to set expectations to done to prevent readiness tracker never being satisfied + if !t.crashOnFailure || !hadError { + constraints.ExpectationsDone() + log.V(logging.DebugLevel).Info("constraint expectations populated", "gvk", gvk) + } + }() - u := unstructured.UnstructuredList{} - u.SetGroupVersionKind(gvk) - lister := retryLister(t.lister, retryAll) - if err := lister.List(ctx, &u); err != nil { - return err - } + u := unstructured.UnstructuredList{} + u.SetGroupVersionKind(gvk) + listerRetryPredicate := retryAll + if t.trackListerPredicateOverride != nil { + listerRetryPredicate = t.trackListerPredicateOverride + } + lister := retryLister(t.lister, listerRetryPredicate) + if err := lister.List(ctx, &u); err != nil { + hadError = true + log.Error(err, "aborted trackConstraints", "gvk", gvk) + errChan <- fmt.Errorf("listing constraints: %w", err) + return + } - for i := range u.Items { - o := u.Items[i] - constraints.Expect(&o) - log.V(logging.DebugLevel).Info("expecting Constraint", "gvk", gvk, "name", objectName(&o)) + for i := range u.Items { + o := u.Items[i] + constraints.Expect(&o) + log.V(logging.DebugLevel).Info("expecting Constraint", "gvk", gvk, "name", objectName(&o)) + } } - - return nil } // EnableStats enables the verbose logging routine for the readiness tracker. 
diff --git a/pkg/readiness/ready_tracker_unit_test.go b/pkg/readiness/ready_tracker_unit_test.go index 8cc084440ad..ba5d039b941 100644 --- a/pkg/readiness/ready_tracker_unit_test.go +++ b/pkg/readiness/ready_tracker_unit_test.go @@ -18,18 +18,30 @@ package readiness import ( "context" "fmt" + "strings" "sync" "testing" "time" + externaldatav1beta1 "github.com/open-policy-agent/frameworks/constraint/pkg/apis/externaldata/v1beta1" "github.com/open-policy-agent/frameworks/constraint/pkg/apis/templates/v1beta1" "github.com/open-policy-agent/frameworks/constraint/pkg/core/templates" + "github.com/open-policy-agent/gatekeeper/v3/apis" + configv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/config/v1alpha1" + expansionv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/expansion/v1alpha1" + mutationv1 "github.com/open-policy-agent/gatekeeper/v3/apis/mutations/v1" + mutationsv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/mutations/v1alpha1" syncsetv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/syncset/v1alpha1" "github.com/open-policy-agent/gatekeeper/v3/pkg/fakes" + "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" "github.com/stretchr/testify/require" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/client/interceptor" ) const ( @@ -64,6 +76,67 @@ var ( }, } + testConfig = configv1alpha1.Config{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-config", + }, + Spec: configv1alpha1.ConfigSpec{}, + } + + testAssignMetadata = mutationsv1alpha1.AssignMetadata{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-assign-metadata", + }, + Spec: mutationsv1alpha1.AssignMetadataSpec{ + Location: "", + }, + } + + testAssign = mutationsv1alpha1.Assign{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-assign", + }, + Spec: mutationsv1alpha1.AssignSpec{ + Location: "", + }, + } + + testModifySet = mutationsv1alpha1.ModifySet{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-modify-set", + }, + Spec: mutationsv1alpha1.ModifySetSpec{ + Location: "", + }, + } + + testAssignImage = mutationsv1alpha1.AssignImage{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-assign-image", + }, + Spec: mutationsv1alpha1.AssignImageSpec{ + Location: "", + }, + } + + testExternalDataProvider = externaldatav1beta1.Provider{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-external-data-provider", + }, + Spec: externaldatav1beta1.ProviderSpec{ + URL: "", + }, + } + + testExpansionTemplate = expansionv1alpha1.ExpansionTemplate{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-expansion-templates", + }, + Spec: expansionv1alpha1.ExpansionTemplateSpec{ + TemplateSource: "", + }, + } + podGVK = schema.GroupVersionKind{Version: "v1", Kind: "Pod"} ) @@ -75,10 +148,30 @@ func init() { } } +func getTestConstraint() *unstructured.Unstructured { + u := &unstructured.Unstructured{} + u.SetName("test-constraint") + gvk := schema.GroupVersionKind{ + Group: "constraints.gatekeeper.sh", + Version: "v1beta1", + Kind: "FooKind", + } + u.SetGroupVersionKind(gvk) + return u +} + +func mustInitializeScheme(scheme *runtime.Scheme) *runtime.Scheme { + if err := apis.AddToScheme(scheme); err != nil { + panic(err) + } + + return scheme +} + // Verify that TryCancelTemplate functions the same as regular CancelTemplate if readinessRetries is set to 0. 
func Test_ReadyTracker_TryCancelTemplate_No_Retries(t *testing.T) { - lister := fake.NewClientBuilder().WithRuntimeObjects(convertedTemplate.DeepCopyObject()).Build() - rt := newTracker(lister, false, false, false, func() objData { + lister := fake.NewClientBuilder().WithScheme(mustInitializeScheme(runtime.NewScheme())).WithRuntimeObjects(convertedTemplate.DeepCopyObject()).Build() + rt := newTracker(lister, false, false, false, false, nil, func() objData { return objData{retries: 0} }) @@ -117,8 +210,8 @@ func Test_ReadyTracker_TryCancelTemplate_No_Retries(t *testing.T) { // Verify that TryCancelTemplate must be called enough times to remove all retries before canceling a template. func Test_ReadyTracker_TryCancelTemplate_Retries(t *testing.T) { - lister := fake.NewClientBuilder().WithRuntimeObjects(convertedTemplate.DeepCopyObject()).Build() - rt := newTracker(lister, false, false, false, func() objData { + lister := fake.NewClientBuilder().WithScheme(mustInitializeScheme(runtime.NewScheme())).WithRuntimeObjects(convertedTemplate.DeepCopyObject()).Build() + rt := newTracker(lister, false, false, false, false, nil, func() objData { return objData{retries: 2} }) @@ -168,7 +261,7 @@ func Test_ReadyTracker_TryCancelTemplate_Retries(t *testing.T) { } func Test_Tracker_TryCancelData(t *testing.T) { - lister := fake.NewClientBuilder().WithRuntimeObjects( + lister := fake.NewClientBuilder().WithScheme(mustInitializeScheme(runtime.NewScheme())).WithRuntimeObjects( &testSyncSet, fakes.UnstructuredFor(podGVK, "", "pod1-name"), ).Build() tcs := []struct { @@ -184,7 +277,7 @@ func Test_Tracker_TryCancelData(t *testing.T) { objDataFn := func() objData { return objData{retries: tc.retries} } - rt := newTracker(lister, false, false, false, objDataFn) + rt := newTracker(lister, false, false, false, false, nil, objDataFn) ctx, cancel := context.WithCancel(context.Background()) var runErr error @@ -225,3 +318,917 @@ func Test_Tracker_TryCancelData(t *testing.T) { }) } } + +func Test_ReadyTracker_TrackAssignMetadata(t *testing.T) { + tcs := []struct { + name string + expectedErrMsgs []string + failClose bool + }{ + { + name: "TrackAssignMetadata fail close", + expectedErrMsgs: []string{"listing AssignMetadata"}, + failClose: true, + }, + { + name: "TrackAssignMetadata fail open", + expectedErrMsgs: []string{"listing AssignMetadata"}, + failClose: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + funcs := &interceptor.Funcs{} + funcs.List = func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error { + if _, ok := list.(*mutationv1.AssignMetadataList); ok { + return fmt.Errorf("Force Test AssignMetadataList Failure") + } + return client.List(ctx, list, opts...) 
+ } + + lister := fake.NewClientBuilder().WithScheme(mustInitializeScheme(runtime.NewScheme())).WithRuntimeObjects(&testAssignMetadata).WithInterceptorFuncs(*funcs).Build() + rt := newTracker(lister, true, false, false, tc.failClose, retryNone, func() objData { + return objData{retries: 0} + }) + errChan := make(chan error) + errGatheringDone := make(chan struct{}) + + errs := syncutil.NewConcurrentErrorSlice() + go func() { + defer close(errGatheringDone) + for { + err, ok := <-errChan + if !ok { + return + } + errs = errs.Append(err) + } + }() + + ctx := context.Background() + rt.trackAssignMetadata(ctx, errChan) + close(errChan) + <-errGatheringDone + + gotErrs := errs.GetSlice() + if len(gotErrs) != 1 { + t.Errorf("unexpected number of errors returned: %+v, expected: 1, got: %v ", gotErrs, len(gotErrs)) + } + + if !strings.Contains(gotErrs[0].Error(), tc.expectedErrMsgs[0]) { + t.Errorf("expected error to contain %v, but got error: %v ", tc.expectedErrMsgs[0], gotErrs[0]) + } + + expectPopulated := !tc.failClose + if rt.assignMetadata.Populated() != expectPopulated { + t.Fatalf("assignMetadata object tracker's populated field is marked as %v but should be %v", rt.assignMetadata.Populated(), expectPopulated) + } + }) + } +} + +func Test_ReadyTracker_TrackAssign(t *testing.T) { + tcs := []struct { + name string + expectedErrMsgs []string + failClose bool + }{ + { + name: "TrackAssign fail close", + expectedErrMsgs: []string{"listing Assign"}, + failClose: true, + }, + { + name: "TrackAssign fail open", + expectedErrMsgs: []string{"listing Assign"}, + failClose: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + funcs := &interceptor.Funcs{} + funcs.List = func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error { + if _, ok := list.(*mutationv1.AssignList); ok { + return fmt.Errorf("Force Test AssignList Failure") + } + return client.List(ctx, list, opts...) 
+ } + + lister := fake.NewClientBuilder().WithScheme(mustInitializeScheme(runtime.NewScheme())).WithRuntimeObjects(&testAssign).WithInterceptorFuncs(*funcs).Build() + rt := newTracker(lister, true, false, false, tc.failClose, retryNone, func() objData { + return objData{retries: 0} + }) + errChan := make(chan error) + errGatheringDone := make(chan struct{}) + + errs := syncutil.NewConcurrentErrorSlice() + go func() { + defer close(errGatheringDone) + for { + err, ok := <-errChan + if !ok { + return + } + errs = errs.Append(err) + } + }() + + ctx := context.Background() + rt.trackAssign(ctx, errChan) + close(errChan) + <-errGatheringDone + + gotErrs := errs.GetSlice() + if len(gotErrs) != 1 { + t.Errorf("unexpected number of errors returned: %+v, expected: 1, got: %v ", gotErrs, len(gotErrs)) + } + + if !strings.Contains(gotErrs[0].Error(), tc.expectedErrMsgs[0]) { + t.Errorf("expected error to contain %v, but got error: %v ", tc.expectedErrMsgs[0], gotErrs[0]) + } + + expectPopulated := !tc.failClose + if rt.assign.Populated() != expectPopulated { + t.Fatalf("assign object tracker's populated field is marked as %v but should be %v", rt.assign.Populated(), expectPopulated) + } + }) + } +} + +func Test_ReadyTracker_TrackModifySet(t *testing.T) { + tcs := []struct { + name string + expectedErrMsgs []string + failClose bool + }{ + { + name: "TrackModifySet fail close", + expectedErrMsgs: []string{"listing ModifySet"}, + failClose: true, + }, + { + name: "TrackModifySet fail open", + expectedErrMsgs: []string{"listing ModifySet"}, + failClose: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + funcs := &interceptor.Funcs{} + funcs.List = func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error { + if _, ok := list.(*mutationv1.ModifySetList); ok { + return fmt.Errorf("Force Test ModifySetList Failure") + } + return client.List(ctx, list, opts...) 
+ } + + lister := fake.NewClientBuilder().WithScheme(mustInitializeScheme(runtime.NewScheme())).WithRuntimeObjects(&testModifySet).WithInterceptorFuncs(*funcs).Build() + rt := newTracker(lister, true, false, false, tc.failClose, retryNone, func() objData { + return objData{retries: 0} + }) + errChan := make(chan error) + errGatheringDone := make(chan struct{}) + + errs := syncutil.NewConcurrentErrorSlice() + go func() { + defer close(errGatheringDone) + for { + err, ok := <-errChan + if !ok { + return + } + errs = errs.Append(err) + } + }() + + // Run test + ctx := context.Background() + rt.trackModifySet(ctx, errChan) + close(errChan) + <-errGatheringDone + + gotErrs := errs.GetSlice() + if len(gotErrs) != 1 { + t.Errorf("unexpected number of errors returned: %+v, expected: 1, got: %v ", gotErrs, len(gotErrs)) + } + + if !strings.Contains(gotErrs[0].Error(), tc.expectedErrMsgs[0]) { + t.Errorf("expected error to contain %v, but got error: %v ", tc.expectedErrMsgs[0], gotErrs[0]) + } + + expectPopulated := !tc.failClose + if rt.modifySet.Populated() != expectPopulated { + t.Fatalf("modifySet object tracker's populated field is marked as %v but should be %v", rt.modifySet.Populated(), expectPopulated) + } + }) + } +} + +func Test_ReadyTracker_TrackAssignImage(t *testing.T) { + tcs := []struct { + name string + expectedErrMsgs []string + failClose bool + }{ + { + name: "TrackAssignImage fail close", + expectedErrMsgs: []string{"listing AssignImage"}, + failClose: true, + }, + { + name: "TrackAssignImage fail open", + expectedErrMsgs: []string{"listing AssignImage"}, + failClose: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + funcs := &interceptor.Funcs{} + funcs.List = func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error { + if _, ok := list.(*mutationsv1alpha1.AssignImageList); ok { + return fmt.Errorf("Force Test AssignImageList Failure") + } + return client.List(ctx, list, opts...) 
+ } + + lister := fake.NewClientBuilder().WithScheme(mustInitializeScheme(runtime.NewScheme())).WithRuntimeObjects(&testAssignImage).WithInterceptorFuncs(*funcs).Build() + rt := newTracker(lister, true, false, false, tc.failClose, retryNone, func() objData { + return objData{retries: 0} + }) + errChan := make(chan error) + errGatheringDone := make(chan struct{}) + + errs := syncutil.NewConcurrentErrorSlice() + go func() { + defer close(errGatheringDone) + for { + err, ok := <-errChan + if !ok { + return + } + errs = errs.Append(err) + } + }() + + ctx := context.Background() + rt.trackAssignImage(ctx, errChan) + close(errChan) + <-errGatheringDone + + gotErrs := errs.GetSlice() + if len(gotErrs) != 1 { + t.Errorf("unexpected number of errors returned: %+v, expected: 1, got: %v ", gotErrs, len(gotErrs)) + } + + if !strings.Contains(gotErrs[0].Error(), tc.expectedErrMsgs[0]) { + t.Errorf("expected error to contain %v, but got error: %v ", tc.expectedErrMsgs[0], gotErrs[0]) + } + + expectPopulated := !tc.failClose + if rt.assignImage.Populated() != expectPopulated { + t.Fatalf("assignImage object tracker's populated field is marked as %v but should be %v", rt.assignImage.Populated(), expectPopulated) + } + }) + } +} + +func Test_ReadyTracker_TrackExternalDataProvider(t *testing.T) { + tcs := []struct { + name string + expectedErrMsgs []string + failClose bool + }{ + { + name: "TrackExternalDataProvider fail close", + expectedErrMsgs: []string{"listing Provider"}, + failClose: true, + }, + { + name: "TrackExternalDataProvider fail open", + expectedErrMsgs: []string{"listing Provider"}, + failClose: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + funcs := &interceptor.Funcs{} + funcs.List = func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error { + if _, ok := list.(*externaldatav1beta1.ProviderList); ok { + return fmt.Errorf("Force Test ProviderList Failure") + } + return client.List(ctx, list, opts...) 
+ } + + lister := fake.NewClientBuilder().WithScheme(mustInitializeScheme(runtime.NewScheme())).WithRuntimeObjects(&testExternalDataProvider).WithInterceptorFuncs(*funcs).Build() + rt := newTracker(lister, false, true, false, tc.failClose, retryNone, func() objData { + return objData{retries: 0} + }) + errChan := make(chan error) + errGatheringDone := make(chan struct{}) + + errs := syncutil.NewConcurrentErrorSlice() + go func() { + defer close(errGatheringDone) + for { + err, ok := <-errChan + if !ok { + return + } + errs = errs.Append(err) + } + }() + + ctx := context.Background() + rt.trackExternalDataProvider(ctx, errChan) + close(errChan) + <-errGatheringDone + + gotErrs := errs.GetSlice() + if len(gotErrs) != 1 { + t.Errorf("unexpected number of errors returned: %+v, expected: 1, got: %v ", gotErrs, len(gotErrs)) + } + + if !strings.Contains(gotErrs[0].Error(), tc.expectedErrMsgs[0]) { + t.Errorf("expected error to contain %v, but got error: %v ", tc.expectedErrMsgs[0], gotErrs[0]) + } + + expectPopulated := !tc.failClose + if rt.externalDataProvider.Populated() != expectPopulated { + t.Fatalf("externalDataProvider object tracker's populated field is marked as %v but should be %v", rt.externalDataProvider.Populated(), expectPopulated) + } + }) + } +} + +func Test_ReadyTracker_TrackExpansionTemplates(t *testing.T) { + tcs := []struct { + name string + expectedErrMsgs []string + failClose bool + }{ + { + name: "TrackExpansionTemplates fail close", + expectedErrMsgs: []string{"listing ExpansionTemplates"}, + failClose: true, + }, + { + name: "TrackExpansionTemplates fail open", + expectedErrMsgs: []string{"listing ExpansionTemplates"}, + failClose: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + funcs := &interceptor.Funcs{} + funcs.List = func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error { + if _, ok := list.(*expansionv1alpha1.ExpansionTemplateList); ok { + return fmt.Errorf("Force Test ExpansionTemplateList Failure") + } + return client.List(ctx, list, opts...) 
+ } + + lister := fake.NewClientBuilder().WithScheme(mustInitializeScheme(runtime.NewScheme())).WithRuntimeObjects(&testExpansionTemplate).WithInterceptorFuncs(*funcs).Build() + rt := newTracker(lister, false, false, true, tc.failClose, retryNone, func() objData { + return objData{retries: 0} + }) + errChan := make(chan error) + errGatheringDone := make(chan struct{}) + + errs := syncutil.NewConcurrentErrorSlice() + go func() { + defer close(errGatheringDone) + for { + err, ok := <-errChan + if !ok { + return + } + errs = errs.Append(err) + } + }() + + ctx := context.Background() + rt.trackExpansionTemplates(ctx, errChan) + close(errChan) + <-errGatheringDone + + gotErrs := errs.GetSlice() + if len(gotErrs) != 1 { + t.Errorf("unexpected number of errors returned: %+v, expected: 1, got: %v ", gotErrs, len(gotErrs)) + } + + if !strings.Contains(gotErrs[0].Error(), tc.expectedErrMsgs[0]) { + t.Errorf("expected error to contain %v, but got error: %v ", tc.expectedErrMsgs[0], gotErrs[0]) + } + + expectPopulated := !tc.failClose + if rt.expansions.Populated() != expectPopulated { + t.Fatalf("expansions object tracker's populated field is marked as %v but should be %v", rt.expansions.Populated(), expectPopulated) + } + }) + } +} + +func Test_ReadyTracker_TrackConstraintTemplates(t *testing.T) { + tcs := []struct { + name string + expectedErrMsgs []string + failClose bool + }{ + { + name: "TrackConstraintTemplates fail close", + expectedErrMsgs: []string{"listing templates"}, + failClose: true, + }, + { + name: "TrackConstraintTemplates fail open", + expectedErrMsgs: []string{"listing templates"}, + failClose: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + funcs := &interceptor.Funcs{} + funcs.List = func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error { + if _, ok := list.(*v1beta1.ConstraintTemplateList); ok { + return fmt.Errorf("Force Test ConstraintTemplateList Failure") + } + return client.List(ctx, list, opts...) 
+ } + + lister := fake.NewClientBuilder().WithScheme(mustInitializeScheme(runtime.NewScheme())).WithRuntimeObjects(convertedTemplate.DeepCopyObject()).WithInterceptorFuncs(*funcs).Build() + rt := newTracker(lister, false, false, false, tc.failClose, retryNone, func() objData { + return objData{retries: 0} + }) + errChan := make(chan error) + errGatheringDone := make(chan struct{}) + + errs := syncutil.NewConcurrentErrorSlice() + go func() { + defer close(errGatheringDone) + for { + err, ok := <-errChan + if !ok { + return + } + errs = errs.Append(err) + } + }() + + ctx := context.Background() + rt.constraintTrackers = syncutil.NewSingleRunner(errChan) + rt.trackConstraintTemplates(ctx, errChan) + close(errChan) + <-errGatheringDone + + gotErrs := errs.GetSlice() + if len(gotErrs) != 1 { + t.Errorf("unexpected number of errors returned: %+v, expected: 1, got: %v ", gotErrs, len(gotErrs)) + } + + if !strings.Contains(gotErrs[0].Error(), tc.expectedErrMsgs[0]) { + t.Errorf("expected error to contain %v, but got error: %v ", tc.expectedErrMsgs[0], gotErrs[0]) + } + + expectPopulated := !tc.failClose + if rt.templates.Populated() != expectPopulated { + t.Fatalf("templates object tracker's populated field is marked as %v but should be %v", rt.templates.Populated(), expectPopulated) + } + }) + } +} + +func Test_ReadyTracker_TrackConfigAndSyncSets(t *testing.T) { + tcs := []struct { + name string + configForceErr bool + syncsetForceErr bool + expectedErrMsgs []string + failClose bool + }{ + { + name: "TrackConfigAndSyncSets config err fail close", + configForceErr: true, + expectedErrMsgs: []string{"listing configs"}, + failClose: true, + }, + { + name: "TrackConfigAndSyncSets config err fail open", + configForceErr: true, + expectedErrMsgs: []string{"listing configs"}, + failClose: false, + }, + { + name: "TrackConfigAndSyncSets syncset err fail close", + syncsetForceErr: true, + expectedErrMsgs: []string{"listing syncsets"}, + failClose: true, + }, + { + name: "TrackConfigAndSyncSets syncset err fail open", + expectedErrMsgs: []string{"listing syncsets"}, + syncsetForceErr: true, + failClose: false, + }, + { + name: "TrackConfigAndSyncSets both err fail close", + expectedErrMsgs: []string{"listing configs", "listing syncsets"}, + configForceErr: true, + syncsetForceErr: true, + failClose: true, + }, + { + name: "TrackConfigAndSyncSets both err fail open", + expectedErrMsgs: []string{"listing configs", "listing syncsets"}, + configForceErr: true, + syncsetForceErr: true, + failClose: false, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + funcs := &interceptor.Funcs{} + funcs.List = func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error { + if _, ok := list.(*configv1alpha1.ConfigList); ok && tc.configForceErr { + return fmt.Errorf("Force Test ConfigList Failure") + } + + if _, ok := list.(*syncsetv1alpha1.SyncSetList); ok && tc.syncsetForceErr { + return fmt.Errorf("Force Test SyncSetList Failure") + } + + return client.List(ctx, list, opts...) 
+ } + lister := fake.NewClientBuilder().WithScheme(mustInitializeScheme(runtime.NewScheme())).WithRuntimeObjects(&testSyncSet, &testConfig).WithInterceptorFuncs(*funcs).Build() + rt := newTracker(lister, false, false, false, tc.failClose, retryNone, func() objData { + return objData{retries: 0} + }) + errChan := make(chan error) + errGatheringDone := make(chan struct{}) + + errs := syncutil.NewConcurrentErrorSlice() + go func() { + defer close(errGatheringDone) + for { + err, ok := <-errChan + if !ok { + return + } + errs = errs.Append(err) + } + }() + + ctx := context.Background() + rt.dataTrackers = syncutil.NewSingleRunner(errChan) + rt.trackConfigAndSyncSets(ctx, errChan) + close(errChan) + <-errGatheringDone + + gotErrs := errs.GetSlice() + if len(gotErrs) != len(tc.expectedErrMsgs) { + t.Errorf("unexpected number of errors returned: %+v, expected: %v, got: %v ", gotErrs, len(tc.expectedErrMsgs), len(gotErrs)) + } + + for _, expectedErrMsg := range tc.expectedErrMsgs { + match := false + for i, err := range gotErrs { + if strings.Contains(err.Error(), expectedErrMsg) { + match = true + gotErrs = append(gotErrs[:i], gotErrs[i+1:]...) + break + } + } + if !match { + t.Errorf("expected to get an error that contains: %v, but found none", expectedErrMsg) + } + } + + for _, err := range gotErrs { + t.Errorf("got unexpected error %v", err) + } + + if tc.failClose { + expectPopulated := !tc.configForceErr && !tc.syncsetForceErr + if rt.config.Populated() != expectPopulated || rt.syncsets.Populated() != expectPopulated { + t.Fatalf("config & syncset object trackers' populated fields are marked as config: %v & syncset: %v, but both should be %v", rt.config.Populated(), rt.syncsets.Populated(), expectPopulated) + } + } else if !rt.config.Populated() || !rt.syncsets.Populated() { + t.Fatalf("config & syncset object trackers' populated fields are marked as config: %v & syncset: %v, but both should be true", rt.config.Populated(), rt.syncsets.Populated()) + } + }) + } +} + +func Test_ReadyTracker_TrackConstraint(t *testing.T) { + tcs := []struct { + name string + expectedErrMsgs []string + failClose bool + }{ + { + name: "TrackConstraint fail close", + expectedErrMsgs: []string{"listing constraints"}, + failClose: true, + }, + { + name: "TrackConstraint fail open", + expectedErrMsgs: []string{"listing constraints"}, + failClose: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + funcs := &interceptor.Funcs{} + funcs.List = func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error { + if v, ok := list.(*unstructured.UnstructuredList); ok && v.GroupVersionKind().Group == "constraints.gatekeeper.sh" { + return fmt.Errorf("Force Test constraint list Failure") + } + return client.List(ctx, list, opts...) 
+ } + + lister := fake.NewClientBuilder().WithScheme(mustInitializeScheme(runtime.NewScheme())).WithRuntimeObjects(getTestConstraint()).WithInterceptorFuncs(*funcs).Build() + rt := newTracker(lister, false, false, false, tc.failClose, retryNone, func() objData { + return objData{retries: 0} + }) + errChan := make(chan error) + errGatheringDone := make(chan struct{}) + + errs := syncutil.NewConcurrentErrorSlice() + go func() { + defer close(errGatheringDone) + for { + err, ok := <-errChan + if !ok { + return + } + errs = errs.Append(err) + } + }() + + ctx := context.Background() + gvk := schema.GroupVersionKind{ + Group: constraintGroup, + Version: v1beta1.SchemeGroupVersion.Version, + Kind: "FooKind", + } + ot := rt.constraints.Get(gvk) + rt.makeConstraintTrackerFor(gvk, ot)(ctx, errChan) + close(errChan) + <-errGatheringDone + + gotErrs := errs.GetSlice() + if len(gotErrs) != 1 { + t.Errorf("unexpected number of errors returned: %+v, expected: 1, got: %v ", gotErrs, len(gotErrs)) + } + + if !strings.Contains(gotErrs[0].Error(), tc.expectedErrMsgs[0]) { + t.Errorf("expected error to contain %v, but got error: %v ", tc.expectedErrMsgs[0], gotErrs[0]) + } + + expectPopulated := !tc.failClose + if rt.constraints.Populated() != expectPopulated { + t.Fatalf("constraints object tracker's populated field is marked as %v but should be %v", rt.constraints.Populated(), expectPopulated) + } + }) + } +} + +func Test_ReadyTracker_TrackData(t *testing.T) { + tcs := []struct { + name string + expectedErrMsgs []string + failClose bool + }{ + { + name: "TrackData fail close", + expectedErrMsgs: []string{"listing data"}, + failClose: true, + }, + { + name: "TrackData fail open", + expectedErrMsgs: []string{"listing data"}, + failClose: false, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + funcs := &interceptor.Funcs{} + funcs.List = func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error { + if v, ok := list.(*unstructured.UnstructuredList); ok && v.GroupVersionKind().Kind == "PodList" { + return fmt.Errorf("Force Test data list Failure") + } + + return client.List(ctx, list, opts...) 
+ } + + lister := fake.NewClientBuilder().WithScheme(mustInitializeScheme(runtime.NewScheme())).WithRuntimeObjects(fakes.UnstructuredFor(podGVK, "", "pod1-name")).WithInterceptorFuncs(*funcs).Build() + rt := newTracker(lister, false, false, false, tc.failClose, retryNone, func() objData { + return objData{retries: 0} + }) + errChan := make(chan error) + errGatheringDone := make(chan struct{}) + + errs := syncutil.NewConcurrentErrorSlice() + go func() { + defer close(errGatheringDone) + for { + err, ok := <-errChan + if !ok { + return + } + errs = errs.Append(err) + } + }() + + ctx := context.Background() + gvk := testSyncSet.Spec.GVKs[0].ToGroupVersionKind() + dt := rt.data.Get(gvk) + + rt.makeDataTrackerFor(gvk, dt)(ctx, errChan) + close(errChan) + <-errGatheringDone + + gotErrs := errs.GetSlice() + if len(gotErrs) != 1 { + t.Errorf("unexpected number of errors returned: %+v, expected: 1, got: %v ", gotErrs, len(gotErrs)) + } + + if !strings.Contains(gotErrs[0].Error(), tc.expectedErrMsgs[0]) { + t.Errorf("expected error to contain %v, but got error: %v ", tc.expectedErrMsgs[0], gotErrs[0]) + } + + expectPopulated := !tc.failClose + if rt.data.Populated() != expectPopulated { + t.Fatalf("data object tracker's populated field is marked as %v but should be %v", rt.data.Populated(), expectPopulated) + } + }) + } +} + +func Test_ReadyTracker_Run_GRP_Wait(t *testing.T) { + tcs := []struct { + name string + errMessage string + failClose bool + }{ + { + name: "Ready Tracker Run GRP.Wait() fail close", + errMessage: "listing templates", + failClose: true, + }, + { + name: "Ready Tracker Run GRP.Wait() fail open", + failClose: false, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + var m sync.Mutex + funcs := &interceptor.Funcs{} + funcs.List = func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error { + if _, ok := list.(*v1beta1.ConstraintTemplateList); ok { + return fmt.Errorf("Force Test ConstraintTemplateList Failure") + } + + // Adding a mutex lock here avoids the race condition within fake client's List method + m.Lock() + defer m.Unlock() + return client.List(ctx, list, opts...) 
+			}
+
+			lister := fake.NewClientBuilder().WithScheme(mustInitializeScheme(runtime.NewScheme())).WithRuntimeObjects(&testExpansionTemplate, convertedTemplate.DeepCopyObject(), getTestConstraint(), &testSyncSet, fakes.UnstructuredFor(podGVK, "", "pod1-name")).WithInterceptorFuncs(*funcs).Build()
+			rt := newTracker(lister, false, false, true, tc.failClose, retryNone, func() objData {
+				return objData{retries: 0}
+			})
+
+			// Run kicks off all the tracking
+			ctx, cancel := context.WithCancel(context.Background())
+			err := rt.Run(ctx)
+			cancel()
+			expectError := tc.failClose
+			gotError := (err != nil)
+			if gotError != expectError || gotError && !strings.Contains(err.Error(), tc.errMessage) {
+				t.Fatalf("Run should have returned an error with %v, but got %v", tc.errMessage, err)
+			}
+
+			expectPopulated := !tc.failClose
+			if rt.Populated() != expectPopulated {
+				t.Fatalf("ready tracker's populated field is marked as %v but should be %v", rt.Populated(), expectPopulated)
+			}
+		})
+	}
+}
+
+func Test_ReadyTracker_Run_ConstraintTrackers_Wait(t *testing.T) {
+	tcs := []struct {
+		name       string
+		errMessage string
+		failClose  bool
+	}{
+		{
+			name:       "Ready Tracker Run ConstraintTrackers.Wait() fail close",
+			errMessage: "listing constraints",
+			failClose:  true,
+		},
+		{
+			name:      "Ready Tracker Run ConstraintTrackers.Wait() fail open",
+			failClose: false,
+		},
+	}
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			var m sync.Mutex
+			funcs := &interceptor.Funcs{}
+			funcs.List = func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error {
+				if v, ok := list.(*unstructured.UnstructuredList); ok && v.GroupVersionKind().Group == "constraints.gatekeeper.sh" {
+					t.Logf("intercepting list for %v", v.GroupVersionKind())
+					return fmt.Errorf("Force Test constraint list Failure")
+				}
+				m.Lock()
+				defer m.Unlock()
+				return client.List(ctx, list, opts...)
+			}
+
+			lister := fake.NewClientBuilder().WithScheme(mustInitializeScheme(runtime.NewScheme())).WithRuntimeObjects(&testExpansionTemplate, convertedTemplate.DeepCopyObject(), getTestConstraint(), &testSyncSet, fakes.UnstructuredFor(podGVK, "", "pod1-name")).WithInterceptorFuncs(*funcs).Build()
+			rt := newTracker(lister, false, false, true, tc.failClose, retryNone, func() objData {
+				return objData{retries: 0}
+			})
+
+			// Run kicks off all the tracking
+			ctx, cancel := context.WithCancel(context.Background())
+			err := rt.Run(ctx)
+			cancel()
+			expectError := tc.failClose
+			gotError := (err != nil)
+			t.Logf("Run returned: %v", err)
+			if gotError != expectError || gotError && !strings.Contains(err.Error(), tc.errMessage) {
+				t.Fatalf("Run should have returned an error with %v, but got %v", tc.errMessage, err)
+			}
+
+			expectPopulated := !tc.failClose
+			if rt.Populated() != expectPopulated {
+				t.Fatalf("ready tracker's populated field is marked as %v but should be %v", rt.Populated(), expectPopulated)
+			}
+		})
+	}
+}
+
+func Test_ReadyTracker_Run_DataTrackers_Wait(t *testing.T) {
+	tcs := []struct {
+		name       string
+		errMessage string
+		failClose  bool
+	}{
+		{
+			name:       "Ready Tracker Run DataTrackers.Wait() fail close",
+			errMessage: "listing data",
+			failClose:  true,
+		},
+		{
+			name:      "Ready Tracker Run DataTrackers.Wait() fail open",
+			failClose: false,
+		},
+	}
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			var m sync.Mutex
+			funcs := &interceptor.Funcs{}
+			funcs.List = func(ctx context.Context, client client.WithWatch, list client.ObjectList, opts ...client.ListOption) error {
+				if v, ok := list.(*unstructured.UnstructuredList); ok && v.GroupVersionKind().Kind == "PodList" {
+					return fmt.Errorf("Force Test pod list Failure")
+				}
+				m.Lock()
+				defer m.Unlock()
+				return client.List(ctx, list, opts...)
+			}
+
+			lister := fake.NewClientBuilder().WithScheme(mustInitializeScheme(runtime.NewScheme())).WithRuntimeObjects(&testExpansionTemplate, convertedTemplate.DeepCopyObject(), getTestConstraint(), &testSyncSet, fakes.UnstructuredFor(podGVK, "", "pod1-name")).WithInterceptorFuncs(*funcs).Build()
+			rt := newTracker(lister, false, false, true, tc.failClose, retryNone, func() objData {
+				return objData{retries: 0}
+			})
+
+			// Run kicks off all the tracking
+			ctx, cancel := context.WithCancel(context.Background())
+			err := rt.Run(ctx)
+			cancel()
+			expectError := tc.failClose
+			gotError := (err != nil)
+			if gotError != expectError || gotError && !strings.Contains(err.Error(), tc.errMessage) {
+				t.Fatalf("Run should have returned an error with %v, but got %v", tc.errMessage, err)
+			}
+
+			expectPopulated := !tc.failClose
+			if rt.Populated() != expectPopulated {
+				t.Fatalf("ready tracker's populated field is marked as %v but should be %v", rt.Populated(), expectPopulated)
+			}
+		})
+	}
+}
diff --git a/pkg/syncutil/concurrent_slice.go b/pkg/syncutil/concurrent_slice.go
new file mode 100644
index 00000000000..6f186d42b81
--- /dev/null
+++ b/pkg/syncutil/concurrent_slice.go
@@ -0,0 +1,44 @@
+package syncutil
+
+import "sync"
+
+// ConcurrentErrorSlice is a goroutine-safe, append-only collection of errors.
+type ConcurrentErrorSlice struct {
+	s  []error
+	mu *sync.RWMutex
+}
+
+// NewConcurrentErrorSlice returns an empty ConcurrentErrorSlice.
+func NewConcurrentErrorSlice() ConcurrentErrorSlice {
+	return ConcurrentErrorSlice{
+		s:  make([]error, 0),
+		mu: &sync.RWMutex{},
+	}
+}
+
+// Append returns a new ConcurrentErrorSlice with e appended; the receiver is left unchanged.
+func (c ConcurrentErrorSlice) Append(e error) ConcurrentErrorSlice {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	return ConcurrentErrorSlice{
+		s:  append(c.s, e),
+		mu: c.mu,
+	}
+}
+
+// Last returns the most recently appended error. It panics if no error has been appended.
+func (c ConcurrentErrorSlice) Last() error {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+
+	return c.s[len(c.s)-1]
+}
+
+// GetSlice returns a copy of the accumulated errors.
+func (c ConcurrentErrorSlice) GetSlice() []error {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
+	var s []error
+	return append(s, c.s...)
+}
diff --git a/pkg/syncutil/single_runner.go b/pkg/syncutil/single_runner.go
index c43a384cadc..a49d94b230c 100644
--- a/pkg/syncutil/single_runner.go
+++ b/pkg/syncutil/single_runner.go
@@ -18,40 +18,40 @@ package syncutil
 
 import (
 	"context"
 	"sync"
-
-	"golang.org/x/sync/errgroup"
 )
 
-// SingleRunner wraps an errgroup to run keyed goroutines as singletons.
+// SingleRunner runs keyed goroutines as singletons and surfaces their errors on a shared channel.
 // Keys are single-use and subsequent usage to schedule will be silently ignored.
 // Goroutines can be individually canceled provided they respect the context passed to them.
 type SingleRunner struct {
-	m   map[string]context.CancelFunc
-	mu  sync.RWMutex
-	grp *errgroup.Group
+	m  map[string]context.CancelFunc
+	mu sync.RWMutex
+	wg *sync.WaitGroup
+	ec chan<- error
 }
 
-// RunnerWithContext returns an initialized SingleRunner.
-// The provided context is used as the parent of subsequently scheduled goroutines.
-func RunnerWithContext(ctx context.Context) *SingleRunner {
-	grp, _ := errgroup.WithContext(ctx)
+// NewSingleRunner returns an initialized SingleRunner.
+func NewSingleRunner(errChan chan<- error) *SingleRunner {
 	return &SingleRunner{
-		grp: grp,
-		m:   make(map[string]context.CancelFunc),
+		wg: &sync.WaitGroup{},
+		m:  make(map[string]context.CancelFunc),
+		ec: errChan,
 	}
 }
 
 // Wait waits for all goroutines managed by the SingleRunner to complete.
-// Returns the first error returned from a managed goroutine, or nil.
+// Errors from managed goroutines are reported on the error channel passed to NewSingleRunner.
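+// Wait does not drain or close that channel; that remains the responsibility of
+// the caller that constructed the SingleRunner.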
-func (s *SingleRunner) Wait() error {
+func (s *SingleRunner) Wait() {
 	s.mu.RLock()
-	grp := s.grp
+	grp := s.wg
 	s.mu.RUnlock()
 	if grp == nil {
-		return nil
+		return
 	}
-	err := grp.Wait()
+	grp.Wait()
 
 	s.mu.Lock()
 	defer s.mu.Unlock()
@@ -61,12 +59,11 @@
 		}
 		c()
 	}
-	return err
 }
 
 // Go schedules the provided function on a new goroutine if the provided key has
 // not been used for scheduling before.
-func (s *SingleRunner) Go(ctx context.Context, key string, f func(context.Context) error) {
+func (s *SingleRunner) Go(ctx context.Context, key string, f func(context.Context, chan<- error)) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 
@@ -81,9 +78,11 @@ func (s *SingleRunner) Go(ctx context.Contex
 	ctx, cancel := context.WithCancel(ctx)
 	s.m[key] = cancel
 
-	s.grp.Go(func() error {
-		return f(ctx)
-	})
+	s.wg.Add(1)
+	go func() {
+		defer s.wg.Done()
+		f(ctx, s.ec)
+	}()
 }
 
 // Cancel cancels a keyed goroutine if it exists.
diff --git a/pkg/syncutil/single_runner_test.go b/pkg/syncutil/single_runner_test.go
index c413d3a8d69..7a7fb48ac0e 100644
--- a/pkg/syncutil/single_runner_test.go
+++ b/pkg/syncutil/single_runner_test.go
@@ -28,29 +28,27 @@ func Test_SingleRunner(t *testing.T) {
 	var wg sync.WaitGroup
 	syncOne := make(chan struct{})
 	syncTwo := make(chan struct{})
+	errChan := make(chan error)
 
-	r := RunnerWithContext(ctx)
+	r := NewSingleRunner(errChan)
 
 	wg.Add(1)
-	r.Go(ctx, "one", func(ctx context.Context) error {
+	r.Go(ctx, "one", func(ctx context.Context, _ chan<- error) {
 		defer wg.Done()
 		defer close(syncOne)
 		<-ctx.Done()
-		return nil
 	})
 
 	// Repeat key won't be scheduled.
-	r.Go(ctx, "one", func(_ context.Context) error {
+	r.Go(ctx, "one", func(_ context.Context, _ chan<- error) {
 		t.Fatal("repeat key will never be scheduled")
-		return nil
 	})
 
 	wg.Add(1)
-	r.Go(ctx, "two", func(ctx context.Context) error {
+	r.Go(ctx, "two", func(ctx context.Context, _ chan<- error) {
 		defer wg.Done()
 		defer close(syncTwo)
 		<-ctx.Done()
-		return nil
 	})
 
 	select {