Skip to content

Commit

Permalink
Track secrets in consumer reconciler (#2153)
Browse files Browse the repository at this point in the history
* Track secrets in consumer reconciler

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Add E2E test

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Remove unused method

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Move generic run utils in top level dir to avoid import errors

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Use shared utility function and move tracking before using secrets

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored Aug 9, 2022
1 parent 6d4e4a4 commit d250e80
Show file tree
Hide file tree
Showing 32 changed files with 684 additions and 35 deletions.
5 changes: 5 additions & 0 deletions control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ type Auth struct {
Secret *Secret `json:"secret,omitempty"`
}

func (a *Auth) HasAuth() bool {
return a != nil && a.Secret != nil &&
a.Secret.Ref != nil && a.Secret.Ref.Name != ""
}

type Secret struct {
// Secret reference for SASL and SSL configurations.
Ref *SecretReference `json:"ref,omitempty"`
Expand Down
37 changes: 27 additions & 10 deletions control-plane/pkg/reconciler/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,14 @@ func (r Reconciler) reconcileContractEgress(ctx context.Context, c *kafkainterna
}

func (r Reconciler) reconcileAuth(ctx context.Context, c *kafkainternals.Consumer, resource *contract.Resource) error {

if c.Spec.Auth == nil {
return nil
}

if err := r.trackAuthContext(c, c.Spec.Auth); err != nil {
return err
}

if c.Spec.Auth.NetSpec != nil {
authContext, err := security.ResolveAuthContextFromNetSpec(r.SecretLister, c.GetNamespace(), *c.Spec.Auth.NetSpec)
if err != nil {
Expand All @@ -191,6 +195,7 @@ func (r Reconciler) reconcileAuth(ctx context.Context, c *kafkainternals.Consume
resource.Auth = &contract.Resource_MultiAuthSecret{MultiAuthSecret: authContext.MultiSecretReference}
return nil
}

if c.Spec.Auth.AuthSpec != nil {
secret, err := security.Secret(ctx, &SecretLocator{Consumer: c}, r.SecretProviderFunc())
if err != nil {
Expand All @@ -204,15 +209,6 @@ func (r Reconciler) reconcileAuth(ctx context.Context, c *kafkainternals.Consume
Version: secret.ResourceVersion,
},
}
ref := tracker.Reference{
APIVersion: c.APIVersion,
Kind: c.Kind,
Namespace: c.GetNamespace(),
Name: c.GetName(),
}
if err := r.Tracker.TrackReference(ref, c); err != nil {
return fmt.Errorf("failed to track reference %v: %w", ref, err)
}
return nil
}

Expand Down Expand Up @@ -413,3 +409,24 @@ func podOwnerReference(p *corev1.Pod) base.ConfigMapOption {
})
}
}

func (r *Reconciler) trackAuthContext(c *kafkainternals.Consumer, auth *kafkainternals.Auth) error {
if auth == nil {
return nil
}

if auth.AuthSpec.HasAuth() {
ref := tracker.Reference{
APIVersion: "v1",
Kind: "Secret",
Namespace: c.GetNamespace(),
Name: auth.AuthSpec.Secret.Ref.Name,
}
if err := r.Tracker.TrackReference(ref, c); err != nil {
return fmt.Errorf("failed to track secret for rotation %s/%s: %w", ref.Namespace, ref.Name, err)
}
return nil
}

return security.TrackNetSpecSecrets(r.Tracker, auth.NetSpec, c)
}
6 changes: 5 additions & 1 deletion control-plane/pkg/security/secrets_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ import (

// TrackNetSpecSecrets tracks all secrets referenced by a provided bindings.KafkaNetSpec.
// parent is the object that is tracking changes to those secrets.
func TrackNetSpecSecrets(secretsTracker tracker.Interface, netSpec bindings.KafkaNetSpec, parent metav1.Object) error {
func TrackNetSpecSecrets(secretsTracker tracker.Interface, netSpec *bindings.KafkaNetSpec, parent metav1.Object) error {
if netSpec == nil {
return nil
}

secrets := []bindings.SecretValueFromSource{
netSpec.TLS.Key,
netSpec.TLS.Cert,
Expand Down
2 changes: 1 addition & 1 deletion control-plane/pkg/security/secrets_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestTrackNetSpecSecrets(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := TrackNetSpecSecrets(tt.secretsTracker, tt.netSpec, tt.parent); (err != nil) != tt.wantErr {
if err := TrackNetSpecSecrets(tt.secretsTracker, &tt.netSpec, tt.parent); (err != nil) != tt.wantErr {
t.Errorf("TrackNetSpecSecrets() error = %v, wantErr %v", err, tt.wantErr)
}
if tt.secretsTracker.trackReferenceCalls != tt.expectedTrackReferenceCalls {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/conformance/data_plane_conformance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
pkgtest "knative.dev/pkg/test"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
pkgtesting "knative.dev/eventing-kafka-broker/test/pkg/testing"
pkgtesting "knative.dev/eventing-kafka-broker/test/pkg"
)

func TestBrokerIngress(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/delete_cm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import (

eventingv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
eventingv1alpha1clientset "knative.dev/eventing-kafka-broker/control-plane/pkg/client/clientset/versioned/typed/eventing/v1alpha1"
testingpkg "knative.dev/eventing-kafka-broker/test/pkg"
"knative.dev/eventing-kafka-broker/test/pkg/addressable"
"knative.dev/eventing-kafka-broker/test/pkg/sink"
testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing"
)

func TestDeleteSinkConfigMap(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/sacura_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (

kafkatest "knative.dev/eventing-kafka-broker/test/pkg/kafka"
pkgtesting "knative.dev/eventing-kafka-broker/test/pkg/testing"
pkgtest "knative.dev/eventing-kafka-broker/test/pkg"
)

const (
Expand Down Expand Up @@ -136,7 +137,7 @@ func runSacuraTest(t *testing.T, config SacuraTestConfig) {
Name: names.SimpleNameGenerator.GenerateName(sacuraVerifyCommittedOffsetJob + "-" + *topic),
},
&kafkatest.AdminConfig{
BootstrapServers: pkgtesting.BootstrapServersPlaintext,
BootstrapServers: pkgtest.BootstrapServersPlaintext,
Topic: *topic,
Group: consumerGroup,
},
Expand Down
2 changes: 1 addition & 1 deletion test/e2e_broker/broker_event_trasformation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (

"knative.dev/eventing/test/e2e/helpers"

pkgtesting "knative.dev/eventing-kafka-broker/test/pkg"
testbroker "knative.dev/eventing-kafka-broker/test/pkg/broker"
pkgtesting "knative.dev/eventing-kafka-broker/test/pkg/testing"
)

func TestEventTransformationForTriggerV1BrokerV1(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion test/e2e_broker/broker_redelivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/resources"

kafkatesting "knative.dev/eventing-kafka-broker/test/pkg"
"knative.dev/eventing-kafka-broker/test/pkg/broker"
kafkatesting "knative.dev/eventing-kafka-broker/test/pkg/testing"
)

const (
Expand Down
1 change: 1 addition & 0 deletions test/e2e_broker/broker_sasl_ssl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"knative.dev/eventing/test/lib/sender"
duckv1 "knative.dev/pkg/apis/duck/v1"

. "knative.dev/eventing-kafka-broker/test/pkg"
"knative.dev/eventing-kafka-broker/test/pkg/broker"
. "knative.dev/eventing-kafka-broker/test/pkg/testing"
)
Expand Down
2 changes: 1 addition & 1 deletion test/e2e_broker/broker_trigger_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ import (

eventingv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
eventingv1alpha1clientset "knative.dev/eventing-kafka-broker/control-plane/pkg/client/clientset/versioned/typed/eventing/v1alpha1"
testingpkg "knative.dev/eventing-kafka-broker/test/pkg"
"knative.dev/eventing-kafka-broker/test/pkg/addressable"
"knative.dev/eventing-kafka-broker/test/pkg/broker"
"knative.dev/eventing-kafka-broker/test/pkg/sink"
testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing"
)

/*
Expand Down
2 changes: 1 addition & 1 deletion test/e2e_broker/broker_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ import (

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker"
testingpkg "knative.dev/eventing-kafka-broker/test/pkg"
brokertest "knative.dev/eventing-kafka-broker/test/pkg/broker"
kafkatest "knative.dev/eventing-kafka-broker/test/pkg/kafka"
testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing"
)

func TestBrokerTrigger(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/resources"

pkgtesting "knative.dev/eventing-kafka-broker/test/pkg"
"knative.dev/eventing-kafka-broker/test/pkg/broker"
pkgtesting "knative.dev/eventing-kafka-broker/test/pkg/testing"
)

func brokerCreator(client *testlib.Client, name string) {
Expand Down
42 changes: 42 additions & 0 deletions test/e2e_new/features/featuressteps/copy_secret.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2022 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package featuressteps

import (
"context"

"knative.dev/eventing/pkg/utils"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/feature"
)

func CopySecretInTestNamespace(namespace, name string) feature.StepFn {
return func(ctx context.Context, t feature.T) {
tgtNs := environment.FromContext(ctx).Namespace()
_, err := utils.CopySecret(
kubeclient.Get(ctx).CoreV1(),
namespace,
name,
tgtNs,
"default",
)
if err != nil {
t.Fatalf("failed to copy secret %s from %s to %s: %v", name, namespace, tgtNs, err)
}
}
}
64 changes: 64 additions & 0 deletions test/e2e_new/features/kafka_source_create_secrets_after.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2022 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package features

import (
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/resources/svc"

"knative.dev/eventing-kafka/test/rekt/resources/kafkasource"
"knative.dev/eventing-kafka/test/rekt/resources/kafkatopic"

"knative.dev/eventing-kafka-broker/test/e2e_new/features/featuressteps"
testingpkg "knative.dev/eventing-kafka-broker/test/pkg"
)

func CreateSecretsAfterKafkaSource() *feature.Feature {
f := feature.NewFeatureNamed("Create secrets after KafkaSource")

topicName := feature.MakeRandomK8sName("topic") // A k8s name is also a valid topic name.
name := feature.MakeRandomK8sName("source")
sink := feature.MakeRandomK8sName("sink")

tlsSecretName := "strimzi-tls-secret"
saslSecretName := "strimzi-sasl-secret"

f.Setup("install kafka topic", kafkatopic.Install(topicName))
f.Setup("install a service", svc.Install(sink, "app", "rekt"))
f.Setup("install a KafkaSource", kafkasource.Install(name,
kafkasource.WithSink(&duckv1.KReference{Kind: "Service", Name: sink, APIVersion: "v1"}, ""),
kafkasource.WithBootstrapServers(testingpkg.BootstrapServersSslSaslScramArr),
kafkasource.WithTopics([]string{topicName}),
kafkasource.WithSASLEnabled(),
kafkasource.WithSASLUser(saslSecretName, "user"),
kafkasource.WithSASLPassword(saslSecretName, "password"),
kafkasource.WithSASLType(saslSecretName, "saslType"),
kafkasource.WithTLSEnabled(),
kafkasource.WithTLSCACert(tlsSecretName, "ca.crt"),
))
f.Setup("KafkaSource is not ready", k8s.IsNotReady(kafkasource.GVR(), name))

f.Requirement("Create TLS secret", featuressteps.CopySecretInTestNamespace(system.Namespace(), tlsSecretName))
f.Requirement("Create SASL secret", featuressteps.CopySecretInTestNamespace(system.Namespace(), saslSecretName))

f.Assert("KafkaSource is ready", kafkasource.IsReady(name))

return f
}
46 changes: 46 additions & 0 deletions test/e2e_new/kafka_source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
//go:build e2e
// +build e2e

/*
* Copyright 2022 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package e2e_new

import (
"testing"

"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/knative"

"knative.dev/eventing-kafka-broker/test/e2e_new/features"
)

func TestKafkaSourceCreateSecretsAfterKafkaSource(t *testing.T) {

t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
)

env.Test(ctx, t, features.CreateSecretsAfterKafkaSource())
}
1 change: 1 addition & 0 deletions test/e2e_sink/kafka_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

eventingv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
eventingv1alpha1clientset "knative.dev/eventing-kafka-broker/control-plane/pkg/client/clientset/versioned/typed/eventing/v1alpha1"
. "knative.dev/eventing-kafka-broker/test/pkg"
"knative.dev/eventing-kafka-broker/test/pkg/addressable"
"knative.dev/eventing-kafka-broker/test/pkg/sink"

Expand Down
1 change: 1 addition & 0 deletions test/e2e_sink/kafka_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/utils/pointer"

eventingv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1"
. "knative.dev/eventing-kafka-broker/test/pkg"
. "knative.dev/eventing-kafka-broker/test/pkg/testing"
)

Expand Down
2 changes: 1 addition & 1 deletion test/e2e_source/kafka_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

eventingkafkahelpers "knative.dev/eventing-kafka/test/e2e/helpers"

testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing"
testingpkg "knative.dev/eventing-kafka-broker/test/pkg"
)

func TestKafkaSourceUpdate(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion test/pkg/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
duckv1 "knative.dev/pkg/apis/duck/v1"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing"
testingpkg "knative.dev/eventing-kafka-broker/test/pkg"
)

const BrokerClassEnvVarKey = "BROKER_CLASS"
Expand Down
2 changes: 1 addition & 1 deletion test/pkg/testing/run.go → test/pkg/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package testing
package pkg

import (
"fmt"
Expand Down
Loading

0 comments on commit d250e80

Please sign in to comment.