diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml index af92193c77..60c752f213 100644 --- a/.github/workflows/e2e.yaml +++ b/.github/workflows/e2e.yaml @@ -103,7 +103,7 @@ jobs: - id: set-paths-matrix run: | set -x - sudo apt-get install -y jq yq + sudo apt-get install -y jq paths=$(ls -d ./test/e2e*) echo "matrix=$(printf '%s\n' "${paths}" | jq -R . | jq -cs .)" >> "$GITHUB_OUTPUT" outputs: @@ -268,6 +268,9 @@ jobs: run: | sudo apt-get install -y sed + - name: Install yq@v4 + run: go install github.com/mikefarah/yq/v4@latest + - name: create vcluster with current cli run: | chmod +x ./vcluster-current diff --git a/.golangci.yml b/.golangci.yml index 9567e0ddd9..2952fa3cf4 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -7,6 +7,7 @@ linters: - asasalint - asciicheck - bidichk + - copyloopvar - decorder - dupl - durationcheck @@ -14,7 +15,6 @@ linters: - errname - errorlint - exhaustive - - exportloopref - ginkgolinter - gocheckcompilerdirectives - gofmt diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 9c042fdd4b..d817270c8c 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -118,7 +118,7 @@ signs: artifacts: checksum snapshot: - name_template: "{{ incpatch .Version }}-next" + version_template: "{{ incpatch .Version }}-next" changelog: use: github diff --git a/Dockerfile.release b/Dockerfile.release index 12ffe7d038..be38a0d29f 100644 --- a/Dockerfile.release +++ b/Dockerfile.release @@ -1,8 +1,8 @@ ARG KINE_VERSION="v0.13.1" -FROM rancher/kine:${KINE_VERSION} as kine +FROM rancher/kine:${KINE_VERSION} AS kine # Build the manager binary -FROM alpine:3.20 as builder +FROM alpine:3.20 AS builder WORKDIR /vcluster-dev diff --git a/Justfile b/Justfile index 08679b5151..9fb274bb1c 100644 --- a/Justfile +++ b/Justfile @@ -161,3 +161,25 @@ gen-license-report: go-licenses save --save_path=./licenses --ignore github.com/loft-sh ./... cp -r ./licenses ./cmd/vclusterctl/cmd/credits + +build-dev-image tag="": + TELEMETRY_PRIVATE_KEY="" goreleaser build --snapshot --clean + + cp dist/vcluster_linux_$(go env GOARCH | sed s/amd64/amd64_v1/g)/vcluster ./vcluster + docker build -t vcluster:dev-{{tag}} -f Dockerfile.release --build-arg TARGETARCH=$(uname -m) --build-arg TARGETOS=linux . + rm ./vcluster + +run-conformance k8s_version="1.31.1" mode="conformance-lite" tag="conf": (build-dev-image tag) + minikube start --kubernetes-version {{k8s_version}} --nodes=2 + minikube addons enable metrics-server + minikube image load vcluster:dev-{{tag}} + + vcluster create vcluster -n vcluster -f vcluster.yaml + + sonobuoy run --mode={{mode}} --level=debug + +conformance-status: + sonobuoy status + +conformance-logs: + sonobuoy logs diff --git a/cmd/vcluster/cmd/debug/etcd/keys.go b/cmd/vcluster/cmd/debug/etcd/keys.go index ff986b392a..78a38cdc54 100644 --- a/cmd/vcluster/cmd/debug/etcd/keys.go +++ b/cmd/vcluster/cmd/debug/etcd/keys.go @@ -47,7 +47,7 @@ func ExecuteKeys(ctx context.Context, options *KeysOptions) error { } // create new etcd backend & list mappings - keyValues, err := etcdClient.List(ctx, options.Prefix, 0) + keyValues, err := etcdClient.List(ctx, options.Prefix) if err != nil { return err } diff --git a/hack/schema/main.go b/hack/schema/main.go index 5441827822..79c9db8e51 100644 --- a/hack/schema/main.go +++ b/hack/schema/main.go @@ -15,8 +15,10 @@ import ( "gopkg.in/yaml.v3" ) -const OutFile = "chart/values.schema.json" -const ValuesOutFile = "chart/values.yaml" +const ( + OutFile = "chart/values.schema.json" + ValuesOutFile = "chart/values.yaml" +) const ( defsPrefix = "#/$defs/" externalConfigName = "ExternalConfig" @@ -100,7 +102,6 @@ func addPlatformSchema(toSchema *jsonschema.Schema) error { } for pair := platformConfigSchema.Properties.Oldest(); pair != nil; pair = pair.Next() { - pair := pair platformNode.Properties.AddPairs(*pair) } diff --git a/pkg/cli/add_vcluster_helm.go b/pkg/cli/add_vcluster_helm.go index 78b3aafc60..4d032cf051 100644 --- a/pkg/cli/add_vcluster_helm.go +++ b/pkg/cli/add_vcluster_helm.go @@ -79,7 +79,6 @@ func AddVClusterHelm( var addErrors []error log.Debugf("trying to add %d vCluster instances to platform", len(vClusters)) for _, vCluster := range vClusters { - vCluster := vCluster log.Infof("adding %s vCluster to platform", vCluster.Name) err := addVClusterHelm(ctx, options, globalFlags, vCluster.Name, &vCluster, kubeClient, log) if err != nil { @@ -131,7 +130,6 @@ func addVClusterHelm( return !lifecycle.IsPaused(vCluster), nil }) - if err != nil { return fmt.Errorf("error waiting for vCluster to wake up %w", err) } diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index c72f9c0a5a..2e87e4aed7 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -8,28 +8,23 @@ import ( vconfig "github.com/loft-sh/vcluster/config" "github.com/loft-sh/vcluster/pkg/config" "github.com/loft-sh/vcluster/pkg/constants" + "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" ) type Value struct { - Key []byte - Data []byte - Revision int64 + Key []byte + Data []byte } -var ( - ErrNotFound = errors.New("etcdwrapper: key not found") -) +var ErrNotFound = errors.New("etcdwrapper: key not found") type Client interface { - List(ctx context.Context, key string, rev int) ([]Value, error) - Watch(ctx context.Context, key string, rev int) clientv3.WatchChan + List(ctx context.Context, key string) ([]Value, error) + Watch(ctx context.Context, key string) clientv3.WatchChan Get(ctx context.Context, key string) (Value, error) Put(ctx context.Context, key string, value []byte) error - Create(ctx context.Context, key string, value []byte) error - Update(ctx context.Context, key string, revision int64, value []byte) error - Delete(ctx context.Context, key string, revision int64) error - Compact(ctx context.Context, revision int64) (int64, error) + Delete(ctx context.Context, key string) error Close() error } @@ -102,12 +97,12 @@ func New(ctx context.Context, certificates *Certificates, endpoints ...string) ( }, nil } -func (c *client) Watch(ctx context.Context, key string, rev int) clientv3.WatchChan { - return c.c.Watch(ctx, key, clientv3.WithPrefix(), clientv3.WithRev(int64(rev))) +func (c *client) Watch(ctx context.Context, key string) clientv3.WatchChan { + return c.c.Watch(ctx, key, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithProgressNotify()) } -func (c *client) List(ctx context.Context, key string, rev int) ([]Value, error) { - resp, err := c.c.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithRev(int64(rev))) +func (c *client) List(ctx context.Context, key string) ([]Value, error) { + resp, err := c.c.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithRev(0)) if err != nil { return nil, err } @@ -115,9 +110,8 @@ func (c *client) List(ctx context.Context, key string, rev int) ([]Value, error) var vals []Value for _, kv := range resp.Kvs { vals = append(vals, Value{ - Key: kv.Key, - Data: kv.Value, - Revision: kv.ModRevision, + Key: kv.Key, + Data: kv.Value, }) } @@ -125,83 +119,52 @@ func (c *client) List(ctx context.Context, key string, rev int) ([]Value, error) } func (c *client) Get(ctx context.Context, key string) (Value, error) { - resp, err := c.c.Get(ctx, key) + resp, err := c.c.Get(ctx, key, clientv3.WithRev(0)) if err != nil { - return Value{}, err + return Value{}, fmt.Errorf("etcd get: %w", err) + } + + if len(resp.Kvs) == 0 { + return Value{}, ErrNotFound } if len(resp.Kvs) == 1 { return Value{ - Key: resp.Kvs[0].Key, - Data: resp.Kvs[0].Value, - Revision: resp.Kvs[0].ModRevision, + Key: resp.Kvs[0].Key, + Data: resp.Kvs[0].Value, }, nil } - return Value{}, ErrNotFound -} - -func (c *client) Put(ctx context.Context, key string, value []byte) error { - val, err := c.Get(ctx, key) - if err != nil && !errors.Is(err, ErrNotFound) { - return err - } - if val.Revision == 0 { - return c.Create(ctx, key, value) + highestRevision := &mvccpb.KeyValue{ModRevision: -1} + for _, kv := range resp.Kvs { + if kv.ModRevision > highestRevision.ModRevision { + highestRevision = kv + } } - return c.Update(ctx, key, val.Revision, value) -} -func (c *client) Create(ctx context.Context, key string, value []byte) error { - resp, err := c.c.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)). - Then(clientv3.OpPut(key, string(value))). - Commit() - if err != nil { - return err - } - if !resp.Succeeded { - return fmt.Errorf("key exists") - } - return nil + return Value{ + Key: highestRevision.Key, + Data: highestRevision.Value, + }, nil } -func (c *client) Update(ctx context.Context, key string, revision int64, value []byte) error { - resp, err := c.c.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision(key), "=", revision)). - Then(clientv3.OpPut(key, string(value))). - Else(clientv3.OpGet(key)). +func (c *client) Put(ctx context.Context, key string, value []byte) error { + _, err := c.c.Txn(ctx). + If(clientv3.Compare(clientv3.Version(key), ">", 0)). + Then(clientv3.OpPut(key, string(value), clientv3.WithIgnoreLease())). + Else(clientv3.OpPut(key, string(value))). Commit() - if err != nil { - return err - } - if !resp.Succeeded { - return fmt.Errorf("revision %d doesnt match", revision) - } - return nil + + return err } -func (c *client) Delete(ctx context.Context, key string, revision int64) error { - resp, err := c.c.Txn(ctx). - If(clientv3.Compare(clientv3.ModRevision(key), "=", revision)). +func (c *client) Delete(ctx context.Context, key string) error { + _, err := c.c.Txn(ctx). + If(clientv3.Compare(clientv3.Version(key), ">", 0)). Then(clientv3.OpDelete(key)). - Else(clientv3.OpGet(key)). Commit() - if err != nil { - return err - } - if !resp.Succeeded { - return fmt.Errorf("revision %d doesnt match", revision) - } - return nil -} -func (c *client) Compact(ctx context.Context, revision int64) (int64, error) { - resp, err := c.c.Compact(ctx, revision) - if resp != nil { - return resp.Header.GetRevision(), err - } - return 0, err + return err } func (c *client) Close() error { diff --git a/pkg/etcd/util.go b/pkg/etcd/util.go index 9440ad29ff..bdf03fa48f 100644 --- a/pkg/etcd/util.go +++ b/pkg/etcd/util.go @@ -72,6 +72,11 @@ func getClientConfig(ctx context.Context, certificates *Certificates, endpoints DialTimeout: 5 * time.Second, Logger: zap.L().Named("etcd-client"), + + // DialOptions: []grpc.DialOption{ + // grpc.WithDisableRetry(), + // }, + // MaxUnaryRetries: 1, } if len(endpoints) > 0 { diff --git a/pkg/mappings/generic/recorder.go b/pkg/mappings/generic/recorder.go index 7afa77900d..0f9952ce6b 100644 --- a/pkg/mappings/generic/recorder.go +++ b/pkg/mappings/generic/recorder.go @@ -211,7 +211,7 @@ func RecordMapping(ctx *synccontext.SyncContext, pName, vName types.NamespacedNa } // record the reference - err := ctx.Mappings.Store().AddReference(ctx, synccontext.NameMapping{ + err := ctx.Mappings.Store().AddReferenceAndSave(ctx, synccontext.NameMapping{ GroupVersionKind: gvk, HostName: pName, diff --git a/pkg/mappings/store/backend.go b/pkg/mappings/store/backend.go index ac35fea754..df1199de69 100644 --- a/pkg/mappings/store/backend.go +++ b/pkg/mappings/store/backend.go @@ -17,14 +17,13 @@ type Backend interface { } type BackendWatchResponse struct { + Err error Events []*BackendWatchEvent - - Err error } type BackendWatchEvent struct { - Type BackendWatchEventType Mapping *Mapping + Type BackendWatchEventType } type BackendWatchEventType string diff --git a/pkg/mappings/store/etcd_backend.go b/pkg/mappings/store/etcd_backend.go index c553a6775a..68206d994c 100644 --- a/pkg/mappings/store/etcd_backend.go +++ b/pkg/mappings/store/etcd_backend.go @@ -25,9 +25,9 @@ type etcdBackend struct { } func (m *etcdBackend) List(ctx context.Context) ([]*Mapping, error) { - mappings, err := m.etcdClient.List(ctx, mappingsPrefix, 0) + mappings, err := m.etcdClient.List(ctx, mappingsPrefix) if err != nil { - return nil, fmt.Errorf("list mappings") + return nil, fmt.Errorf("etcd backend: list mappings: %w", err) } retMappings := make([]*Mapping, 0, len(mappings)) @@ -35,7 +35,7 @@ func (m *etcdBackend) List(ctx context.Context) ([]*Mapping, error) { retMapping := &Mapping{} err = json.Unmarshal(kv.Data, retMapping) if err != nil { - return nil, fmt.Errorf("parse mapping %s: %w", string(kv.Key), err) + return nil, fmt.Errorf("etcd backend: parse mapping %s: %w", string(kv.Key), err) } retMappings = append(retMappings, retMapping) @@ -46,32 +46,52 @@ func (m *etcdBackend) List(ctx context.Context) ([]*Mapping, error) { func (m *etcdBackend) Watch(ctx context.Context) <-chan BackendWatchResponse { responseChan := make(chan BackendWatchResponse) - watchChan := m.etcdClient.Watch(ctx, mappingsPrefix, 0) + watchChan := m.etcdClient.Watch(ctx, mappingsPrefix) go func() { defer close(responseChan) for event := range watchChan { - if event.Canceled { + switch { + case event.Canceled: responseChan <- BackendWatchResponse{ Err: event.Err(), } - } else if len(event.Events) > 0 { + case event.IsProgressNotify(): + klog.FromContext(ctx).V(1).Info("received progress notify from etcd") + case len(event.Events) > 0: retEvents := make([]*BackendWatchEvent, 0, len(event.Events)) for _, singleEvent := range event.Events { var eventType BackendWatchEventType - if singleEvent.Type == mvccpb.PUT { + switch singleEvent.Type { + case mvccpb.PUT: eventType = BackendWatchEventTypeUpdate - } else if singleEvent.Type == mvccpb.DELETE { + case mvccpb.DELETE: eventType = BackendWatchEventTypeDelete - } else { + default: continue } // parse mapping retMapping := &Mapping{} - err := json.Unmarshal(singleEvent.Kv.Value, retMapping) + + value := singleEvent.Kv.Value + if len(value) == 0 && singleEvent.Type == mvccpb.DELETE && singleEvent.PrevKv != nil { + value = singleEvent.PrevKv.Value + } + + err := json.Unmarshal(value, retMapping) if err != nil { - klog.FromContext(ctx).Info("Error decoding event", "key", string(singleEvent.Kv.Key), "error", err.Error()) + klog.FromContext(ctx).Info( + "etcd backend: Error decoding event", + "key", string(singleEvent.Kv.Key), + "singleEventValue", string(singleEvent.Kv.Value), + "eventType", eventType, + "error", err.Error(), + ) + // FIXME(ThomasK33): This leads to mapping leaks. Etcd might have + // already compacted the previous version. Thus we would never + // receive any information of the mapping that was deleted apart from its keys. + // And because there is no mapping, we are omitting deleting it from the mapping stores. continue } @@ -101,7 +121,7 @@ func (m *etcdBackend) Save(ctx context.Context, mapping *Mapping) error { } func (m *etcdBackend) Delete(ctx context.Context, mapping *Mapping) error { - return m.etcdClient.Delete(ctx, mappingToKey(mapping), 0) + return m.etcdClient.Delete(ctx, mappingToKey(mapping)) } func mappingToKey(mapping *Mapping) string { diff --git a/pkg/mappings/store/store.go b/pkg/mappings/store/store.go index 2753059aed..a8086573f3 100644 --- a/pkg/mappings/store/store.go +++ b/pkg/mappings/store/store.go @@ -2,6 +2,7 @@ package store import ( "context" + "errors" "fmt" "sync" "time" @@ -10,7 +11,7 @@ import ( "github.com/loft-sh/vcluster/pkg/scheme" "github.com/loft-sh/vcluster/pkg/syncer/synccontext" kerrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -20,7 +21,10 @@ import ( "sigs.k8s.io/controller-runtime/pkg/source" ) -const GarbageCollectionInterval = time.Minute * 3 +const ( + GarbageCollectionInterval = 3 * time.Minute + GarbageCollectionTimeout = 15 * time.Second +) type VerifyMapping func(mapping synccontext.NameMapping) bool @@ -96,9 +100,7 @@ func (s *Store) Watch(gvk schema.GroupVersionKind, addQueueFn synccontext.AddQue func (s *Store) StartGarbageCollection(ctx context.Context) { go func() { - wait.Until(func() { - s.garbageCollectMappings(ctx) - }, GarbageCollectionInterval, ctx.Done()) + wait.UntilWithContext(ctx, s.garbageCollectMappings, GarbageCollectionInterval) }() } @@ -106,21 +108,58 @@ func (s *Store) garbageCollectMappings(ctx context.Context) { s.m.Lock() defer s.m.Unlock() + ctx, cancel := context.WithTimeoutCause(ctx, GarbageCollectionTimeout, errors.New("garbage collection timed out")) + defer cancel() + startTime := time.Now() - klog.FromContext(ctx).V(1).Info("Start mappings garbage collection") + klog.FromContext(ctx).V(1).Info( + "start mappings garbage collection", + "mappings", len(s.mappings), + "marker", "gc", + ) defer func() { - klog.FromContext(ctx).V(1).Info("Garbage collection done", "took", time.Since(startTime).String()) + klog.FromContext(ctx).V(1).Info( + "garbage collection done", + "took", time.Since(startTime).String(), + "marker", "gc", + ) }() for _, mapping := range s.mappings { - err := s.garbageCollectMapping(ctx, mapping) - if err != nil { - klog.FromContext(ctx).Error(err, "Garbage collect mapping", "mapping", mapping.String()) + select { + case <-ctx.Done(): + klog.FromContext(ctx).V(1).Info( + "exiting garbage collection early", + "err", ctx.Err(), + "marker", "gc", + ) + return + default: + klog.FromContext(ctx).V(1).Info( + "garbage collecting mapping", + "mapping", mapping.String(), + "marker", "gc", + ) + + err := s.garbageCollectMapping(ctx, mapping) + if err != nil { + klog.FromContext(ctx).Error( + err, + "garbage collect mapping", + "mapping", mapping.String(), + "marker", "gc", + ) + } } } } func (s *Store) garbageCollectMapping(ctx context.Context, mapping *Mapping) error { + klog.FromContext(ctx).V(1).Info( + "check object exists", + "name", mapping.NameMapping, + "marker", "gc", + ) // check if object exists exists, err := s.objectExists(ctx, mapping.NameMapping) if err != nil { @@ -129,13 +168,22 @@ func (s *Store) garbageCollectMapping(ctx context.Context, mapping *Mapping) err return nil } + klog.FromContext(ctx).V(1).Info( + "delete mapping", + "name", mapping.NameMapping, + "marker", "gc", + ) // delete the mapping err = s.deleteMapping(ctx, mapping) if err != nil { return err } - klog.FromContext(ctx).Info("Remove mapping as both virtual and host were not found", "mapping", mapping.String()) + klog.FromContext(ctx).Info( + "Remove mapping as both virtual and host were not found", + "mapping", mapping.String(), + "marker", "gc", + ) return nil } @@ -155,38 +203,70 @@ func (s *Store) deleteMapping(ctx context.Context, mapping *Mapping) error { func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMapping) (bool, error) { // build the object we can query - obj, err := scheme.Scheme.New(nameMapping.GroupVersionKind) + _, err := scheme.Scheme.New(nameMapping.GroupVersionKind) if err != nil { if !runtime.IsNotRegisteredError(err) { return false, fmt.Errorf("create object: %w", err) } - - obj = &unstructured.Unstructured{} } - // set kind & apiVersion if unstructured - uObject, ok := obj.(*unstructured.Unstructured) - if ok { - uObject.SetKind(nameMapping.GroupVersionKind.Kind) - uObject.SetAPIVersion(nameMapping.GroupVersionKind.GroupVersion().String()) - } + mObject := &metav1.PartialObjectMetadata{} + mObject.SetGroupVersionKind(nameMapping.GroupVersionKind) + + klog.FromContext(ctx).V(1).Info( + "virtual get", + "name", nameMapping.VirtualName, + "marker", "gc", + ) // check if virtual object exists - err = s.cachedVirtualClient.Get(ctx, nameMapping.VirtualName, obj.DeepCopyObject().(client.Object)) + err = s.cachedVirtualClient.Get(ctx, nameMapping.VirtualName, mObject) if err == nil { return true, nil } else if !kerrors.IsNotFound(err) { // TODO: filter out other allowed errors here could be Forbidden, Type not found etc. - klog.FromContext(ctx).Info("Error retrieving virtual object", "virtualObject", nameMapping.Virtual().String()) + klog.FromContext(ctx).Info( + "Error retrieving virtual object", + "virtualObject", nameMapping.Virtual().String(), + "err", err, + "marker", "gc", + ) + + // (ThomasK33): If the error is a not found, we're going + // to assume that the object is still used. + // + // In case of a transient error (server timeout or others) + // the GC should be able to figure out that it doesn't exist + // anymore on the next GC run. + return true, nil } + klog.FromContext(ctx).V(1).Info( + "host get", + "name", nameMapping.HostName, + "marker", "gc", + ) + // check if host object exists - err = s.cachedHostClient.Get(ctx, nameMapping.HostName, obj.DeepCopyObject().(client.Object)) + err = s.cachedHostClient.Get(ctx, nameMapping.HostName, mObject) if err == nil { return true, nil } else if !kerrors.IsNotFound(err) { // TODO: filter out other allowed errors here could be Forbidden, Type not found etc. - klog.FromContext(ctx).Info("Error retrieving host object", "hostObject", nameMapping.Host().String()) + klog.FromContext(ctx).Info( + "Error retrieving host object", + "hostObject", nameMapping.Host().String(), + "err", err, + "marker", "gc", + ) + + // (ThomasK33): If the error is a not found, we're going + // to assume that the object is still used. + // + // In case of a transient error (server timeout or others) + // the GC should be able to figure out that it doesn't exist + // anymore on the next GC run. + return true, nil } return false, nil @@ -217,13 +297,13 @@ func (s *Store) start(ctx context.Context) error { } go func() { - wait.Until(func() { + wait.UntilWithContext(ctx, func(ctx context.Context) { for watchEvent := range s.backend.Watch(ctx) { s.handleEvent(ctx, watchEvent) } klog.FromContext(ctx).Info("mapping store watch has ended") - }, time.Second, ctx.Done()) + }, time.Second) }() return nil @@ -233,6 +313,12 @@ func (s *Store) handleEvent(ctx context.Context, watchEvent BackendWatchResponse s.m.Lock() defer s.m.Unlock() + klog.FromContext(ctx).V(1).Info( + "handling mapping store events", + "len", len(watchEvent.Events), + "err", watchEvent.Err, + ) + if watchEvent.Err != nil { klog.FromContext(ctx).Error(watchEvent.Err, "watch err in mappings store") return @@ -377,7 +463,7 @@ func (s *Store) AddReference(ctx context.Context, nameMapping, belongsTo synccon } // check if we need to add mapping - if mapping.NameMapping.Equals(nameMapping) { + if mapping.Equals(nameMapping) { return nil } @@ -456,8 +542,8 @@ func (s *Store) DeleteMapping(ctx context.Context, nameMapping synccontext.NameM } func (s *Store) ReferencesTo(ctx context.Context, vObj synccontext.Object) []synccontext.NameMapping { - s.m.Lock() - defer s.m.Unlock() + s.m.RLock() + defer s.m.RUnlock() retReferences := s.referencesTo(vObj) klog.FromContext(ctx).V(1).Info("Found references for object", "object", vObj.String(), "references", len(retReferences)) diff --git a/pkg/setup/controller_context.go b/pkg/setup/controller_context.go index 3ec0a213b8..6fefa44691 100644 --- a/pkg/setup/controller_context.go +++ b/pkg/setup/controller_context.go @@ -381,7 +381,13 @@ func initControllerContext( Config: vClusterOptions, } - mappingStore, err := store.NewStoreWithVerifyMapping(ctx, virtualManager.GetClient(), localManager.GetClient(), store.NewEtcdBackend(etcdClient), verify.NewVerifyMapping(controllerContext.ToRegisterContext().ToSyncContext("verify-mapping"))) + mappingStore, err := store.NewStoreWithVerifyMapping( + ctx, + virtualManager.GetClient(), + localManager.GetClient(), + store.NewEtcdBackend(etcdClient), + verify.NewVerifyMapping(controllerContext.ToRegisterContext().ToSyncContext("verify-mapping")), + ) if err != nil { return nil, fmt.Errorf("start mapping store: %w", err) } diff --git a/vcluster.yaml b/vcluster.yaml new file mode 100644 index 0000000000..71cd495b7c --- /dev/null +++ b/vcluster.yaml @@ -0,0 +1,59 @@ +controlPlane: + advanced: + virtualScheduler: + enabled: true + backingStore: + etcd: + deploy: + enabled: true + statefulSet: + image: + tag: 3.5.14-0 + distro: + k8s: + apiServer: + extraArgs: + - --service-account-jwks-uri=https://kubernetes.default.svc.cluster.local/openid/v1/jwks + image: + tag: v1.31.1 + controllerManager: + image: + tag: v1.31.1 + enabled: true + scheduler: + image: + tag: v1.31.1 + statefulSet: + scheduling: + podManagementPolicy: OrderedReady + image: + registry: "" + repository: "vcluster" + tag: "dev-conf" + env: + - name: DEBUG + value: "true" + +networking: + advanced: + proxyKubelets: + byHostname: false + byIP: false + +sync: + fromHost: + csiDrivers: + enabled: false + csiStorageCapacities: + enabled: false + nodes: + enabled: true + selector: + all: true + toHost: + persistentVolumes: + enabled: true + priorityClasses: + enabled: true + storageClasses: + enabled: true