diff --git a/pkg/cachemanager/cachemanager_test/cachemanager_integration_test.go b/pkg/cachemanager/cachemanager_test/cachemanager_integration_test.go index eaea41a757a..ebc993421a4 100644 --- a/pkg/cachemanager/cachemanager_test/cachemanager_integration_test.go +++ b/pkg/cachemanager/cachemanager_test/cachemanager_integration_test.go @@ -3,6 +3,7 @@ package cachemanager_test import ( "context" "fmt" + "sync" "testing" "time" @@ -10,7 +11,7 @@ import ( "github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager" "github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager/aggregator" "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process" - "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/sync" + syncc "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/sync" "github.com/open-policy-agent/gatekeeper/v3/pkg/fakes" "github.com/open-policy-agent/gatekeeper/v3/pkg/readiness" "github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil" @@ -101,6 +102,154 @@ func TestCacheManager_replay_retries(t *testing.T) { require.NoError(t, c.Delete(ctx, pod), "creating ConfigMap pod-1") } +// TestCacheManager_concurrent makes sure that we can add and remove multiple sources +// from separate go routines and changes to the underlying cache are reflected. 
+func TestCacheManager_concurrent(t *testing.T) {
+	mgr, wm := testutils.SetupManager(t, cfg)
+	c := testclient.NewRetryClient(mgr.GetClient())
+	cacheManager, dataStore, agg, ctx := cacheManagerForTest(t, mgr, wm, c)
+
+	configMapGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"}
+	podGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
+
+	// Create configMaps to test for
+	cm := unstructuredFor(configMapGVK, "config-test-1")
+	require.NoError(t, c.Create(ctx, cm), "creating ConfigMap config-test-1")
+
+	cm2 := unstructuredFor(configMapGVK, "config-test-2")
+	require.NoError(t, c.Create(ctx, cm2), "creating ConfigMap config-test-2")
+
+	pod := unstructuredFor(podGVK, "pod-1")
+	require.NoError(t, c.Create(ctx, pod), "creating Pod pod-1")
+
+	cfClient, ok := dataStore.(*cachemanager.FakeCfClient)
+	require.True(t, ok)
+
+	syncSourceOne := aggregator.Key{Source: "source_a", ID: "ID_a"}
+	syncSourceTwo := aggregator.Key{Source: "source_b", ID: "ID_b"}
+
+	wg := &sync.WaitGroup{}
+
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceOne, []schema.GroupVersionKind{configMapGVK}))
+	}()
+	go func() {
+		defer wg.Done()
+		require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceTwo, []schema.GroupVersionKind{podGVK}))
+	}()
+
+	wg.Wait()
+
+	expected := map[cachemanager.CfDataKey]interface{}{
+		{Gvk: configMapGVK, Key: "default/config-test-1"}: nil,
+		{Gvk: configMapGVK, Key: "default/config-test-2"}: nil,
+		{Gvk: podGVK, Key: "default/pod-1"}:               nil,
+	}
+
+	require.Eventually(t, expectedCheck(cfClient, expected), eventuallyTimeout, eventuallyTicker)
+	// now assert that the gvkAggregator looks as expected
+	require.True(t, agg.IsPresent(configMapGVK))
+	gvks := agg.List(syncSourceOne)
+	require.Len(t, gvks, 1)
+	_, foundConfigMap := gvks[configMapGVK]
+	require.True(t, foundConfigMap)
+	gvks = agg.List(syncSourceTwo)
+	require.Len(t, gvks, 1)
+	_, foundPod := gvks[podGVK]
+	require.True(t, foundPod)
+
+	// now remove the pod GVK for sync source two and make sure we don't have pods in the cache anymore
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceTwo, []schema.GroupVersionKind{configMapGVK}))
+	}()
+
+	wg.Wait()
+
+	// expect the config map instances to be repopulated eventually
+	expected = map[cachemanager.CfDataKey]interface{}{
+		{Gvk: configMapGVK, Key: "default/config-test-1"}: nil,
+		{Gvk: configMapGVK, Key: "default/config-test-2"}: nil,
+	}
+	require.Eventually(t, expectedCheck(cfClient, expected), eventuallyTimeout, eventuallyTicker)
+	// now assert that the gvkAggregator looks as expected
+	require.True(t, agg.IsPresent(configMapGVK))
+	gvks = agg.List(syncSourceOne)
+	require.Len(t, gvks, 1)
+	_, foundConfigMap = gvks[configMapGVK]
+	require.True(t, foundConfigMap)
+	_, foundPod = gvks[podGVK]
+	require.False(t, foundPod)
+
+	// now swap the gvks for each source and do so repeatedly to generate some churn
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+
+		order := true
+		for i := 1; i <= 10; i++ {
+			if order {
+				require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceOne, []schema.GroupVersionKind{configMapGVK}))
+				require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceTwo, []schema.GroupVersionKind{podGVK}))
+			} else {
+				require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceOne, []schema.GroupVersionKind{podGVK}))
+				require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceTwo, []schema.GroupVersionKind{configMapGVK}))
+			}
+
+			order = !order
+		}
+
+		// final upsert for determinism
+		require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceOne, []schema.GroupVersionKind{configMapGVK}))
+		require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceTwo, []schema.GroupVersionKind{podGVK}))
+	}()
+
+	wg.Wait()
+
+	expected = map[cachemanager.CfDataKey]interface{}{
+		{Gvk: configMapGVK, Key: "default/config-test-1"}: nil,
+		{Gvk: configMapGVK, Key: "default/config-test-2"}: nil,
+		{Gvk: podGVK, Key: "default/pod-1"}:               nil,
+	}
+
+	require.Eventually(t, expectedCheck(cfClient, expected), eventuallyTimeout, eventuallyTicker)
+	// now assert that the gvkAggregator looks as expected
+	require.True(t, agg.IsPresent(configMapGVK))
+	gvks = agg.List(syncSourceOne)
+	require.Len(t, gvks, 1)
+	_, foundConfigMap = gvks[configMapGVK]
+	require.True(t, foundConfigMap)
+	gvks = agg.List(syncSourceTwo)
+	require.Len(t, gvks, 1)
+	_, foundPod = gvks[podGVK]
+	require.True(t, foundPod)
+
+	// now remove the sources
+	wg.Add(2)
+	go func() {
+		defer wg.Done()
+		require.NoError(t, cacheManager.RemoveSource(ctx, syncSourceOne))
+	}()
+	go func() {
+		defer wg.Done()
+		require.NoError(t, cacheManager.RemoveSource(ctx, syncSourceTwo))
+	}()
+
+	wg.Wait()
+
+	// and expect an empty cache and empty aggregator
+	require.Eventually(t, expectedCheck(cfClient, map[cachemanager.CfDataKey]interface{}{}), eventuallyTimeout, eventuallyTicker)
+	require.Empty(t, agg.GVKs())
+
+	// cleanup
+	require.NoError(t, c.Delete(ctx, cm), "deleting ConfigMap config-test-1")
+	require.NoError(t, c.Delete(ctx, cm2), "deleting ConfigMap config-test-2")
+	require.NoError(t, c.Delete(ctx, pod), "deleting Pod pod-1")
+}
+
 func expectedCheck(cfClient *cachemanager.FakeCfClient, expected map[cachemanager.CfDataKey]interface{}) func() bool {
 	return func() bool {
 		if cfClient.Len() != len(expected) {
@@ -165,7 +314,7 @@ func cacheManagerForTest(t *testing.T, mgr manager.Manager, wm *watch.Manager, r
 	cacheManager, err := cachemanager.NewCacheManager(cfg)
 	require.NoError(t, err)
 
-	syncAdder := sync.Adder{
+	syncAdder := syncc.Adder{
 		Events:       events,
 		CacheManager: cacheManager,
 	}