Skip to content

Commit

Permalink
add a concurrent test
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Pana <[email protected]>
  • Loading branch information
acpana committed Aug 3, 2023
1 parent f010d3b commit 86425fc
Showing 1 changed file with 151 additions and 2 deletions.
153 changes: 151 additions & 2 deletions pkg/cachemanager/cachemanager_test/cachemanager_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package cachemanager_test
import (
"context"
"fmt"
"sync"
"testing"
"time"

configv1alpha1 "github.com/open-policy-agent/gatekeeper/v3/apis/config/v1alpha1"
"github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager"
"github.com/open-policy-agent/gatekeeper/v3/pkg/cachemanager/aggregator"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/config/process"
"github.com/open-policy-agent/gatekeeper/v3/pkg/controller/sync"
syncc "github.com/open-policy-agent/gatekeeper/v3/pkg/controller/sync"
"github.com/open-policy-agent/gatekeeper/v3/pkg/fakes"
"github.com/open-policy-agent/gatekeeper/v3/pkg/readiness"
"github.com/open-policy-agent/gatekeeper/v3/pkg/syncutil"
Expand Down Expand Up @@ -101,6 +102,154 @@ func TestCacheManager_replay_retries(t *testing.T) {
require.NoError(t, c.Delete(ctx, pod), "creating ConfigMap pod-1")
}

// TestCacheManager_concurrent makes sure that we can add and remove multiple sources
// from separate go routines and changes to the underlying cache are reflected.
func TestCacheManager_concurrent(t *testing.T) {
mgr, wm := testutils.SetupManager(t, cfg)
c := testclient.NewRetryClient(mgr.GetClient())
cacheManager, dataStore, agg, ctx := cacheManagerForTest(t, mgr, wm, c)

configMapGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "ConfigMap"}
podGVK := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}

// Create configMaps to test for
cm := unstructuredFor(configMapGVK, "config-test-1")
require.NoError(t, c.Create(ctx, cm), "creating ConfigMap config-test-1")

cm2 := unstructuredFor(configMapGVK, "config-test-2")
require.NoError(t, c.Create(ctx, cm2), "creating ConfigMap config-test-2")

pod := unstructuredFor(podGVK, "pod-1")
require.NoError(t, c.Create(ctx, pod), "creating Pod pod-1")

cfClient, ok := dataStore.(*cachemanager.FakeCfClient)
require.True(t, ok)

syncSourceOne := aggregator.Key{Source: "source_a", ID: "ID_a"}
syncSourceTwo := aggregator.Key{Source: "source_b", ID: "ID_b"}

wg := &sync.WaitGroup{}

wg.Add(2)
go func() {
defer wg.Done()
require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceOne, []schema.GroupVersionKind{configMapGVK}))
}()
go func() {
defer wg.Done()
require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceTwo, []schema.GroupVersionKind{podGVK}))
}()

wg.Wait()

expected := map[cachemanager.CfDataKey]interface{}{
{Gvk: configMapGVK, Key: "default/config-test-1"}: nil,
{Gvk: configMapGVK, Key: "default/config-test-2"}: nil,
{Gvk: podGVK, Key: "default/pod-1"}: nil,
}

require.Eventually(t, expectedCheck(cfClient, expected), eventuallyTimeout, eventuallyTicker)
// now assert that the gvkAggregator looks as expected
agg.IsPresent(configMapGVK)
gvks := agg.List(syncSourceOne)
require.Len(t, gvks, 1)
_, foundConfigMap := gvks[configMapGVK]
require.True(t, foundConfigMap)
gvks = agg.List(syncSourceTwo)
require.Len(t, gvks, 1)
_, foundPod := gvks[podGVK]
require.True(t, foundPod)

// now remove the podgvk for sync source two and make sure we don't have pods in the cache anymore
wg.Add(1)
go func() {
defer wg.Done()
require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceTwo, []schema.GroupVersionKind{configMapGVK}))
}()

wg.Wait()

// expecte the config map instances to be repopulated eventually
expected = map[cachemanager.CfDataKey]interface{}{
{Gvk: configMapGVK, Key: "default/config-test-1"}: nil,
{Gvk: configMapGVK, Key: "default/config-test-2"}: nil,
}
require.Eventually(t, expectedCheck(cfClient, expected), eventuallyTimeout, eventuallyTicker)
// now assert that the gvkAggregator looks as expected
agg.IsPresent(configMapGVK)
gvks = agg.List(syncSourceOne)
require.Len(t, gvks, 1)
_, foundConfigMap = gvks[configMapGVK]
require.True(t, foundConfigMap)
_, foundPod = gvks[podGVK]
require.False(t, foundPod)

// now swap the gvks for each source and do so repeatedly to generate some churn
wg.Add(1)
go func() {
defer wg.Done()

order := true
for i := 1; i <= 10; i++ {
if order {
require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceOne, []schema.GroupVersionKind{configMapGVK}))
require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceTwo, []schema.GroupVersionKind{podGVK}))
} else {
require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceOne, []schema.GroupVersionKind{podGVK}))
require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceTwo, []schema.GroupVersionKind{configMapGVK}))
}

order = !order
}

// final upsert for determinism
require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceOne, []schema.GroupVersionKind{configMapGVK}))
require.NoError(t, cacheManager.UpsertSource(ctx, syncSourceTwo, []schema.GroupVersionKind{podGVK}))
}()

wg.Wait()

expected = map[cachemanager.CfDataKey]interface{}{
{Gvk: configMapGVK, Key: "default/config-test-1"}: nil,
{Gvk: configMapGVK, Key: "default/config-test-2"}: nil,
{Gvk: podGVK, Key: "default/pod-1"}: nil,
}

require.Eventually(t, expectedCheck(cfClient, expected), eventuallyTimeout, eventuallyTicker)
// now assert that the gvkAggregator looks as expected
agg.IsPresent(configMapGVK)
gvks = agg.List(syncSourceOne)
require.Len(t, gvks, 1)
_, foundConfigMap = gvks[configMapGVK]
require.True(t, foundConfigMap)
gvks = agg.List(syncSourceTwo)
require.Len(t, gvks, 1)
_, foundPod = gvks[podGVK]
require.True(t, foundPod)

// now remove the sources
wg.Add(2)
go func() {
defer wg.Done()
require.NoError(t, cacheManager.RemoveSource(ctx, syncSourceOne))
}()
go func() {
defer wg.Done()
require.NoError(t, cacheManager.RemoveSource(ctx, syncSourceTwo))
}()

wg.Wait()

// and expect an empty cache and empty aggregator
require.Eventually(t, expectedCheck(cfClient, map[cachemanager.CfDataKey]interface{}{}), eventuallyTimeout, eventuallyTicker)
require.True(t, len(agg.GVKs()) == 0)

// cleanup
require.NoError(t, c.Delete(ctx, cm), "deleting ConfigMap config-test-1")
require.NoError(t, c.Delete(ctx, cm2), "deleting ConfigMap config-test-2")
require.NoError(t, c.Delete(ctx, pod), "deleting Pod pod-1")
}

func expectedCheck(cfClient *cachemanager.FakeCfClient, expected map[cachemanager.CfDataKey]interface{}) func() bool {
return func() bool {
if cfClient.Len() != len(expected) {
Expand Down Expand Up @@ -165,7 +314,7 @@ func cacheManagerForTest(t *testing.T, mgr manager.Manager, wm *watch.Manager, r
cacheManager, err := cachemanager.NewCacheManager(cfg)
require.NoError(t, err)

syncAdder := sync.Adder{
syncAdder := syncc.Adder{
Events: events,
CacheManager: cacheManager,
}
Expand Down

0 comments on commit 86425fc

Please sign in to comment.