diff --git a/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_types.go b/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_types.go index 1814e2e2f5..b5ddaef91e 100644 --- a/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_types.go +++ b/control-plane/pkg/apis/eventing/v1alpha1/kafka_sink_types.go @@ -103,6 +103,11 @@ type Auth struct { Secret *Secret `json:"secret,omitempty"` } +func (a *Auth) HasAuth() bool { + return a != nil && a.Secret != nil && + a.Secret.Ref != nil && a.Secret.Ref.Name != "" +} + type Secret struct { // Secret reference for SASL and SSL configurations. Ref *SecretReference `json:"ref,omitempty"` diff --git a/control-plane/pkg/reconciler/consumer/consumer.go b/control-plane/pkg/reconciler/consumer/consumer.go index bc521594b3..7c791b71ad 100644 --- a/control-plane/pkg/reconciler/consumer/consumer.go +++ b/control-plane/pkg/reconciler/consumer/consumer.go @@ -179,10 +179,14 @@ func (r Reconciler) reconcileContractEgress(ctx context.Context, c *kafkainterna } func (r Reconciler) reconcileAuth(ctx context.Context, c *kafkainternals.Consumer, resource *contract.Resource) error { - if c.Spec.Auth == nil { return nil } + + if err := r.trackAuthContext(c, c.Spec.Auth); err != nil { + return err + } + if c.Spec.Auth.NetSpec != nil { authContext, err := security.ResolveAuthContextFromNetSpec(r.SecretLister, c.GetNamespace(), *c.Spec.Auth.NetSpec) if err != nil { @@ -191,6 +195,7 @@ func (r Reconciler) reconcileAuth(ctx context.Context, c *kafkainternals.Consume resource.Auth = &contract.Resource_MultiAuthSecret{MultiAuthSecret: authContext.MultiSecretReference} return nil } + if c.Spec.Auth.AuthSpec != nil { secret, err := security.Secret(ctx, &SecretLocator{Consumer: c}, r.SecretProviderFunc()) if err != nil { @@ -204,15 +209,6 @@ func (r Reconciler) reconcileAuth(ctx context.Context, c *kafkainternals.Consume Version: secret.ResourceVersion, }, } - ref := tracker.Reference{ - APIVersion: c.APIVersion, - Kind: c.Kind, - Namespace: c.GetNamespace(), - Name: c.GetName(), - } - if err := r.Tracker.TrackReference(ref, c); err != nil { - return fmt.Errorf("failed to track reference %v: %w", ref, err) - } return nil } @@ -413,3 +409,24 @@ func podOwnerReference(p *corev1.Pod) base.ConfigMapOption { }) } } + +func (r *Reconciler) trackAuthContext(c *kafkainternals.Consumer, auth *kafkainternals.Auth) error { + if auth == nil { + return nil + } + + if auth.AuthSpec.HasAuth() { + ref := tracker.Reference{ + APIVersion: "v1", + Kind: "Secret", + Namespace: c.GetNamespace(), + Name: auth.AuthSpec.Secret.Ref.Name, + } + if err := r.Tracker.TrackReference(ref, c); err != nil { + return fmt.Errorf("failed to track secret for rotation %s/%s: %w", ref.Namespace, ref.Name, err) + } + return nil + } + + return security.TrackNetSpecSecrets(r.Tracker, auth.NetSpec, c) +} diff --git a/control-plane/pkg/security/secrets_tracker.go b/control-plane/pkg/security/secrets_tracker.go index 123b77f769..503975f38a 100644 --- a/control-plane/pkg/security/secrets_tracker.go +++ b/control-plane/pkg/security/secrets_tracker.go @@ -24,7 +24,11 @@ import ( // TrackNetSpecSecrets tracks all secrets referenced by a provided bindings.KafkaNetSpec. // parent is the object that is tracking changes to those secrets. -func TrackNetSpecSecrets(secretsTracker tracker.Interface, netSpec bindings.KafkaNetSpec, parent metav1.Object) error { +func TrackNetSpecSecrets(secretsTracker tracker.Interface, netSpec *bindings.KafkaNetSpec, parent metav1.Object) error { + if netSpec == nil { + return nil + } + secrets := []bindings.SecretValueFromSource{ netSpec.TLS.Key, netSpec.TLS.Cert, diff --git a/control-plane/pkg/security/secrets_tracker_test.go b/control-plane/pkg/security/secrets_tracker_test.go index c990ac7193..d5be3b2936 100644 --- a/control-plane/pkg/security/secrets_tracker_test.go +++ b/control-plane/pkg/security/secrets_tracker_test.go @@ -131,7 +131,7 @@ func TestTrackNetSpecSecrets(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err := TrackNetSpecSecrets(tt.secretsTracker, tt.netSpec, tt.parent); (err != nil) != tt.wantErr { + if err := TrackNetSpecSecrets(tt.secretsTracker, &tt.netSpec, tt.parent); (err != nil) != tt.wantErr { t.Errorf("TrackNetSpecSecrets() error = %v, wantErr %v", err, tt.wantErr) } if tt.secretsTracker.trackReferenceCalls != tt.expectedTrackReferenceCalls { diff --git a/test/e2e/conformance/data_plane_conformance_test.go b/test/e2e/conformance/data_plane_conformance_test.go index 481539df28..35140405b3 100644 --- a/test/e2e/conformance/data_plane_conformance_test.go +++ b/test/e2e/conformance/data_plane_conformance_test.go @@ -35,7 +35,7 @@ import ( pkgtest "knative.dev/pkg/test" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" - pkgtesting "knative.dev/eventing-kafka-broker/test/pkg/testing" + pkgtesting "knative.dev/eventing-kafka-broker/test/pkg" ) func TestBrokerIngress(t *testing.T) { diff --git a/test/e2e/delete_cm_test.go b/test/e2e/delete_cm_test.go index 6fcf09c762..18e3fc7ead 100644 --- a/test/e2e/delete_cm_test.go +++ b/test/e2e/delete_cm_test.go @@ -34,9 +34,9 @@ import ( eventingv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1" eventingv1alpha1clientset "knative.dev/eventing-kafka-broker/control-plane/pkg/client/clientset/versioned/typed/eventing/v1alpha1" + testingpkg "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/eventing-kafka-broker/test/pkg/addressable" "knative.dev/eventing-kafka-broker/test/pkg/sink" - testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing" ) func TestDeleteSinkConfigMap(t *testing.T) { diff --git a/test/e2e/sacura_test.go b/test/e2e/sacura_test.go index 854c1302f6..071d2e3d8f 100644 --- a/test/e2e/sacura_test.go +++ b/test/e2e/sacura_test.go @@ -43,6 +43,7 @@ import ( kafkatest "knative.dev/eventing-kafka-broker/test/pkg/kafka" pkgtesting "knative.dev/eventing-kafka-broker/test/pkg/testing" + pkgtest "knative.dev/eventing-kafka-broker/test/pkg" ) const ( @@ -136,7 +137,7 @@ func runSacuraTest(t *testing.T, config SacuraTestConfig) { Name: names.SimpleNameGenerator.GenerateName(sacuraVerifyCommittedOffsetJob + "-" + *topic), }, &kafkatest.AdminConfig{ - BootstrapServers: pkgtesting.BootstrapServersPlaintext, + BootstrapServers: pkgtest.BootstrapServersPlaintext, Topic: *topic, Group: consumerGroup, }, diff --git a/test/e2e_broker/broker_event_trasformation_test.go b/test/e2e_broker/broker_event_trasformation_test.go index 9710b185a4..2927e54adb 100644 --- a/test/e2e_broker/broker_event_trasformation_test.go +++ b/test/e2e_broker/broker_event_trasformation_test.go @@ -25,8 +25,8 @@ import ( "knative.dev/eventing/test/e2e/helpers" + pkgtesting "knative.dev/eventing-kafka-broker/test/pkg" testbroker "knative.dev/eventing-kafka-broker/test/pkg/broker" - pkgtesting "knative.dev/eventing-kafka-broker/test/pkg/testing" ) func TestEventTransformationForTriggerV1BrokerV1(t *testing.T) { diff --git a/test/e2e_broker/broker_redelivery_test.go b/test/e2e_broker/broker_redelivery_test.go index f5d7c0b91a..3a73f61f5a 100644 --- a/test/e2e_broker/broker_redelivery_test.go +++ b/test/e2e_broker/broker_redelivery_test.go @@ -29,8 +29,8 @@ import ( testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/resources" + kafkatesting "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/eventing-kafka-broker/test/pkg/broker" - kafkatesting "knative.dev/eventing-kafka-broker/test/pkg/testing" ) const ( diff --git a/test/e2e_broker/broker_sasl_ssl_test.go b/test/e2e_broker/broker_sasl_ssl_test.go index 9ae083951e..baa7c9f185 100644 --- a/test/e2e_broker/broker_sasl_ssl_test.go +++ b/test/e2e_broker/broker_sasl_ssl_test.go @@ -38,6 +38,7 @@ import ( "knative.dev/eventing/test/lib/sender" duckv1 "knative.dev/pkg/apis/duck/v1" + . "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/eventing-kafka-broker/test/pkg/broker" . "knative.dev/eventing-kafka-broker/test/pkg/testing" ) diff --git a/test/e2e_broker/broker_trigger_sink_test.go b/test/e2e_broker/broker_trigger_sink_test.go index 1f2c43db64..b7226ebf43 100644 --- a/test/e2e_broker/broker_trigger_sink_test.go +++ b/test/e2e_broker/broker_trigger_sink_test.go @@ -34,10 +34,10 @@ import ( eventingv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1" eventingv1alpha1clientset "knative.dev/eventing-kafka-broker/control-plane/pkg/client/clientset/versioned/typed/eventing/v1alpha1" + testingpkg "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/eventing-kafka-broker/test/pkg/addressable" "knative.dev/eventing-kafka-broker/test/pkg/broker" "knative.dev/eventing-kafka-broker/test/pkg/sink" - testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing" ) /* diff --git a/test/e2e_broker/broker_trigger_test.go b/test/e2e_broker/broker_trigger_test.go index 0e0f2eef42..ec04237af4 100644 --- a/test/e2e_broker/broker_trigger_test.go +++ b/test/e2e_broker/broker_trigger_test.go @@ -36,9 +36,9 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker" + testingpkg "knative.dev/eventing-kafka-broker/test/pkg" brokertest "knative.dev/eventing-kafka-broker/test/pkg/broker" kafkatest "knative.dev/eventing-kafka-broker/test/pkg/kafka" - testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing" ) func TestBrokerTrigger(t *testing.T) { diff --git a/test/e2e_broker/conformance/control_plane_conformance_test.go b/test/e2e_broker/conformance/control_plane_conformance_test.go index f436a03464..deaed366e7 100644 --- a/test/e2e_broker/conformance/control_plane_conformance_test.go +++ b/test/e2e_broker/conformance/control_plane_conformance_test.go @@ -27,8 +27,8 @@ import ( testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/resources" + pkgtesting "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/eventing-kafka-broker/test/pkg/broker" - pkgtesting "knative.dev/eventing-kafka-broker/test/pkg/testing" ) func brokerCreator(client *testlib.Client, name string) { diff --git a/test/e2e_new/features/featuressteps/copy_secret.go b/test/e2e_new/features/featuressteps/copy_secret.go new file mode 100644 index 0000000000..ee641db256 --- /dev/null +++ b/test/e2e_new/features/featuressteps/copy_secret.go @@ -0,0 +1,42 @@ +/* + * Copyright 2022 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package featuressteps + +import ( + "context" + + "knative.dev/eventing/pkg/utils" + kubeclient "knative.dev/pkg/client/injection/kube/client" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/feature" +) + +func CopySecretInTestNamespace(namespace, name string) feature.StepFn { + return func(ctx context.Context, t feature.T) { + tgtNs := environment.FromContext(ctx).Namespace() + _, err := utils.CopySecret( + kubeclient.Get(ctx).CoreV1(), + namespace, + name, + tgtNs, + "default", + ) + if err != nil { + t.Fatalf("failed to copy secret %s from %s to %s: %v", name, namespace, tgtNs, err) + } + } +} diff --git a/test/e2e_new/features/kafka_source_create_secrets_after.go b/test/e2e_new/features/kafka_source_create_secrets_after.go new file mode 100644 index 0000000000..8b956b2d81 --- /dev/null +++ b/test/e2e_new/features/kafka_source_create_secrets_after.go @@ -0,0 +1,64 @@ +/* + * Copyright 2022 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package features + +import ( + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/resources/svc" + + "knative.dev/eventing-kafka/test/rekt/resources/kafkasource" + "knative.dev/eventing-kafka/test/rekt/resources/kafkatopic" + + "knative.dev/eventing-kafka-broker/test/e2e_new/features/featuressteps" + testingpkg "knative.dev/eventing-kafka-broker/test/pkg" +) + +func CreateSecretsAfterKafkaSource() *feature.Feature { + f := feature.NewFeatureNamed("Create secrets after KafkaSource") + + topicName := feature.MakeRandomK8sName("topic") // A k8s name is also a valid topic name. + name := feature.MakeRandomK8sName("source") + sink := feature.MakeRandomK8sName("sink") + + tlsSecretName := "strimzi-tls-secret" + saslSecretName := "strimzi-sasl-secret" + + f.Setup("install kafka topic", kafkatopic.Install(topicName)) + f.Setup("install a service", svc.Install(sink, "app", "rekt")) + f.Setup("install a KafkaSource", kafkasource.Install(name, + kafkasource.WithSink(&duckv1.KReference{Kind: "Service", Name: sink, APIVersion: "v1"}, ""), + kafkasource.WithBootstrapServers(testingpkg.BootstrapServersSslSaslScramArr), + kafkasource.WithTopics([]string{topicName}), + kafkasource.WithSASLEnabled(), + kafkasource.WithSASLUser(saslSecretName, "user"), + kafkasource.WithSASLPassword(saslSecretName, "password"), + kafkasource.WithSASLType(saslSecretName, "saslType"), + kafkasource.WithTLSEnabled(), + kafkasource.WithTLSCACert(tlsSecretName, "ca.crt"), + )) + f.Setup("KafkaSource is not ready", k8s.IsNotReady(kafkasource.GVR(), name)) + + f.Requirement("Create TLS secret", featuressteps.CopySecretInTestNamespace(system.Namespace(), tlsSecretName)) + f.Requirement("Create SASL secret", featuressteps.CopySecretInTestNamespace(system.Namespace(), saslSecretName)) + + f.Assert("KafkaSource is ready", kafkasource.IsReady(name)) + + return f +} diff --git a/test/e2e_new/kafka_source_test.go b/test/e2e_new/kafka_source_test.go new file mode 100644 index 0000000000..bc5dc7f204 --- /dev/null +++ b/test/e2e_new/kafka_source_test.go @@ -0,0 +1,46 @@ +//go:build e2e +// +build e2e + +/* + * Copyright 2022 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package e2e_new + +import ( + "testing" + + "knative.dev/pkg/system" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/knative" + + "knative.dev/eventing-kafka-broker/test/e2e_new/features" +) + +func TestKafkaSourceCreateSecretsAfterKafkaSource(t *testing.T) { + + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + env.Test(ctx, t, features.CreateSecretsAfterKafkaSource()) +} diff --git a/test/e2e_sink/kafka_sink.go b/test/e2e_sink/kafka_sink.go index 5c607f68b6..ffd4ce1564 100644 --- a/test/e2e_sink/kafka_sink.go +++ b/test/e2e_sink/kafka_sink.go @@ -29,6 +29,7 @@ import ( eventingv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1" eventingv1alpha1clientset "knative.dev/eventing-kafka-broker/control-plane/pkg/client/clientset/versioned/typed/eventing/v1alpha1" + . "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/eventing-kafka-broker/test/pkg/addressable" "knative.dev/eventing-kafka-broker/test/pkg/sink" diff --git a/test/e2e_sink/kafka_sink_test.go b/test/e2e_sink/kafka_sink_test.go index 0284f90907..b48c2cc316 100644 --- a/test/e2e_sink/kafka_sink_test.go +++ b/test/e2e_sink/kafka_sink_test.go @@ -25,6 +25,7 @@ import ( "k8s.io/utils/pointer" eventingv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1" + . "knative.dev/eventing-kafka-broker/test/pkg" . "knative.dev/eventing-kafka-broker/test/pkg/testing" ) diff --git a/test/e2e_source/kafka_source_test.go b/test/e2e_source/kafka_source_test.go index 99f29b8fa4..a6ee020f1b 100644 --- a/test/e2e_source/kafka_source_test.go +++ b/test/e2e_source/kafka_source_test.go @@ -24,7 +24,7 @@ import ( eventingkafkahelpers "knative.dev/eventing-kafka/test/e2e/helpers" - testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing" + testingpkg "knative.dev/eventing-kafka-broker/test/pkg" ) func TestKafkaSourceUpdate(t *testing.T) { diff --git a/test/pkg/broker/broker.go b/test/pkg/broker/broker.go index 8248c92bcf..6b1eb108ce 100644 --- a/test/pkg/broker/broker.go +++ b/test/pkg/broker/broker.go @@ -30,7 +30,7 @@ import ( duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" - testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing" + testingpkg "knative.dev/eventing-kafka-broker/test/pkg" ) const BrokerClassEnvVarKey = "BROKER_CLASS" diff --git a/test/pkg/testing/run.go b/test/pkg/run.go similarity index 99% rename from test/pkg/testing/run.go rename to test/pkg/run.go index 9335e3d7bc..13b9553d63 100644 --- a/test/pkg/testing/run.go +++ b/test/pkg/run.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package testing +package pkg import ( "fmt" diff --git a/test/pkg/sink/verify.go b/test/pkg/sink/verify.go index 3489b19f4b..352265a2fb 100644 --- a/test/pkg/sink/verify.go +++ b/test/pkg/sink/verify.go @@ -25,8 +25,8 @@ import ( "k8s.io/apiserver/pkg/storage/names" testlib "knative.dev/eventing/test/lib" + testingpkg "knative.dev/eventing-kafka-broker/test/pkg" kafkatest "knative.dev/eventing-kafka-broker/test/pkg/kafka" - testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing" ) func Verify(t *testing.T, client *testlib.Client, mode, topic string, ids []string) { diff --git a/test/pkg/testing/auth.go b/test/pkg/testing/auth.go index d8876b4c59..129d6e4cdc 100644 --- a/test/pkg/testing/auth.go +++ b/test/pkg/testing/auth.go @@ -23,6 +23,8 @@ import ( "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" testlib "knative.dev/eventing/test/lib" + + "knative.dev/eventing-kafka-broker/test/pkg" ) type SecretProvider func(t *testing.T, client *testlib.Client) map[string][]byte @@ -34,10 +36,10 @@ func Plaintext(t *testing.T, _ *testlib.Client) map[string][]byte { } func Ssl(t *testing.T, client *testlib.Client) map[string][]byte { - caSecret, err := client.Kube.CoreV1().Secrets(KafkaClusterNamespace).Get(context.Background(), CaSecretName, metav1.GetOptions{}) + caSecret, err := client.Kube.CoreV1().Secrets(pkg.KafkaClusterNamespace).Get(context.Background(), pkg.CaSecretName, metav1.GetOptions{}) assert.Nil(t, err) - tlsUserSecret, err := client.Kube.CoreV1().Secrets(KafkaClusterNamespace).Get(context.Background(), TlsUserSecretName, metav1.GetOptions{}) + tlsUserSecret, err := client.Kube.CoreV1().Secrets(pkg.KafkaClusterNamespace).Get(context.Background(), pkg.TlsUserSecretName, metav1.GetOptions{}) assert.Nil(t, err) return map[string][]byte{ @@ -50,29 +52,29 @@ func Ssl(t *testing.T, client *testlib.Client) map[string][]byte { func SaslPlaintextScram512(t *testing.T, client *testlib.Client) map[string][]byte { - saslUserSecret, err := client.Kube.CoreV1().Secrets(KafkaClusterNamespace).Get(context.Background(), SaslUserSecretName, metav1.GetOptions{}) + saslUserSecret, err := client.Kube.CoreV1().Secrets(pkg.KafkaClusterNamespace).Get(context.Background(), pkg.SaslUserSecretName, metav1.GetOptions{}) assert.Nil(t, err) return map[string][]byte{ "protocol": []byte("SASL_PLAINTEXT"), "sasl.mechanism": []byte("SCRAM-SHA-512"), - "user": []byte(SaslUserSecretName), + "user": []byte(pkg.SaslUserSecretName), "password": saslUserSecret.Data["password"], } } func SslSaslScram512(t *testing.T, client *testlib.Client) map[string][]byte { - caSecret, err := client.Kube.CoreV1().Secrets(KafkaClusterNamespace).Get(context.Background(), CaSecretName, metav1.GetOptions{}) + caSecret, err := client.Kube.CoreV1().Secrets(pkg.KafkaClusterNamespace).Get(context.Background(), pkg.CaSecretName, metav1.GetOptions{}) assert.Nil(t, err) - saslUserSecret, err := client.Kube.CoreV1().Secrets(KafkaClusterNamespace).Get(context.Background(), SaslUserSecretName, metav1.GetOptions{}) + saslUserSecret, err := client.Kube.CoreV1().Secrets(pkg.KafkaClusterNamespace).Get(context.Background(), pkg.SaslUserSecretName, metav1.GetOptions{}) assert.Nil(t, err) return map[string][]byte{ "protocol": []byte("SASL_SSL"), "sasl.mechanism": []byte("SCRAM-SHA-512"), "ca.crt": caSecret.Data["ca.crt"], - "user": []byte(SaslUserSecretName), + "user": []byte(pkg.SaslUserSecretName), "password": saslUserSecret.Data["password"], } } diff --git a/test/test_images/consumer-group-lag-provider-test/admin.go b/test/test_images/consumer-group-lag-provider-test/admin.go index 428c8acf68..9b24e0cd22 100644 --- a/test/test_images/consumer-group-lag-provider-test/admin.go +++ b/test/test_images/consumer-group-lag-provider-test/admin.go @@ -25,7 +25,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" - testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing" + testingpkg "knative.dev/eventing-kafka-broker/test/pkg" ) func main() { diff --git a/test/upgrade/continual/broker.go b/test/upgrade/continual/broker.go index 7776a9b0fc..46d27f0b17 100644 --- a/test/upgrade/continual/broker.go +++ b/test/upgrade/continual/broker.go @@ -36,7 +36,7 @@ import ( eventingkafkaupgrade "knative.dev/eventing-kafka/test/upgrade/continual" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" - testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing" + testingpkg "knative.dev/eventing-kafka-broker/test/pkg" ) const ( diff --git a/test/upgrade/continual/sink_source.go b/test/upgrade/continual/sink_source.go index a78de876a7..81aa575545 100644 --- a/test/upgrade/continual/sink_source.go +++ b/test/upgrade/continual/sink_source.go @@ -35,8 +35,8 @@ import ( eventing "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1" eventingv1alpha1clientset "knative.dev/eventing-kafka-broker/control-plane/pkg/client/clientset/versioned/typed/eventing/v1alpha1" + testingpkg "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/eventing-kafka-broker/test/pkg/sink" - testingpkg "knative.dev/eventing-kafka-broker/test/pkg/testing" ) const ( diff --git a/test/upgrade/smoke.go b/test/upgrade/smoke.go index cfb86efc69..85a5bb6a76 100644 --- a/test/upgrade/smoke.go +++ b/test/upgrade/smoke.go @@ -22,8 +22,8 @@ import ( "knative.dev/eventing/test/e2e/helpers" + pkgtesting "knative.dev/eventing-kafka-broker/test/pkg" testbroker "knative.dev/eventing-kafka-broker/test/pkg/broker" - pkgtesting "knative.dev/eventing-kafka-broker/test/pkg/testing" ) func runBrokerSmokeTest(t *testing.T) { diff --git a/vendor/knative.dev/eventing-kafka/test/rekt/resources/kafkasource/kafkasource.go b/vendor/knative.dev/eventing-kafka/test/rekt/resources/kafkasource/kafkasource.go new file mode 100644 index 0000000000..1f91739955 --- /dev/null +++ b/vendor/knative.dev/eventing-kafka/test/rekt/resources/kafkasource/kafkasource.go @@ -0,0 +1,240 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kafkasource + +import ( + "context" + "embed" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/manifest" +) + +//go:embed *.yaml +var yaml embed.FS + +func GVR() schema.GroupVersionResource { + return schema.GroupVersionResource{Group: "sources.knative.dev", Version: "v1beta1", Resource: "kafkasources"} +} + +// Install will create a KafkaSource resource, using the latest version, augmented with the config fn options. +func Install(name string, opts ...manifest.CfgFn) feature.StepFn { + cfg := map[string]interface{}{ + "name": name, + "version": GVR().Version, + } + for _, fn := range opts { + fn(cfg) + } + return func(ctx context.Context, t feature.T) { + if _, err := manifest.InstallLocalYaml(ctx, cfg); err != nil { + t.Fatal(err, cfg) + } + } +} + +// IsReady tests to see if a KafkaSource becomes ready within the time given. +func IsReady(name string, timings ...time.Duration) feature.StepFn { + return k8s.IsReady(GVR(), name, timings...) +} + +// WithVersion overrides the default API version +func WithVersion(version string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if version != "" { + cfg["version"] = version + } + } +} + +// WithAnnotations adds annotation to a KafkaSource metadata. +func WithAnnotations(annotations map[string]string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if annotations != nil { + cfg["annotations"] = annotations + } + } +} + +// WithBootstrapServers adds the bootstrapServers config to a KafkaSource spec. +func WithBootstrapServers(bootstrapServers []string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if bootstrapServers != nil { + cfg["bootstrapServers"] = bootstrapServers + } + } +} + +// WithTopics adds the topics config to a KafkaSource spec. +func WithTopics(topics []string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if topics != nil { + cfg["topics"] = topics + } + } +} + +// WithTLSEnabled enables TLS to a KafkaSource spec. +func WithTLSEnabled() manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, ok := cfg["tls"]; !ok { + cfg["tls"] = map[string]interface{}{} + } + } +} + +// WithTLSCert adds the TLS cert config to a KafkaSource spec. +func WithTLSCert(name, key string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, ok := cfg["tls"]; !ok { + cfg["tls"] = map[string]interface{}{} + } + tls := cfg["tls"].(map[string]interface{}) + if _, ok := tls["cert"]; !ok { + tls["cert"] = map[string]interface{}{} + } + cert := tls["cert"].(map[string]interface{}) + cert["name"] = name + cert["key"] = key + } +} + +// WithTLSKey adds the TLS key config to a KafkaSource spec. +func WithTLSKey(name, key string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, ok := cfg["tls"]; !ok { + cfg["tls"] = map[string]interface{}{} + } + tls := cfg["tls"].(map[string]interface{}) + if _, ok := tls["key"]; !ok { + tls["key"] = map[string]interface{}{} + } + cert := tls["key"].(map[string]interface{}) + cert["name"] = name + cert["key"] = key + } +} + +// WithTLSCACert adds the TLS caCert config to a KafkaSource spec. +func WithTLSCACert(name, key string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, ok := cfg["tls"]; !ok { + cfg["tls"] = map[string]interface{}{} + } + tls := cfg["tls"].(map[string]interface{}) + if _, ok := tls["caCert"]; !ok { + tls["caCert"] = map[string]interface{}{} + } + cert := tls["caCert"].(map[string]interface{}) + cert["name"] = name + cert["key"] = key + } +} + +// WithSASLEnabled enables SASL to a KafkaSource spec. +func WithSASLEnabled() manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, ok := cfg["sasl"]; !ok { + cfg["sasl"] = map[string]interface{}{} + } + } +} + +// WithSASLUser adds the SASL user config to a KafkaSource spec. +func WithSASLUser(name, key string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, ok := cfg["sasl"]; !ok { + cfg["sasl"] = map[string]interface{}{} + } + sasl := cfg["sasl"].(map[string]interface{}) + if _, ok := sasl["user"]; !ok { + sasl["user"] = map[string]interface{}{} + } + user := sasl["user"].(map[string]interface{}) + user["name"] = name + user["key"] = key + } +} + +// WithSASLPassword adds the SASL password config to a KafkaSource spec. +func WithSASLPassword(name, key string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, ok := cfg["sasl"]; !ok { + cfg["sasl"] = map[string]interface{}{} + } + sasl := cfg["sasl"].(map[string]interface{}) + if _, ok := sasl["password"]; !ok { + sasl["password"] = map[string]interface{}{} + } + password := sasl["password"].(map[string]interface{}) + password["name"] = name + password["key"] = key + } +} + +// WithSASLType adds the SASL type config to a KafkaSource spec. +func WithSASLType(name, key string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, ok := cfg["sasl"]; !ok { + cfg["sasl"] = map[string]interface{}{} + } + sasl := cfg["sasl"].(map[string]interface{}) + if _, ok := sasl["type"]; !ok { + sasl["type"] = map[string]interface{}{} + } + t := sasl["type"].(map[string]interface{}) + t["name"] = name + t["key"] = key + } +} + +// WithSink adds the sink related config to a KafkaSource spec. +func WithSink(ref *duckv1.KReference, uri string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, set := cfg["sink"]; !set { + cfg["sink"] = map[string]interface{}{} + } + sink := cfg["sink"].(map[string]interface{}) + + if uri != "" { + sink["uri"] = uri + } + if ref != nil { + if _, set := sink["ref"]; !set { + sink["ref"] = map[string]interface{}{} + } + sref := sink["ref"].(map[string]interface{}) + sref["apiVersion"] = ref.APIVersion + sref["kind"] = ref.Kind + // skip namespace + sref["name"] = ref.Name + } + } +} + +// WithExtensions set ceoverrides.extensions to a KafkaSource spec. +func WithExtensions(extensions map[string]string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if extensions != nil { + cfg["extensions"] = extensions + } + } +} diff --git a/vendor/knative.dev/eventing-kafka/test/rekt/resources/kafkasource/kafkasource.yaml b/vendor/knative.dev/eventing-kafka/test/rekt/resources/kafkasource/kafkasource.yaml new file mode 100644 index 0000000000..2c8673cfd0 --- /dev/null +++ b/vendor/knative.dev/eventing-kafka/test/rekt/resources/kafkasource/kafkasource.yaml @@ -0,0 +1,108 @@ +# Copyright 2021 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: sources.knative.dev/{{ .version }} +kind: KafkaSource +metadata: + name: {{ .name }} + namespace: {{ .namespace }} + {{ if .annotations }} + annotations: + {{ range $key, $value := .annotations }} + {{ $key }}: "{{ $value }}" + {{ end }} + {{ end }} +spec: + {{ if .bootstrapServers }} + bootstrapServers: + {{ range $value := .bootstrapServers }} + - "{{ $value }}" + {{ end }} + {{ end }} + {{ if .consumerGroup }} + consumerGroup: {{ .consumerGroup }} + {{ end }} + {{ if .topics }} + topics: + {{ range $value := .topics }} + - "{{ $value }}" + {{ end }} + {{ end }} + {{ if or .tls .sasl }} + net: + {{ if .tls }} + tls: + enable: true + {{ if .tls.cert }} + cert: + secretKeyRef: + name: "{{ .tls.cert.name }}" + key: "{{ .tls.cert.key }}" + {{ end }} + {{ if .tls.key }} + key: + secretKeyRef: + name: "{{ .tls.key.name }}" + key: "{{ .tls.key.key }}" + {{ end }} + {{ if .tls.caCert }} + caCert: + secretKeyRef: + name: "{{ .tls.caCert.name }}" + key: "{{ .tls.caCert.key }}" + {{ end }} + {{ end }} + {{ if .sasl }} + sasl: + enable: true + {{ if .sasl.user }} + user: + secretKeyRef: + name: "{{ .sasl.user.name }}" + key: "{{ .sasl.user.key }}" + {{ end }} + {{ if .sasl.password }} + password: + secretKeyRef: + name: "{{ .sasl.password.name }}" + key: "{{ .sasl.password.key }}" + {{ end }} + {{ if .sasl.type }} + type: + secretKeyRef: + name: "{{ .sasl.type.name }}" + key: "{{ .sasl.type.key }}" + {{ end }} + {{ end }} + {{ end }} + {{ if .extensions }} + ceOverrides: + extensions: + {{ range $key, $value := .extensions }} + {{ $key }}: "{{ $value }}" + {{ end }} + {{ end }} + {{ if .sink }} + sink: + {{ if .sink.ref }} + ref: + kind: {{ .sink.ref.kind }} + namespace: {{ .namespace }} + name: {{ .sink.ref.name }} + apiVersion: {{ .sink.ref.apiVersion }} + {{ end }} + {{ if .sink.uri }} + uri: {{ .sink.uri }} + {{ end }} + {{ end }} diff --git a/vendor/knative.dev/eventing-kafka/test/rekt/resources/kafkatopic/topic.go b/vendor/knative.dev/eventing-kafka/test/rekt/resources/kafkatopic/topic.go new file mode 100644 index 0000000000..3f732bf5a7 --- /dev/null +++ b/vendor/knative.dev/eventing-kafka/test/rekt/resources/kafkatopic/topic.go @@ -0,0 +1,91 @@ +/* +Copyright 2021 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kafkatopic + +import ( + "context" + "embed" + "time" + + "k8s.io/apimachinery/pkg/runtime/schema" + + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/manifest" +) + +const ( + // kafkaNamespace is the namespace where kafka is installed + kafkaNamespace = "kafka" +) + +//go:embed *.yaml +var yaml embed.FS + +func GVR() schema.GroupVersionResource { + return schema.GroupVersionResource{Group: "kafka.strimzi.io", Version: "v1beta2", Resource: "kafkatopics"} +} + +// Install will create a Kafka Topic via the Strimzi topic CRD,, augmented with the config fn options. +func Install(name string, opts ...manifest.CfgFn) feature.StepFn { + cfg := map[string]interface{}{ + "name": name, + "clusterNamespace": "kafka", + "partitions": 10, + "clusterName": "my-cluster", + } + for _, fn := range opts { + fn(cfg) + } + + return func(ctx context.Context, t feature.T) { + if _, err := manifest.InstallLocalYaml(ctx, cfg); err != nil { + t.Fatal(err, cfg) + } + } +} + +// IsReady tests to see if a KafkaTopic becomes ready within the time given. +func IsReady(name string, timings ...time.Duration) feature.StepFn { + return func(ctx context.Context, t feature.T) { + interval, timeout := k8s.PollTimings(ctx, timings) + if err := k8s.WaitForResourceReady(ctx, t, kafkaNamespace, name, GVR(), interval, timeout); err != nil { + t.Error(GVR(), "did not become ready,", err) + } + } +} + +// WithPartitions overrides the number of partitions (default: 10). +func WithPartitions(partitions string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + cfg["partitions"] = partitions + } +} + +// WithClusterName overrides the Kafka cluster names where to create the topic (default: my-cluster) +func WithClusterName(name string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + cfg["clusterName"] = name + } +} + +// WithClusterNamespace overrides the Kafka cluster namespace where to create the topic (default: kafka) +func WithClusterNamespace(namespace string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + cfg["clusterNamespace"] = namespace + } +} diff --git a/vendor/knative.dev/eventing-kafka/test/rekt/resources/kafkatopic/topic.yaml b/vendor/knative.dev/eventing-kafka/test/rekt/resources/kafkatopic/topic.yaml new file mode 100644 index 0000000000..1d6e2e543b --- /dev/null +++ b/vendor/knative.dev/eventing-kafka/test/rekt/resources/kafkatopic/topic.yaml @@ -0,0 +1,24 @@ +# Copyright 2021 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaTopic +metadata: + name: {{ .name }} + namespace: {{ .clusterNamespace }} + labels: + strimzi.io/cluster: {{ .clusterName }} +spec: + partitions: {{ .partitions }} + replicas: 1 diff --git a/vendor/modules.txt b/vendor/modules.txt index 13b077f816..e51fec1b9b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1385,6 +1385,8 @@ knative.dev/eventing-kafka/test/lib knative.dev/eventing-kafka/test/lib/resources knative.dev/eventing-kafka/test/rekt/features/kafkachannel knative.dev/eventing-kafka/test/rekt/resources/kafkachannel +knative.dev/eventing-kafka/test/rekt/resources/kafkasource +knative.dev/eventing-kafka/test/rekt/resources/kafkatopic knative.dev/eventing-kafka/test/rekt/resources/resetoffset knative.dev/eventing-kafka/test/upgrade knative.dev/eventing-kafka/test/upgrade/continual