From 241e6a71e15f6ad0707b87b8813f5bff7eec991b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Christoph=20St=C3=A4bler?=
Date: Wed, 28 Aug 2024 13:45:35 +0200
Subject: [PATCH] Support for EventPolicy filters (#4086)

* Move filters from dispatcher to core module
* Move contract filter converter from ConsumerVerticleBuilder to filter package
* Update benchmark module to reflect filter package movements
* Run update-codegen
* AuthenticationHandler -> AuthHandler
* Move OIDC discovery from Receiver to TokenVerifierImpl
* Rework TokenVerifier AuthZ to read the CloudEvent from the request only once when the policy claims match
* Rename TokenVerifier to AuthVerifier
* Provision EventPolicy filters in contract
* Add e2e test
* Run hack/update-codegen.sh
* Fix: Always pass the CloudEvent (can be null) to the ingress handler
---
 control-plane/pkg/core/config/utils.go        |   4 +
 control-plane/pkg/core/config/utils_test.go   |  76 ++++++
 data-plane/benchmarks/pom.xml                 |   6 +
 .../filter/AllFilterBenchmark.java            |  11 +-
 .../filter/AnyFilterBenchmark.java            |   8 +-
 .../filter/ExactFilterBenchmark.java          |   5 +-
 .../impl => core}/filter/FilterBenchmark.java |   3 +-
 .../filter/NotFilterBenchmark.java            |  11 +-
 .../filter/PrefixFilterBenchmark.java         |   5 +-
 .../impl => core}/filter/SampleEvent.java     |   2 +-
 .../filter/SuffixFilterBenchmark.java         |   5 +-
 data-plane/core/pom.xml                       |   4 +
 .../broker/core}/filter/AttributesFilter.java |   3 +-
 .../kafka/broker/core/filter/Filter.java      |  58 +++++
 .../filter/subscriptionsapi/AllFilter.java    |   4 +-
 .../filter/subscriptionsapi/AnyFilter.java    |   4 +-
 .../filter/subscriptionsapi/CeSqlFilter.java  |   4 +-
 .../filter/subscriptionsapi/ExactFilter.java  |   4 +-
 .../filter/subscriptionsapi/NotFilter.java    |   4 +-
 .../filter/subscriptionsapi/PrefixFilter.java |   4 +-
 .../filter/subscriptionsapi/SuffixFilter.java |   4 +-
 .../subscriptionsapi/AllFilterTest.java       |   4 +-
 .../subscriptionsapi/AnyFilterTest.java       |   4 +-
 .../subscriptionsapi/CeSqlFilterTest.java     |   2 +-
 .../subscriptionsapi/ExactFilterTest.java     |   2 +-
 .../subscriptionsapi/NotFilterTest.java       |   4 +-
 .../subscriptionsapi/PrefixFilterTest.java    |   2 +-
 .../subscriptionsapi/SuffixFilterTest.java    |   2 +-
 data-plane/dispatcher/pom.xml                 |   5 -
 .../kafka/broker/dispatcher/Filter.java       |  33 ---
 .../dispatcher/impl/RecordDispatcherImpl.java |   2 +-
 .../main/ConsumerVerticleBuilder.java         |  35 +--
 .../dispatcher/impl/RecordDispatcherTest.java |   2 +-
 .../receiver/IngressRequestHandler.java       |   3 +-
 .../receiver/RequestToRecordMapper.java       |  34 ---
 .../receiver/impl/ReceiverVerticle.java       |  39 +--
 .../impl/StrictRequestToRecordMapper.java     |  57 -----
 .../{TokenVerifier.java => AuthVerifier.java} |   5 +-
 ...erifierImpl.java => AuthVerifierImpl.java} |  63 ++++-
 .../receiver/impl/auth/EventPolicy.java       |  17 +-
 ...nticationHandler.java => AuthHandler.java} |  55 ++--
 .../handler/IngressRequestHandlerImpl.java    | 162 +++++------
 .../main/ReceiverVerticleFactory.java         |   7 +-
 .../receiver/impl/ReceiverVerticleTest.java   |   3 +-
 .../impl/ReceiverVerticleTracingTest.java     |   5 +-
 ...nHandlerTest.java => AuthHandlerTest.java} | 100 ++++----
 .../IngressRequestHandlerImplTest.java        |  21 +-
 .../broker/tests/AbstractDataPlaneTest.java   |   6 +-
 .../features.yaml                             |   6 -
 test/e2e_new/broker_test.go                   |  20 ++
 test/reconciler-tests.sh                      |   6 +-
 .../authz/addressable_authz_conformance.go    | 237 ++++++++++++++++++
 .../rekt/resources/eventpolicy/eventpolicy.go | 192 ++++++++++++++
 .../resources/eventpolicy/eventpolicy.yaml    |  74 ++++++
 vendor/modules.txt                            |   2 +
 55 files changed, 977 insertions(+), 463 deletions(-)
 rename
data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/{dispatcher/impl => core}/filter/AllFilterBenchmark.java (87%) rename data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/{dispatcher/impl => core}/filter/AnyFilterBenchmark.java (90%) rename data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/{dispatcher/impl => core}/filter/ExactFilterBenchmark.java (92%) rename data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/{dispatcher/impl => core}/filter/FilterBenchmark.java (92%) rename data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/{dispatcher/impl => core}/filter/NotFilterBenchmark.java (79%) rename data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/{dispatcher/impl => core}/filter/PrefixFilterBenchmark.java (94%) rename data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/{dispatcher/impl => core}/filter/SampleEvent.java (96%) rename data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/{dispatcher/impl => core}/filter/SuffixFilterBenchmark.java (94%) rename data-plane/{dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl => core/src/main/java/dev/knative/eventing/kafka/broker/core}/filter/AttributesFilter.java (97%) create mode 100644 data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/Filter.java rename data-plane/{dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl => core/src/main/java/dev/knative/eventing/kafka/broker/core}/filter/subscriptionsapi/AllFilter.java (91%) rename data-plane/{dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl => core/src/main/java/dev/knative/eventing/kafka/broker/core}/filter/subscriptionsapi/AnyFilter.java (91%) rename data-plane/{dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl => core/src/main/java/dev/knative/eventing/kafka/broker/core}/filter/subscriptionsapi/CeSqlFilter.java (95%) rename data-plane/{dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl => core/src/main/java/dev/knative/eventing/kafka/broker/core}/filter/subscriptionsapi/ExactFilter.java (84%) rename data-plane/{dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl => core/src/main/java/dev/knative/eventing/kafka/broker/core}/filter/subscriptionsapi/NotFilter.java (90%) rename data-plane/{dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl => core/src/main/java/dev/knative/eventing/kafka/broker/core}/filter/subscriptionsapi/PrefixFilter.java (84%) rename data-plane/{dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl => core/src/main/java/dev/knative/eventing/kafka/broker/core}/filter/subscriptionsapi/SuffixFilter.java (84%) rename data-plane/{dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl => core/src/test/java/dev/knative/eventing/kafka/broker/core}/filter/subscriptionsapi/AllFilterTest.java (95%) rename data-plane/{dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl => core/src/test/java/dev/knative/eventing/kafka/broker/core}/filter/subscriptionsapi/AnyFilterTest.java (95%) rename data-plane/{dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl => core/src/test/java/dev/knative/eventing/kafka/broker/core}/filter/subscriptionsapi/CeSqlFilterTest.java (96%) rename data-plane/{dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl => 
core/src/test/java/dev/knative/eventing/kafka/broker/core}/filter/subscriptionsapi/ExactFilterTest.java (99%) rename data-plane/{dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl => core/src/test/java/dev/knative/eventing/kafka/broker/core}/filter/subscriptionsapi/NotFilterTest.java (93%) rename data-plane/{dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl => core/src/test/java/dev/knative/eventing/kafka/broker/core}/filter/subscriptionsapi/PrefixFilterTest.java (96%) rename data-plane/{dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl => core/src/test/java/dev/knative/eventing/kafka/broker/core}/filter/subscriptionsapi/SuffixFilterTest.java (96%) delete mode 100644 data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java delete mode 100644 data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/RequestToRecordMapper.java delete mode 100644 data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/StrictRequestToRecordMapper.java rename data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/{TokenVerifier.java => AuthVerifier.java} (84%) rename data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/{TokenVerifierImpl.java => AuthVerifierImpl.java} (61%) rename data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/{AuthenticationHandler.java => AuthHandler.java} (60%) rename data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/{AuthenticationHandlerTest.java => AuthHandlerTest.java} (68%) rename test/{config-oidc-authentication => config-auth}/features.yaml (81%) create mode 100644 vendor/knative.dev/eventing/test/rekt/features/authz/addressable_authz_conformance.go create mode 100644 vendor/knative.dev/eventing/test/rekt/resources/eventpolicy/eventpolicy.go create mode 100644 vendor/knative.dev/eventing/test/rekt/resources/eventpolicy/eventpolicy.yaml diff --git a/control-plane/pkg/core/config/utils.go b/control-plane/pkg/core/config/utils.go index 8a30079300..580e4067c8 100644 --- a/control-plane/pkg/core/config/utils.go +++ b/control-plane/pkg/core/config/utils.go @@ -90,6 +90,10 @@ func EventPoliciesFromAppliedEventPoliciesStatus(status duck.AppliedEventPolicie } } + for _, filter := range policy.Spec.Filters { + contractPolicy.Filters = append(contractPolicy.Filters, contract.FromSubscriptionFilter(filter)) + } + eventPolicies = append(eventPolicies, contractPolicy) } diff --git a/control-plane/pkg/core/config/utils_test.go b/control-plane/pkg/core/config/utils_test.go index 3e949a459e..9d4ff8d2ce 100644 --- a/control-plane/pkg/core/config/utils_test.go +++ b/control-plane/pkg/core/config/utils_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + "google.golang.org/protobuf/encoding/protojson" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/apis/feature" @@ -618,6 +620,80 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) { }, }, }, + }, { + name: "Multiple policies with filters", + applyingPolicies: []string{ + "policy-1", + "policy-2", + }, + existingEventPolicies: []*eventingv1alpha1.EventPolicy{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "policy-1", + Namespace: "my-ns", + }, + Spec: eventingv1alpha1.EventPolicySpec{ + Filters: []eventingv1.SubscriptionsAPIFilter{ + { + 
CESQL: "true", + }, + }, + }, + Status: eventingv1alpha1.EventPolicyStatus{ + From: []string{ + "from-1", + }, + }, + }, { + ObjectMeta: metav1.ObjectMeta{ + Name: "policy-2", + Namespace: "my-ns", + }, + Spec: eventingv1alpha1.EventPolicySpec{ + Filters: []eventingv1.SubscriptionsAPIFilter{ + { + CESQL: "false", + }, + }, + }, + Status: eventingv1alpha1.EventPolicyStatus{ + From: []string{ + "from-2-*", + }, + }, + }, + }, + namespace: "my-ns", + defaultAuthorizationMode: feature.AuthorizationDenyAll, + expected: []*contract.EventPolicy{ + { + TokenMatchers: []*contract.TokenMatcher{ + exactTokenMatcher("from-1"), + }, + Filters: []*contract.DialectedFilter{ + { + Filter: &contract.DialectedFilter_Cesql{ + Cesql: &contract.CESQL{ + Expression: "true", + }, + }, + }, + }, + }, { + TokenMatchers: []*contract.TokenMatcher{ + prefixTokenMatcher("from-2-"), + }, + Filters: []*contract.DialectedFilter{ + { + Filter: &contract.DialectedFilter_Cesql{ + Cesql: &contract.CESQL{ + Expression: "false", + }, + }, + }, + }, + }, + }, }, { name: "No applying policies - allow-same-namespace default mode", applyingPolicies: []string{}, diff --git a/data-plane/benchmarks/pom.xml b/data-plane/benchmarks/pom.xml index 2cbc78f479..0fa339fe2c 100644 --- a/data-plane/benchmarks/pom.xml +++ b/data-plane/benchmarks/pom.xml @@ -54,6 +54,12 @@ dispatcher ${project.version} + + dev.knative.eventing.kafka.broker + core + ${project.version} + + diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/AllFilterBenchmark.java similarity index 87% rename from data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java rename to data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/AllFilterBenchmark.java index 2faff3027e..294c2ac19c 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AllFilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/AllFilterBenchmark.java @@ -14,13 +14,12 @@ * limitations under the License. 
*/ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter; +package dev.knative.eventing.kafka.broker.core.filter; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.AllFilter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.ExactFilter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.PrefixFilter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.SuffixFilter; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.AllFilter; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.PrefixFilter; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.SuffixFilter; import io.cloudevents.CloudEvent; import java.util.List; import java.util.Map; diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/AnyFilterBenchmark.java similarity index 90% rename from data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java rename to data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/AnyFilterBenchmark.java index 702bc75d7f..9f9da46e09 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AnyFilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/AnyFilterBenchmark.java @@ -14,10 +14,12 @@ * limitations under the License. */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter; +package dev.knative.eventing.kafka.broker.core.filter; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.*; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.AnyFilter; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.PrefixFilter; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.SuffixFilter; import io.cloudevents.CloudEvent; import java.util.List; import java.util.Map; diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/ExactFilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/ExactFilterBenchmark.java similarity index 92% rename from data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/ExactFilterBenchmark.java rename to data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/ExactFilterBenchmark.java index d223723612..87768b536a 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/ExactFilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/ExactFilterBenchmark.java @@ -14,10 +14,9 @@ * limitations under the License. 
*/ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter; +package dev.knative.eventing.kafka.broker.core.filter; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.ExactFilter; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; import io.cloudevents.core.v1.CloudEventV1; diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/FilterBenchmark.java similarity index 92% rename from data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java rename to data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/FilterBenchmark.java index bc29350bad..0db52c307e 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/FilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/FilterBenchmark.java @@ -14,9 +14,8 @@ * limitations under the License. */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter; +package dev.knative.eventing.kafka.broker.core.filter; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.*; diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/NotFilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/NotFilterBenchmark.java similarity index 79% rename from data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/NotFilterBenchmark.java rename to data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/NotFilterBenchmark.java index 9b290d5bb7..00f487baec 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/NotFilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/NotFilterBenchmark.java @@ -14,13 +14,12 @@ * limitations under the License. 
*/ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter; +package dev.knative.eventing.kafka.broker.core.filter; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.ExactFilter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.NotFilter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.PrefixFilter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.SuffixFilter; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.NotFilter; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.PrefixFilter; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.SuffixFilter; import io.cloudevents.CloudEvent; import io.cloudevents.core.v1.CloudEventV1; import java.util.Map; diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/PrefixFilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/PrefixFilterBenchmark.java similarity index 94% rename from data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/PrefixFilterBenchmark.java rename to data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/PrefixFilterBenchmark.java index e66f338c1e..cb05bbd392 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/PrefixFilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/PrefixFilterBenchmark.java @@ -14,10 +14,9 @@ * limitations under the License. */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter; +package dev.knative.eventing.kafka.broker.core.filter; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.PrefixFilter; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.PrefixFilter; import io.cloudevents.CloudEvent; import io.cloudevents.core.v1.CloudEventV1; import java.util.Map; diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/SampleEvent.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/SampleEvent.java similarity index 96% rename from data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/SampleEvent.java rename to data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/SampleEvent.java index 6c230230a4..ebae954a61 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/SampleEvent.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/SampleEvent.java @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter; +package dev.knative.eventing.kafka.broker.core.filter; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; diff --git a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/SuffixFilterBenchmark.java b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/SuffixFilterBenchmark.java similarity index 94% rename from data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/SuffixFilterBenchmark.java rename to data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/SuffixFilterBenchmark.java index 9b45d4a7da..fe332df2ce 100644 --- a/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/SuffixFilterBenchmark.java +++ b/data-plane/benchmarks/src/main/java/dev/knative/eventing/kafka/broker/core/filter/SuffixFilterBenchmark.java @@ -14,10 +14,9 @@ * limitations under the License. */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter; +package dev.knative.eventing.kafka.broker.core.filter; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.SuffixFilter; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.SuffixFilter; import io.cloudevents.CloudEvent; import io.cloudevents.core.v1.CloudEventV1; import java.util.Map; diff --git a/data-plane/core/pom.xml b/data-plane/core/pom.xml index 476e3c6600..362a8ec209 100644 --- a/data-plane/core/pom.xml +++ b/data-plane/core/pom.xml @@ -146,6 +146,10 @@ vertx-opentelemetry + + io.cloudevents + cloudevents-sql + io.cloudevents cloudevents-kafka diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/AttributesFilter.java similarity index 97% rename from data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java rename to data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/AttributesFilter.java index 876fc5d1b3..b1dd51c0e9 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/AttributesFilter.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/AttributesFilter.java @@ -13,11 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter; +package dev.knative.eventing.kafka.broker.core.filter; import static java.time.format.DateTimeFormatter.ISO_INSTANT; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; import io.cloudevents.CloudEvent; import io.cloudevents.core.v03.CloudEventV03; import io.cloudevents.core.v1.CloudEventV1; diff --git a/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/Filter.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/Filter.java new file mode 100644 index 0000000000..c6f6943c09 --- /dev/null +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/Filter.java @@ -0,0 +1,58 @@ +/* + * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package dev.knative.eventing.kafka.broker.core.filter; + +import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.*; +import io.cloudevents.CloudEvent; +import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +/** + * This interface provides an abstraction for filtering {@link CloudEvent} instances. 
+ */
+@FunctionalInterface
+public interface Filter extends Predicate<CloudEvent> {
+
+    /**
+     * @return noop implementation that always returns true
+     */
+    static Filter noop() {
+        return ce -> true;
+    }
+
+    static Filter fromContract(DataPlaneContract.DialectedFilter filter) {
+        return switch (filter.getFilterCase()) {
+            case EXACT -> new ExactFilter(filter.getExact().getAttributesMap());
+            case PREFIX -> new PrefixFilter(filter.getPrefix().getAttributesMap());
+            case SUFFIX -> new SuffixFilter(filter.getSuffix().getAttributesMap());
+            case NOT -> new NotFilter(fromContract(filter.getNot().getFilter()));
+            case ANY -> new AnyFilter(filter.getAny().getFiltersList().stream()
+                    .map(Filter::fromContract)
+                    .collect(Collectors.toList()));
+            case ALL -> new AllFilter(filter.getAll().getFiltersList().stream()
+                    .map(Filter::fromContract)
+                    .collect(Collectors.toList()));
+            case CESQL -> new CeSqlFilter(filter.getCesql().getExpression());
+            default -> Filter.noop();
+        };
+    }
+
+    static Filter fromContract(List<DataPlaneContract.DialectedFilter> filters) {
+        return new AllFilter(filters.stream().map(Filter::fromContract).collect(Collectors.toList()));
+    }
+}
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/AllFilter.java
similarity index 91%
rename from data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java
rename to data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/AllFilter.java
index e1a0ab87a0..404235c4c1 100644
--- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilter.java
+++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/AllFilter.java
@@ -13,9 +13,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
+package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;
 
-import dev.knative.eventing.kafka.broker.dispatcher.Filter;
+import dev.knative.eventing.kafka.broker.core.filter.Filter;
 import io.cloudevents.CloudEvent;
 import java.util.List;
 import org.slf4j.Logger;
diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/AnyFilter.java
similarity index 91%
rename from data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java
rename to data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/AnyFilter.java
index 2325300801..03618b0018 100644
--- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilter.java
+++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/AnyFilter.java
@@ -13,9 +13,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
*/ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; +package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; +import dev.knative.eventing.kafka.broker.core.filter.Filter; import io.cloudevents.CloudEvent; import java.util.List; import org.slf4j.Logger; diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/CeSqlFilter.java similarity index 95% rename from data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java rename to data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/CeSqlFilter.java index 5c9fcb570b..0c8e89ff01 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilter.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/CeSqlFilter.java @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; +package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; +import dev.knative.eventing.kafka.broker.core.filter.Filter; import io.cloudevents.CloudEvent; import io.cloudevents.sql.EvaluationContext; import io.cloudevents.sql.EvaluationException; diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/ExactFilter.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/ExactFilter.java similarity index 84% rename from data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/ExactFilter.java rename to data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/ExactFilter.java index 6b4c61285a..0cf13e6778 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/ExactFilter.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/ExactFilter.java @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; +package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.AttributesFilter; +import dev.knative.eventing.kafka.broker.core.filter.AttributesFilter; import java.util.Map; public class ExactFilter extends AttributesFilter { diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/NotFilter.java similarity index 90% rename from data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java rename to data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/NotFilter.java index 8d90034751..70b97bad48 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilter.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/NotFilter.java @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; +package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; +import dev.knative.eventing.kafka.broker.core.filter.Filter; import io.cloudevents.CloudEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/PrefixFilter.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/PrefixFilter.java similarity index 84% rename from data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/PrefixFilter.java rename to data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/PrefixFilter.java index e0313a60ca..78b0011160 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/PrefixFilter.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/PrefixFilter.java @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; +package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.AttributesFilter; +import dev.knative.eventing.kafka.broker.core.filter.AttributesFilter; import java.util.Map; public class PrefixFilter extends AttributesFilter { diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/SuffixFilter.java b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/SuffixFilter.java similarity index 84% rename from data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/SuffixFilter.java rename to data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/SuffixFilter.java index dcaf4c5503..86c339afca 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/SuffixFilter.java +++ b/data-plane/core/src/main/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/SuffixFilter.java @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; +package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.AttributesFilter; +import dev.knative.eventing.kafka.broker.core.filter.AttributesFilter; import java.util.Map; public class SuffixFilter extends AttributesFilter { diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/AllFilterTest.java similarity index 95% rename from data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java rename to data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/AllFilterTest.java index c77d6209a1..c808f5da64 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AllFilterTest.java +++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/AllFilterTest.java @@ -13,11 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; +package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi; import static org.assertj.core.api.Assertions.assertThat; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; +import dev.knative.eventing.kafka.broker.core.filter.Filter; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; import java.net.URI; diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/AnyFilterTest.java similarity index 95% rename from data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java rename to data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/AnyFilterTest.java index c1564c4cb4..4d3313307d 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/AnyFilterTest.java +++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/AnyFilterTest.java @@ -13,11 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; +package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi; import static org.assertj.core.api.Assertions.assertThat; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; +import dev.knative.eventing.kafka.broker.core.filter.Filter; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; import java.net.URI; diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilterTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/CeSqlFilterTest.java similarity index 96% rename from data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilterTest.java rename to data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/CeSqlFilterTest.java index 15f5eb8b9c..020307567f 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/CeSqlFilterTest.java +++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/CeSqlFilterTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; +package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi; import static org.assertj.core.api.Assertions.assertThat; diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/ExactFilterTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/ExactFilterTest.java similarity index 99% rename from data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/ExactFilterTest.java rename to data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/ExactFilterTest.java index 775a4f061f..be380c5ea5 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/ExactFilterTest.java +++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/ExactFilterTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; +package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi; import static org.assertj.core.api.Assertions.assertThat; diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilterTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/NotFilterTest.java similarity index 93% rename from data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilterTest.java rename to data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/NotFilterTest.java index ff6791ef81..b0754a3cbc 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/NotFilterTest.java +++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/NotFilterTest.java @@ -13,11 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; +package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi; import static org.assertj.core.api.Assertions.assertThat; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; +import dev.knative.eventing.kafka.broker.core.filter.Filter; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; import java.net.URI; diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/PrefixFilterTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/PrefixFilterTest.java similarity index 96% rename from data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/PrefixFilterTest.java rename to data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/PrefixFilterTest.java index 0a02ff2bd0..b493740ad7 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/PrefixFilterTest.java +++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/PrefixFilterTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; +package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi; import static org.assertj.core.api.Assertions.assertThat; diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/SuffixFilterTest.java b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/SuffixFilterTest.java similarity index 96% rename from data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/SuffixFilterTest.java rename to data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/SuffixFilterTest.java index e72749cf01..11d8bde4dc 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/filter/subscriptionsapi/SuffixFilterTest.java +++ b/data-plane/core/src/test/java/dev/knative/eventing/kafka/broker/core/filter/subscriptionsapi/SuffixFilterTest.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi; +package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi; import static org.assertj.core.api.Assertions.assertThat; diff --git a/data-plane/dispatcher/pom.xml b/data-plane/dispatcher/pom.xml index 6146e18b5f..bb49c6bf9b 100644 --- a/data-plane/dispatcher/pom.xml +++ b/data-plane/dispatcher/pom.xml @@ -35,11 +35,6 @@ ${project.version} - - io.cloudevents - cloudevents-sql - - com.github.vladimir-bukhtoyarov bucket4j-core diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java deleted file mode 100644 index 67248bef94..0000000000 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/Filter.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.dispatcher; - -import io.cloudevents.CloudEvent; -import java.util.function.Predicate; - -/** - * This interface provides an abstraction for filtering {@link CloudEvent} instances. - */ -@FunctionalInterface -public interface Filter extends Predicate { - - /** - * @return noop implementation that always returns true - */ - static Filter noop() { - return ce -> true; - } -} diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java index d7da7083b8..7f3d9846cd 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherImpl.java @@ -18,10 +18,10 @@ import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; import dev.knative.eventing.kafka.broker.core.AsyncCloseable; +import dev.knative.eventing.kafka.broker.core.filter.Filter; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; import dev.knative.eventing.kafka.broker.core.tracing.kafka.ConsumerTracer; import dev.knative.eventing.kafka.broker.dispatcher.CloudEventSender; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher; import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler; diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java index c7534dc5bb..436ff73059 100644 --- 
a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/ConsumerVerticleBuilder.java @@ -21,13 +21,14 @@ import dev.knative.eventing.kafka.broker.core.NamespacedName; import dev.knative.eventing.kafka.broker.core.ReactiveKafkaConsumer; import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer; +import dev.knative.eventing.kafka.broker.core.filter.Filter; +import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; import dev.knative.eventing.kafka.broker.core.security.Credentials; import dev.knative.eventing.kafka.broker.core.security.KafkaClientsAuth; import dev.knative.eventing.kafka.broker.core.tracing.kafka.ConsumerTracer; import dev.knative.eventing.kafka.broker.dispatcher.CloudEventSender; import dev.knative.eventing.kafka.broker.dispatcher.DeliveryOrder; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler; import dev.knative.eventing.kafka.broker.dispatcher.impl.NoopResponseHandler; import dev.knative.eventing.kafka.broker.dispatcher.impl.RecordDispatcherImpl; @@ -40,13 +41,6 @@ import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.OrderedConsumerVerticle; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.PartitionRevokedHandler; import dev.knative.eventing.kafka.broker.dispatcher.impl.consumer.UnorderedConsumerVerticle; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.AllFilter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.AnyFilter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.CeSqlFilter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.ExactFilter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.NotFilter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.PrefixFilter; -import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.SuffixFilter; import dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender; import io.cloudevents.CloudEvent; import io.vertx.core.Future; @@ -62,7 +56,6 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; @@ -184,7 +177,7 @@ public void onPartitionsAssigned(Collection partitions) { private Filter getFilter() { // Dialected filters should override the attributes filter if (consumerVerticleContext.getEgress().getDialectedFilterCount() > 0) { - return getFilter(consumerVerticleContext.getEgress().getDialectedFilterList()); + return Filter.fromContract(consumerVerticleContext.getEgress().getDialectedFilterList()); } else if (consumerVerticleContext.getEgress().hasFilter()) { return new ExactFilter( consumerVerticleContext.getEgress().getFilter().getAttributesMap()); @@ -192,28 +185,6 @@ private Filter getFilter() { return Filter.noop(); } - private static Filter getFilter(List filters) { - return new AllFilter( - filters.stream().map(ConsumerVerticleBuilder::getFilter).collect(Collectors.toList())); - } - - private 
static Filter getFilter(DataPlaneContract.DialectedFilter filter) { - return switch (filter.getFilterCase()) { - case EXACT -> new ExactFilter(filter.getExact().getAttributesMap()); - case PREFIX -> new PrefixFilter(filter.getPrefix().getAttributesMap()); - case SUFFIX -> new SuffixFilter(filter.getSuffix().getAttributesMap()); - case NOT -> new NotFilter(getFilter(filter.getNot().getFilter())); - case ANY -> new AnyFilter(filter.getAny().getFiltersList().stream() - .map(ConsumerVerticleBuilder::getFilter) - .collect(Collectors.toList())); - case ALL -> new AllFilter(filter.getAll().getFiltersList().stream() - .map(ConsumerVerticleBuilder::getFilter) - .collect(Collectors.toList())); - case CESQL -> new CeSqlFilter(filter.getCesql().getExpression()); - default -> Filter.noop(); - }; - } - private WebClientOptions createWebClientOptionsFromCACerts(final String caCerts) { final var pemTrustOptions = new PemTrustOptions(); for (String trustBundle : consumerVerticleContext.getTrustBundles()) { diff --git a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java index 98ce91556d..d12c7d7853 100644 --- a/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java +++ b/data-plane/dispatcher/src/test/java/dev/knative/eventing/kafka/broker/dispatcher/impl/RecordDispatcherTest.java @@ -28,11 +28,11 @@ import static org.mockito.Mockito.when; import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; +import dev.knative.eventing.kafka.broker.core.filter.Filter; import dev.knative.eventing.kafka.broker.core.metrics.Metrics; import dev.knative.eventing.kafka.broker.core.testing.CoreObjects; import dev.knative.eventing.kafka.broker.dispatcher.CloudEventSender; import dev.knative.eventing.kafka.broker.dispatcher.CloudEventSenderMock; -import dev.knative.eventing.kafka.broker.dispatcher.Filter; import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcher; import dev.knative.eventing.kafka.broker.dispatcher.RecordDispatcherListener; import dev.knative.eventing.kafka.broker.dispatcher.ResponseHandler; diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressRequestHandler.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressRequestHandler.java index 617fc3da0f..0877d2d795 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressRequestHandler.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/IngressRequestHandler.java @@ -16,11 +16,12 @@ package dev.knative.eventing.kafka.broker.receiver; import dev.knative.eventing.kafka.broker.core.reconciler.IngressReconcilerListener; +import io.cloudevents.CloudEvent; /** * This class handles incoming ingress requests. 
*/ public interface IngressRequestHandler extends IngressReconcilerListener { - void handle(RequestContext request, IngressProducer producer); + void handle(RequestContext request, CloudEvent cloudEvent, IngressProducer producer); } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/RequestToRecordMapper.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/RequestToRecordMapper.java deleted file mode 100644 index b6d6fe6f7b..0000000000 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/RequestToRecordMapper.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.receiver; - -import io.cloudevents.CloudEvent; -import io.vertx.core.Future; -import io.vertx.core.http.HttpServerRequest; -import org.apache.kafka.clients.producer.ProducerRecord; - -@FunctionalInterface -public interface RequestToRecordMapper { - - /** - * Map the given HTTP request to a Kafka record. - * - * @param request http request. - * @param topic topic to send the event - * @return kafka record (record can be null). - */ - Future> requestToRecord(final HttpServerRequest request, final String topic); -} diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java index 92b3671515..41d0d0d026 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticle.java @@ -27,11 +27,9 @@ import dev.knative.eventing.kafka.broker.receiver.IngressProducer; import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler; import dev.knative.eventing.kafka.broker.receiver.RequestContext; -import dev.knative.eventing.kafka.broker.receiver.impl.auth.OIDCDiscoveryConfig; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.AuthVerifierImpl; import dev.knative.eventing.kafka.broker.receiver.impl.auth.OIDCDiscoveryConfigListener; -import dev.knative.eventing.kafka.broker.receiver.impl.auth.TokenVerifier; -import dev.knative.eventing.kafka.broker.receiver.impl.auth.TokenVerifierImpl; -import dev.knative.eventing.kafka.broker.receiver.impl.handler.AuthenticationHandler; +import dev.knative.eventing.kafka.broker.receiver.impl.handler.AuthHandler; import dev.knative.eventing.kafka.broker.receiver.impl.handler.MethodNotAllowedHandler; import dev.knative.eventing.kafka.broker.receiver.impl.handler.ProbeHandler; import dev.knative.eventing.kafka.broker.receiver.main.ReceiverEnv; @@ -88,16 +86,17 @@ public class ReceiverVerticle extends AbstractVerticle implements Handler messageConsumer; private IngressProducerReconcilableStore ingressProducerStore; private 
FileWatcher secretWatcher; + private final AuthVerifierImpl authVerifier; + public ReceiverVerticle( final ReceiverEnv env, final HttpServerOptions httpServerOptions, @@ -122,7 +121,9 @@ public ReceiverVerticle( this.secretVolume = new File(secretVolumePath); this.tlsKeyFile = new File(secretVolumePath + "/tls.key"); this.tlsCrtFile = new File(secretVolumePath + "/tls.crt"); - this.oidcDiscoveryConfigListener = oidcDiscoveryConfigListener; + + this.authVerifier = new AuthVerifierImpl(oidcDiscoveryConfigListener); + this.authHandler = new AuthHandler(this.authVerifier); } public HttpServerOptions getHttpsServerOptions() { @@ -136,14 +137,6 @@ public void start(final Promise startPromise) { .watchIngress(IngressReconcilerListener.all(this.ingressProducerStore, this.ingressRequestHandler)) .buildAndListen(vertx); - // the oidc discovery config is set initially when the listener is started, so we initialize the - // auth handler here, ensuring it is never null - this.buildAuthHandler(oidcDiscoveryConfigListener.getOidcDiscoveryConfig()); - - // the oidc config listener runs the callback whenever the config file is updated - // so, we can be sure that the auth handler always has the up to date config - this.oidcDiscoveryCallbackId = this.oidcDiscoveryConfigListener.registerCallback(this::buildAuthHandler); - this.httpServer = vertx.createHttpServer(this.httpServerOptions); // check whether the secret volume is mounted @@ -160,6 +153,8 @@ public void start(final Promise startPromise) { } } + authVerifier.start(vertx); + final var handler = new ProbeHandler( env.getLivenessProbePath(), env.getReadinessProbePath(), new MethodNotAllowedHandler(this)); @@ -187,11 +182,6 @@ public void start(final Promise startPromise) { setupSecretWatcher(); } - private void buildAuthHandler(OIDCDiscoveryConfig config) { - TokenVerifier tokenVerifier = new TokenVerifierImpl(vertx, config); - this.authenticationHandler = new AuthenticationHandler(tokenVerifier); - } - // Set up the secret watcher private void setupSecretWatcher() { try { @@ -211,7 +201,7 @@ public void stop(Promise stopPromise) throws Exception { .mapEmpty() .onComplete(stopPromise); - this.oidcDiscoveryConfigListener.deregisterCallback(this.oidcDiscoveryCallbackId); + this.authVerifier.stop(); // close the watcher if (this.secretWatcher != null) { @@ -245,11 +235,8 @@ public void handle(final HttpServerRequest request) { return; } - this.authenticationHandler.handle(request, producer, req -> { - // Invoke the ingress request handler - final var requestContext = new RequestContext(req); - this.ingressRequestHandler.handle(requestContext, producer); - }); + RequestContext requestContext = new RequestContext(request); + this.authHandler.handle(requestContext, producer, this.ingressRequestHandler); } public void updateServerConfig() { diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/StrictRequestToRecordMapper.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/StrictRequestToRecordMapper.java deleted file mode 100644 index 35190df194..0000000000 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/StrictRequestToRecordMapper.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright © 2018 Knative Authors (knative-dev@googlegroups.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package dev.knative.eventing.kafka.broker.receiver.impl; - -import dev.knative.eventing.kafka.broker.receiver.RequestToRecordMapper; -import io.cloudevents.CloudEvent; -import io.cloudevents.core.message.MessageReader; -import io.cloudevents.http.vertx.VertxMessageFactory; -import io.vertx.core.Future; -import io.vertx.core.http.HttpServerRequest; -import org.apache.kafka.clients.producer.ProducerRecord; - -/** - * This class implements a strict {@link HttpServerRequest} to {@link ProducerRecord} mapper. - * The conversion will fail if the request does not contain a valid {@link CloudEvent}. - *

- * This class is stateless, hence thread safe and shareable among verticles. - */ -public class StrictRequestToRecordMapper implements RequestToRecordMapper { - - private static class SingletonContainer { - private static final StrictRequestToRecordMapper INSTANCE = new StrictRequestToRecordMapper(); - } - - public static RequestToRecordMapper getInstance() { - return StrictRequestToRecordMapper.SingletonContainer.INSTANCE; - } - - private StrictRequestToRecordMapper() {} - - @Override - public Future> requestToRecord( - final HttpServerRequest request, final String topic) { - - return VertxMessageFactory.createReader(request) - .map(MessageReader::toEvent) - .map(event -> { - if (event == null) { - throw new IllegalArgumentException("event cannot be null"); - } - return new ProducerRecord(topic, event); - }); - } -} diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/TokenVerifier.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/AuthVerifier.java similarity index 84% rename from data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/TokenVerifier.java rename to data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/AuthVerifier.java index 3ffb986c24..dc8dee7a9d 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/TokenVerifier.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/AuthVerifier.java @@ -16,9 +16,10 @@ package dev.knative.eventing.kafka.broker.receiver.impl.auth; import dev.knative.eventing.kafka.broker.receiver.IngressProducer; +import io.cloudevents.CloudEvent; import io.vertx.core.Future; import io.vertx.core.http.HttpServerRequest; -public interface TokenVerifier { - Future verify(HttpServerRequest request, IngressProducer ingressInfo); +public interface AuthVerifier { + Future verify(HttpServerRequest request, IngressProducer ingressInfo); } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/TokenVerifierImpl.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/AuthVerifierImpl.java similarity index 61% rename from data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/TokenVerifierImpl.java rename to data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/AuthVerifierImpl.java index 164d0e69cf..30336c4966 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/TokenVerifierImpl.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/AuthVerifierImpl.java @@ -17,9 +17,13 @@ import dev.knative.eventing.kafka.broker.core.features.FeaturesConfig; import dev.knative.eventing.kafka.broker.receiver.IngressProducer; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.http.vertx.VertxMessageFactory; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.http.HttpServerRequest; +import java.util.ArrayList; import java.util.Map; import java.util.stream.Collectors; import org.jose4j.jwt.JwtClaims; @@ -27,17 +31,32 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TokenVerifierImpl implements TokenVerifier { +public class AuthVerifierImpl implements AuthVerifier { - private static final Logger logger 
= LoggerFactory.getLogger(TokenVerifierImpl.class); + private static final Logger logger = LoggerFactory.getLogger(AuthVerifierImpl.class); - private final Vertx vertx; + private Vertx vertx; + private final OIDCDiscoveryConfigListener oidcDiscoveryConfigListener; - private final OIDCDiscoveryConfig oidcDiscoveryConfig; + private OIDCDiscoveryConfig oidcDiscoveryConfig; + private int callbackId; - public TokenVerifierImpl(Vertx vertx, OIDCDiscoveryConfig oidcDiscoveryConfig) { + public AuthVerifierImpl(OIDCDiscoveryConfigListener oidcDiscoveryConfigListener) { + this.oidcDiscoveryConfigListener = oidcDiscoveryConfigListener; + } + + public void start(Vertx vertx) { this.vertx = vertx; - this.oidcDiscoveryConfig = oidcDiscoveryConfig; + + oidcDiscoveryConfig = oidcDiscoveryConfigListener.getOidcDiscoveryConfig(); + + callbackId = oidcDiscoveryConfigListener.registerCallback(config -> { + this.oidcDiscoveryConfig = config; + }); + } + + public void stop() { + oidcDiscoveryConfigListener.deregisterCallback(callbackId); } private Future verifyAuthN(String token, IngressProducer ingressInfo) { @@ -85,24 +104,42 @@ private Future verifyAuthN(final HttpServerRequest request, IngressPr return verifyAuthN(token, ingressInfo).onSuccess(v -> request.resume()); } - private Future verifyAuthZ(JwtClaims claims, IngressProducer ingressInfo) { + private Future verifyAuthZ(HttpServerRequest request, JwtClaims claims, IngressProducer ingressInfo) { // claims from Map> to Map> var convertedClaims = claims.flattenClaims().entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, v -> v.getValue().stream().map(Object::toString).toList())); + // first we check, if we have EventPolicies, which matches on the claims + final var claimMatchingPolicies = new ArrayList(0); for (EventPolicy ep : ingressInfo.getEventPolicies()) { - if (ep.isAuthorized(convertedClaims)) { - // as soon as one policy allows it, we're good - return Future.succeededFuture(); + if (ep.matchesClaims(convertedClaims)) { + claimMatchingPolicies.add(ep); } } - return Future.failedFuture(new AuthorizationException("Not authorized by any EventPolicy")); + if (claimMatchingPolicies.isEmpty()) { + return Future.failedFuture(new AuthorizationException("Not authorized by any EventPolicy")); + } + + // in case we have Policies which matches on the claims, we check on the filters too, + // so we need to read the cloudevent from the request only, when the "basic" authz check succeeded. 
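
To make the reworked check easier to follow, here is a minimal, self-contained sketch of the two-phase authorization this hunk implements: policies are first narrowed down by their token matchers against the JWT claims, and only when at least one policy matches are the EventPolicy filters evaluated against the CloudEvent, which is why the event is read from the request exactly once. The simplified signature (claims and event passed in directly, with no request handling) is an assumption for illustration only; the class and method names follow this patch.

```java
import dev.knative.eventing.kafka.broker.receiver.impl.auth.AuthorizationException;
import dev.knative.eventing.kafka.broker.receiver.impl.auth.EventPolicy;
import io.cloudevents.CloudEvent;
import io.vertx.core.Future;
import java.util.List;
import java.util.Map;

class TwoPhaseAuthZSketch {

    // Phase 1 narrows the policies by JWT claims; phase 2 tests the CloudEvent against
    // the filters of the remaining policies. The real verifyAuthZ only materializes the
    // CloudEvent (via VertxMessageFactory on the request) after phase 1 has succeeded.
    static Future<CloudEvent> authorize(
            List<EventPolicy> policies, Map<String, List<String>> claims, CloudEvent cloudEvent) {

        final var claimMatching =
                policies.stream().filter(p -> p.matchesClaims(claims)).toList();

        if (claimMatching.isEmpty()) {
            return Future.failedFuture(new AuthorizationException("Not authorized by any EventPolicy"));
        }

        for (EventPolicy policy : claimMatching) {
            if (policy.matchesCloudEvent(cloudEvent)) {
                // As soon as one claim-matching policy also matches the event, the request is authorized.
                return Future.succeededFuture(cloudEvent);
            }
        }
        return Future.failedFuture(new AuthorizationException("Not authorized by any EventPolicy"));
    }
}
```
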
+ return VertxMessageFactory.createReader(request) + .map(MessageReader::toEvent) + .compose(cloudEvent -> { + for (EventPolicy ep : claimMatchingPolicies) { + if (ep.matchesCloudEvent(cloudEvent)) { + // as soon as one policy matches, we're good + return Future.succeededFuture(cloudEvent); + } + } + + return Future.failedFuture(new AuthorizationException("Not authorized by any EventPolicy")); + }); } - public Future verify(final HttpServerRequest request, IngressProducer ingressInfo) { - return verifyAuthN(request, ingressInfo).compose(jwtClaims -> verifyAuthZ(jwtClaims, ingressInfo)); + public Future verify(final HttpServerRequest request, IngressProducer ingressInfo) { + return verifyAuthN(request, ingressInfo).compose(jwtClaims -> verifyAuthZ(request, jwtClaims, ingressInfo)); } } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/EventPolicy.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/EventPolicy.java index aa5eba380f..f72c4f93f5 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/EventPolicy.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/auth/EventPolicy.java @@ -16,26 +16,33 @@ package dev.knative.eventing.kafka.broker.receiver.impl.auth; import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; +import dev.knative.eventing.kafka.broker.core.filter.Filter; +import io.cloudevents.CloudEvent; import java.util.List; import java.util.Map; import java.util.stream.Collectors; public class EventPolicy { private final List tokenMatchers; + private final Filter filter; public static EventPolicy fromContract(DataPlaneContract.EventPolicy contractEventPolicy) { - return new EventPolicy(TokenMatcher.fromContract(contractEventPolicy.getTokenMatchersList())); + var tokenMatchers = TokenMatcher.fromContract(contractEventPolicy.getTokenMatchersList()); + var filter = Filter.fromContract(contractEventPolicy.getFiltersList()); + + return new EventPolicy(tokenMatchers, filter); } public static List fromContract(List contractEventPolicies) { return contractEventPolicies.stream().map(EventPolicy::fromContract).collect(Collectors.toList()); } - public EventPolicy(List tokenMatchers) { + public EventPolicy(List tokenMatchers, Filter filter) { this.tokenMatchers = tokenMatchers; + this.filter = filter; } - public boolean isAuthorized(Map> claims) { + public boolean matchesClaims(Map> claims) { for (TokenMatcher matcher : tokenMatchers) { if (matcher.match(claims)) { return true; @@ -44,4 +51,8 @@ public boolean isAuthorized(Map> claims) { return false; } + + public boolean matchesCloudEvent(CloudEvent cloudEvent) { + return filter.test(cloudEvent); + } } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthHandler.java similarity index 60% rename from data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java rename to data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthHandler.java index 6e07e502f8..c8b220d232 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandler.java +++ 
b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthHandler.java @@ -18,62 +18,77 @@ import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue; import dev.knative.eventing.kafka.broker.receiver.IngressProducer; -import dev.knative.eventing.kafka.broker.receiver.impl.auth.AuthenticationException; -import dev.knative.eventing.kafka.broker.receiver.impl.auth.AuthorizationException; -import dev.knative.eventing.kafka.broker.receiver.impl.auth.TokenVerifier; +import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler; +import dev.knative.eventing.kafka.broker.receiver.RequestContext; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.*; +import io.cloudevents.core.message.MessageReader; +import io.cloudevents.http.vertx.VertxMessageFactory; import io.netty.handler.codec.http.HttpResponseStatus; -import io.vertx.core.Handler; -import io.vertx.core.http.HttpServerRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Handler checking that the provided request contained a valid JWT. + * Handler checking that the provided request is authenticated and authorized. */ -public class AuthenticationHandler { +public class AuthHandler { - private static final Logger logger = LoggerFactory.getLogger(AuthenticationHandler.class); - private final TokenVerifier tokenVerifier; + private static final Logger logger = LoggerFactory.getLogger(AuthHandler.class); - public AuthenticationHandler(final TokenVerifier tokenVerifier) { - this.tokenVerifier = tokenVerifier; + private final AuthVerifier authVerifier; + + public AuthHandler(AuthVerifier authVerifier) { + this.authVerifier = authVerifier; } public void handle( - final HttpServerRequest request, final IngressProducer ingressInfo, final Handler next) { + final RequestContext requestContext, final IngressProducer ingressInfo, final IngressRequestHandler next) { + if (ingressInfo.getAudience().isEmpty()) { logger.debug("No audience for ingress set. Continue without authentication check..."); - next.handle(request); + + // get CloudEvent from request and pass to ingress handler + VertxMessageFactory.createReader(requestContext.getRequest()) + .map(MessageReader::toEvent) + .onComplete(event -> { + next.handle(requestContext, event.result(), ingressInfo); + }); return; } - tokenVerifier - .verify(request, ingressInfo) + authVerifier + .verify(requestContext.getRequest(), ingressInfo) .onFailure(e -> { if (e instanceof AuthenticationException) { logger.debug( "Failed to verify authentication of request: {}", keyValue("error", e.getMessage())); - request.response() + requestContext + .getRequest() + .response() .setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()) .end(); } else if (e instanceof AuthorizationException) { logger.debug( "Failed to verify authorization of request: {}", keyValue("error", e.getMessage())); - request.response() + requestContext + .getRequest() + .response() .setStatusCode(HttpResponseStatus.FORBIDDEN.code()) .end(); } else { logger.debug( "Got unexpected exception on verifying auth of request: {}", keyValue("error", e.getMessage())); - request.response() + requestContext + .getRequest() + .response() .setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()) .end(); } }) - .onSuccess(v -> { + .onSuccess(cloudEvent -> { logger.debug("Request was authenticated and authorized. 
Continuing..."); - next.handle(request); + + next.handle(requestContext, cloudEvent, ingressInfo); }); } } diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImpl.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImpl.java index 97245dcd2c..cf0c1944f2 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImpl.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImpl.java @@ -28,7 +28,6 @@ import dev.knative.eventing.kafka.broker.receiver.IngressProducer; import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler; import dev.knative.eventing.kafka.broker.receiver.RequestContext; -import dev.knative.eventing.kafka.broker.receiver.RequestToRecordMapper; import io.cloudevents.CloudEvent; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Tag; @@ -68,124 +67,101 @@ public class IngressRequestHandlerImpl implements IngressRequestHandler { private static final Logger logger = LoggerFactory.getLogger(IngressRequestHandlerImpl.class); - private final RequestToRecordMapper requestToRecordMapper; private final MeterRegistry meterRegistry; private final EventTypeCreator eventTypeCreator; - public IngressRequestHandlerImpl( - final RequestToRecordMapper requestToRecordMapper, - final MeterRegistry meterRegistry, - final EventTypeCreator eventTypeCreator) { - this.requestToRecordMapper = requestToRecordMapper; + public IngressRequestHandlerImpl(final MeterRegistry meterRegistry, final EventTypeCreator eventTypeCreator) { this.meterRegistry = meterRegistry; this.eventTypeCreator = eventTypeCreator; } @Override - public void handle(final RequestContext requestContext, final IngressProducer producer) { + public void handle(final RequestContext requestContext, CloudEvent cloudEvent, final IngressProducer producer) { final Tags resourceTags = Metrics.resourceRefTags(producer.getReference()); - requestToRecordMapper - .requestToRecord(requestContext.getRequest(), producer.getTopic()) + if (cloudEvent == null) { + requestContext.getRequest().response().setStatusCode(MAPPER_FAILED).end(); + + final var tags = MAPPER_FAILED_COMMON_TAGS.and(resourceTags); + Metrics.eventDispatchLatency(tags).register(meterRegistry).record(requestContext.performLatency()); + Metrics.eventCount(tags).register(meterRegistry).increment(); + + logger.warn( + "Failed to get cloudevent from request {}", + keyValue("path", requestContext.getRequest().path())); + + return; + } + + ProducerRecord record = new ProducerRecord<>(producer.getTopic(), cloudEvent); + // Conversion to record succeeded, let's push it to Kafka + if (logger.isDebugEnabled()) { + final var span = Span.fromContextOrNull(Context.current()); + if (span != null) { + logger.debug( + "Received event {} {}", + keyValue("event", record.value()), + keyValue( + TracingConfig.TRACE_ID_KEY, + span.getSpanContext().getTraceId())); + } else { + logger.debug("Received event {}", keyValue("event", record.value())); + } + } + + // Decorate the span with event specific attributed + TracingSpan.decorateCurrentWithEvent(record.value()); + + final var eventTypeTag = Tag.of(Metrics.Tags.EVENT_TYPE, record.value().getType()); + + publishRecord(producer, record) + .onSuccess(m -> { + requestContext + .getRequest() + .response() + .setStatusCode(RECORD_PRODUCED) + 
.end(); + + final var tags = + RECORD_PRODUCED_COMMON_TAGS.and(resourceTags).and(eventTypeTag); + Metrics.eventDispatchLatency(tags).register(meterRegistry).record(requestContext.performLatency()); + Metrics.eventCount(tags).register(meterRegistry).increment(); + }) .onFailure(cause -> { - // Conversion to record failed requestContext .getRequest() .response() - .setStatusCode(MAPPER_FAILED) + .setStatusCode(FAILED_TO_PRODUCE) .end(); - final var tags = MAPPER_FAILED_COMMON_TAGS.and(resourceTags); + final var tags = + FAILED_TO_PRODUCE_COMMON_TAGS.and(resourceTags).and(eventTypeTag); Metrics.eventDispatchLatency(tags).register(meterRegistry).record(requestContext.performLatency()); Metrics.eventCount(tags).register(meterRegistry).increment(); logger.warn( - "Failed to convert request to record {}", + "Failed to produce record {}", keyValue("path", requestContext.getRequest().path()), cause); }) - .compose(record -> { - // Conversion to record succeeded, let's push it to Kafka - if (logger.isDebugEnabled()) { - final var span = Span.fromContextOrNull(Context.current()); - if (span != null) { - logger.debug( - "Received event {} {}", - keyValue("event", record.value()), - keyValue( - TracingConfig.TRACE_ID_KEY, - span.getSpanContext().getTraceId())); - } else { - logger.debug("Received event {}", keyValue("event", record.value())); - } + .compose((recordMetadata) -> { + if (producer.isEventTypeAutocreateEnabled()) { + return this.eventTypeCreator + .create(record.value(), producer.getEventTypeLister(), producer.getReference()) + .compose( + et -> { + logger.debug("successfully created eventtype {}", et); + return Future.succeededFuture(recordMetadata); + }, + cause -> { + logger.warn("failed to create eventtype", cause); + return Future.succeededFuture(recordMetadata); + }); + } else { + return Future.succeededFuture(recordMetadata); } - - // Decorate the span with event specific attributed - TracingSpan.decorateCurrentWithEvent(record.value()); - - final var eventTypeTag = - Tag.of(Metrics.Tags.EVENT_TYPE, record.value().getType()); - - return publishRecord(producer, record) - .onSuccess(m -> { - requestContext - .getRequest() - .response() - .setStatusCode(RECORD_PRODUCED) - .end(); - - final var tags = RECORD_PRODUCED_COMMON_TAGS - .and(resourceTags) - .and(eventTypeTag); - Metrics.eventDispatchLatency(tags) - .register(meterRegistry) - .record(requestContext.performLatency()); - Metrics.eventCount(tags).register(meterRegistry).increment(); - }) - .onFailure(cause -> { - requestContext - .getRequest() - .response() - .setStatusCode(FAILED_TO_PRODUCE) - .end(); - - final var tags = FAILED_TO_PRODUCE_COMMON_TAGS - .and(resourceTags) - .and(eventTypeTag); - Metrics.eventDispatchLatency(tags) - .register(meterRegistry) - .record(requestContext.performLatency()); - Metrics.eventCount(tags).register(meterRegistry).increment(); - - logger.warn( - "Failed to produce record {}", - keyValue( - "path", - requestContext.getRequest().path()), - cause); - }) - .compose((recordMetadata) -> { - if (producer.isEventTypeAutocreateEnabled()) { - return this.eventTypeCreator - .create( - record.value(), - producer.getEventTypeLister(), - producer.getReference()) - .compose( - et -> { - logger.debug("successfully created eventtype {}", et); - return Future.succeededFuture(recordMetadata); - }, - cause -> { - logger.warn("failed to create eventtype", cause); - return Future.succeededFuture(recordMetadata); - }); - } else { - return Future.succeededFuture(recordMetadata); - } - }); }); } diff --git 
a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java index aefe5d41a3..5ce119b818 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/ReceiverVerticleFactory.java @@ -23,7 +23,6 @@ import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler; import dev.knative.eventing.kafka.broker.receiver.impl.IngressProducerReconcilableStore; import dev.knative.eventing.kafka.broker.receiver.impl.ReceiverVerticle; -import dev.knative.eventing.kafka.broker.receiver.impl.StrictRequestToRecordMapper; import dev.knative.eventing.kafka.broker.receiver.impl.auth.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.receiver.impl.handler.IngressRequestHandlerImpl; import io.cloudevents.CloudEvent; @@ -69,10 +68,8 @@ class ReceiverVerticleFactory implements Supplier { this.producerConfigs = producerConfigs; this.httpServerOptions = httpServerOptions; this.httpsServerOptions = httpsServerOptions; - this.ingressRequestHandler = new IngressRequestHandlerImpl( - StrictRequestToRecordMapper.getInstance(), - metricsRegistry, - new EventTypeCreatorImpl(eventTypeClient, vertx)); + this.ingressRequestHandler = + new IngressRequestHandlerImpl(metricsRegistry, new EventTypeCreatorImpl(eventTypeClient, vertx)); this.kafkaProducerFactory = kafkaProducerFactory; this.oidcDiscoveryConfigListener = oidcDiscoveryConfigListener; this.eventTypeListerFactory = eventTypeListerFactory; diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java index 9589827f1b..ca28a1bca9 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTest.java @@ -151,8 +151,7 @@ public void setUpHTTP(final Vertx vertx, final VertxTestContext testContext) { httpServerOptions, httpsServerOptions, v -> store, - new IngressRequestHandlerImpl( - StrictRequestToRecordMapper.getInstance(), registry, ((event, lister, reference) -> null)), + new IngressRequestHandlerImpl(registry, ((event, lister, reference) -> null)), SECRET_VOLUME_PATH, mock(OIDCDiscoveryConfigListener.class)); vertx.deployVerticle(verticle, testContext.succeeding(ar -> testContext.completeNow())); diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java index f550dcf557..699c54f4a9 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/ReceiverVerticleTracingTest.java @@ -135,10 +135,7 @@ public void setup() throws ExecutionException, InterruptedException { httpServerOptions, httpsServerOptions, v -> store, - new IngressRequestHandlerImpl( - StrictRequestToRecordMapper.getInstance(), - Metrics.getRegistry(), - ((event, lister, reference) -> null)), + new 
IngressRequestHandlerImpl(Metrics.getRegistry(), ((event, lister, reference) -> null)), SECRET_VOLUME_PATH, mock(OIDCDiscoveryConfigListener.class)); diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthHandlerTest.java similarity index 68% rename from data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java rename to data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthHandlerTest.java index 84bd88ad06..1fd2365a4c 100644 --- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthenticationHandlerTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/AuthHandlerTest.java @@ -22,38 +22,40 @@ import dev.knative.eventing.kafka.broker.contract.DataPlaneContract; import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer; import dev.knative.eventing.kafka.broker.core.eventtype.EventType; +import dev.knative.eventing.kafka.broker.core.testing.CoreObjects; import dev.knative.eventing.kafka.broker.receiver.IngressProducer; +import dev.knative.eventing.kafka.broker.receiver.IngressRequestHandler; +import dev.knative.eventing.kafka.broker.receiver.RequestContext; +import dev.knative.eventing.kafka.broker.receiver.impl.auth.AuthVerifier; import dev.knative.eventing.kafka.broker.receiver.impl.auth.AuthenticationException; import dev.knative.eventing.kafka.broker.receiver.impl.auth.AuthorizationException; import dev.knative.eventing.kafka.broker.receiver.impl.auth.EventPolicy; -import dev.knative.eventing.kafka.broker.receiver.impl.auth.TokenVerifier; import io.cloudevents.CloudEvent; import io.fabric8.kubernetes.client.informers.cache.Lister; import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.Future; -import io.vertx.core.Handler; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; import java.util.List; import org.junit.jupiter.api.Test; -public class AuthenticationHandlerTest { +public class AuthHandlerTest { @Test public void shouldReturnUnauthorizedWhenJWTValidationFails() { final HttpServerRequest request = mock(HttpServerRequest.class); final var response = mockResponse(request, HttpResponseStatus.UNAUTHORIZED.code()); - TokenVerifier tokenVerifier = new TokenVerifier() { + AuthVerifier authVerifier = new AuthVerifier() { @Override - public Future verify(HttpServerRequest request, IngressProducer ingressInfo) { + public Future verify(HttpServerRequest request, IngressProducer ingressInfo) { return Future.failedFuture(new AuthenticationException("JWT validation failed")); } }; - final AuthenticationHandler authHandler = new AuthenticationHandler(tokenVerifier); + final AuthHandler authHandler = new AuthHandler(authVerifier); authHandler.handle( - request, + new RequestContext(request), new IngressProducer() { @Override public ReactiveKafkaProducer getKafkaProducer() { @@ -85,7 +87,7 @@ public List getEventPolicies() { return null; } }, - mock(Handler.class)); + mock(IngressRequestHandler.class)); verify(response, times(1)).setStatusCode(HttpResponseStatus.UNAUTHORIZED.code()); verify(response, times(1)).end(); @@ -96,17 +98,17 @@ public void shouldReturnForbiddenWhenAuthorizationFails() { final HttpServerRequest request = mock(HttpServerRequest.class); final 
var response = mockResponse(request, HttpResponseStatus.FORBIDDEN.code()); - TokenVerifier tokenVerifier = new TokenVerifier() { + AuthVerifier authVerifier = new AuthVerifier() { @Override - public Future verify(HttpServerRequest request, IngressProducer ingressInfo) { + public Future verify(HttpServerRequest request, IngressProducer ingressInfo) { return Future.failedFuture(new AuthorizationException("AuthZ failed")); } }; - final AuthenticationHandler authHandler = new AuthenticationHandler(tokenVerifier); + final AuthHandler authHandler = new AuthHandler(authVerifier); authHandler.handle( - request, + new RequestContext(request), new IngressProducer() { @Override public ReactiveKafkaProducer getKafkaProducer() { @@ -138,7 +140,7 @@ public List getEventPolicies() { return null; } }, - mock(Handler.class)); + mock(IngressRequestHandler.class)); verify(response, times(1)).setStatusCode(HttpResponseStatus.FORBIDDEN.code()); verify(response, times(1)).end(); @@ -146,54 +148,54 @@ public List getEventPolicies() { @Test public void shouldContinueWithRequestWhenJWTSucceeds() { - final HttpServerRequest request = mock(HttpServerRequest.class); - final var next = mock(Handler.class); // mockHandler(request); + final RequestContext requestContext = mock(RequestContext.class); + final var next = mock(IngressRequestHandler.class); + final var cloudEvent = CoreObjects.event(); - TokenVerifier tokenVerifier = new TokenVerifier() { + AuthVerifier authVerifier = new AuthVerifier() { @Override - public Future verify(HttpServerRequest request, IngressProducer ingressInfo) { - return Future.succeededFuture(); + public Future verify(HttpServerRequest request, IngressProducer ingressInfo) { + return Future.succeededFuture(cloudEvent); } }; - final AuthenticationHandler authHandler = new AuthenticationHandler(tokenVerifier); + IngressProducer ingressProducer = new IngressProducer() { + @Override + public ReactiveKafkaProducer getKafkaProducer() { + return null; + } - authHandler.handle( - request, - new IngressProducer() { - @Override - public ReactiveKafkaProducer getKafkaProducer() { - return null; - } + @Override + public String getTopic() { + return null; + } - @Override - public String getTopic() { - return null; - } + @Override + public DataPlaneContract.Reference getReference() { + return null; + } - @Override - public DataPlaneContract.Reference getReference() { - return null; - } + @Override + public Lister getEventTypeLister() { + return mock(Lister.class); + } - @Override - public Lister getEventTypeLister() { - return mock(Lister.class); - } + @Override + public String getAudience() { + return "some-required-audience"; + } - @Override - public String getAudience() { - return "some-required-audience"; - } + @Override + public List getEventPolicies() { + return null; + } + }; - @Override - public List getEventPolicies() { - return null; - } - }, - next); + final AuthHandler authHandler = new AuthHandler(authVerifier); + + authHandler.handle(requestContext, ingressProducer, next); - verify(next, times(1)).handle(request); + verify(next, times(1)).handle(requestContext, cloudEvent, ingressProducer); } private static HttpServerResponse mockResponse(final HttpServerRequest request, final int statusCode) { diff --git a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java index 5289d09ddc..6e6e6d5465 100644 
--- a/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java +++ b/data-plane/receiver/src/test/java/dev/knative/eventing/kafka/broker/receiver/impl/handler/IngressRequestHandlerImplTest.java @@ -29,7 +29,6 @@ import dev.knative.eventing.kafka.broker.receiver.IngressProducer; import dev.knative.eventing.kafka.broker.receiver.MockReactiveKafkaProducer; import dev.knative.eventing.kafka.broker.receiver.RequestContext; -import dev.knative.eventing.kafka.broker.receiver.RequestToRecordMapper; import dev.knative.eventing.kafka.broker.receiver.impl.auth.EventPolicy; import io.cloudevents.CloudEvent; import io.fabric8.kubernetes.client.informers.cache.Lister; @@ -44,7 +43,6 @@ import io.vertx.micrometer.backends.BackendRegistries; import java.util.List; import org.apache.kafka.clients.producer.MockProducer; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; @@ -68,10 +66,7 @@ public void shouldSendRecordAndTerminateRequestWithFailedToProduce() { } private static void shouldSendRecord(boolean failedToSend, int statusCode) { - final var record = new ProducerRecord<>("topic", 10, "key", CoreObjects.event()); - - final RequestToRecordMapper mapper = (request, topic) -> Future.succeededFuture(record); - + final var cloudEvent = CoreObjects.event(); final ReactiveKafkaProducer producer = mockProducer(); when(producer.send(any())).thenAnswer(invocationOnMock -> { @@ -85,10 +80,9 @@ final var record = new ProducerRecord<>("topic", 10, "key", CoreObjects.event()) final HttpServerRequest request = mockHttpServerRequest("/hello"); final var response = mockResponse(request, statusCode); - final var handler = - new IngressRequestHandlerImpl(mapper, Metrics.getRegistry(), ((event, lister, reference) -> null)); + final var handler = new IngressRequestHandlerImpl(Metrics.getRegistry(), ((event, lister, reference) -> null)); - handler.handle(new RequestContext(request), new IngressProducer() { + handler.handle(new RequestContext(request), cloudEvent, new IngressProducer() { @Override public ReactiveKafkaProducer getKafkaProducer() { return producer; @@ -124,18 +118,15 @@ public List getEventPolicies() { } @Test - public void shouldReturnBadRequestIfNoRecordCanBeCreated() { + public void shouldReturnBadRequestIfNoCloudEvent() { final var producer = mockProducer(); - final RequestToRecordMapper mapper = (request, topic) -> Future.failedFuture(""); - final HttpServerRequest request = mockHttpServerRequest("/hello"); final var response = mockResponse(request, IngressRequestHandlerImpl.MAPPER_FAILED); - final var handler = - new IngressRequestHandlerImpl(mapper, Metrics.getRegistry(), ((event, lister, reference) -> null)); + final var handler = new IngressRequestHandlerImpl(Metrics.getRegistry(), ((event, lister, reference) -> null)); - handler.handle(new RequestContext(request), new IngressProducer() { + handler.handle(new RequestContext(request), null, new IngressProducer() { @Override public ReactiveKafkaProducer getKafkaProducer() { return producer; diff --git a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java index 19c24e4ea1..056b2a4482 100644 --- a/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java +++ 
b/data-plane/tests/src/test/java/dev/knative/eventing/kafka/broker/tests/AbstractDataPlaneTest.java @@ -44,7 +44,6 @@ import dev.knative.eventing.kafka.broker.dispatcher.main.ConsumerVerticleFactoryImpl; import dev.knative.eventing.kafka.broker.receiver.impl.IngressProducerReconcilableStore; import dev.knative.eventing.kafka.broker.receiver.impl.ReceiverVerticle; -import dev.knative.eventing.kafka.broker.receiver.impl.StrictRequestToRecordMapper; import dev.knative.eventing.kafka.broker.receiver.impl.auth.OIDCDiscoveryConfigListener; import dev.knative.eventing.kafka.broker.receiver.impl.handler.IngressRequestHandlerImpl; import dev.knative.eventing.kafka.broker.receiver.main.ReceiverEnv; @@ -398,10 +397,7 @@ private ReceiverVerticle setUpReceiver(final Vertx vertx, final VertxTestContext producerConfigs(), properties -> getReactiveProducerFactory().create(v, properties), mock(EventTypeListerFactory.class)), - new IngressRequestHandlerImpl( - StrictRequestToRecordMapper.getInstance(), - Metrics.getRegistry(), - (((event, lister, reference) -> null))), + new IngressRequestHandlerImpl(Metrics.getRegistry(), (((event, lister, reference) -> null))), SECRET_VOLUME_PATH, mock(OIDCDiscoveryConfigListener.class)); diff --git a/test/config-oidc-authentication/features.yaml b/test/config-auth/features.yaml similarity index 81% rename from test/config-oidc-authentication/features.yaml rename to test/config-auth/features.yaml index ae227d1072..a49fe6eef4 100644 --- a/test/config-oidc-authentication/features.yaml +++ b/test/config-auth/features.yaml @@ -21,11 +21,5 @@ metadata: knative.dev/config-propagation: original knative.dev/config-category: eventing data: - kreference-group: "disabled" - delivery-retryafter: "disabled" - delivery-timeout: "enabled" - kreference-mapping: "disabled" - new-trigger-filters: "enabled" transport-encryption: "strict" - eventtype-auto-create: "disabled" authentication-oidc: "enabled" diff --git a/test/e2e_new/broker_test.go b/test/e2e_new/broker_test.go index cc8f3b5dc2..36aa891e1a 100644 --- a/test/e2e_new/broker_test.go +++ b/test/e2e_new/broker_test.go @@ -34,6 +34,7 @@ import ( "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" "knative.dev/eventing-kafka-broker/test/e2e_new/single_partition_config" "knative.dev/eventing-kafka-broker/test/rekt/features" + "knative.dev/eventing/test/rekt/features/authz" "knative.dev/eventing/test/rekt/features/broker" brokereventingfeatures "knative.dev/eventing/test/rekt/features/broker" "knative.dev/eventing/test/rekt/features/oidc" @@ -323,6 +324,25 @@ func TestBrokerSendsEventsWithOIDCSupport(t *testing.T) { env.TestSet(ctx, t, brokereventingfeatures.BrokerSendEventWithOIDC()) } +func TestBrokerSupportsAuthZ(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.WithPollTimings(4*time.Second, 12*time.Minute), + environment.Managed(t), + eventshub.WithTLS(t), + ) + + name := feature.MakeRandomK8sName("broker") + env.Prerequisite(ctx, t, broker.GoesReady(name, brokerresources.WithEnvConfig()...)) + + env.TestSet(ctx, t, authz.AddressableAuthZConformance(brokerresources.GVR(), "Broker", name)) +} + func TestBrokerDispatcherKedaScaling(t *testing.T) { t.Parallel() diff --git a/test/reconciler-tests.sh b/test/reconciler-tests.sh index 5a35ba123e..e1fc6050e4 100755 --- a/test/reconciler-tests.sh +++ b/test/reconciler-tests.sh @@ -64,11 +64,11 @@ kubectl apply -Rf 
"$(dirname "$0")/config-transport-encryption" go_test_e2e -timeout=1h ./test/e2e_new -run TLS || fail_test -echo "Running E2E Reconciler Tests with OIDC authentication enabled" +echo "Running E2E Reconciler OIDC and AuthZ Tests" -kubectl apply -Rf "$(dirname "$0")/config-oidc-authentication" +kubectl apply -Rf "$(dirname "$0")/config-auth" -go_test_e2e -timeout=1h ./test/e2e_new -run OIDC || fail_test +go_test_e2e -timeout=1h ./test/e2e_new -run "OIDC|AuthZ" || fail_test if ! ${LOCAL_DEVELOPMENT}; then go_test_e2e -tags=sacura -timeout=40m ./test/e2e/... || fail_test "E2E (sacura) suite failed" diff --git a/vendor/knative.dev/eventing/test/rekt/features/authz/addressable_authz_conformance.go b/vendor/knative.dev/eventing/test/rekt/features/authz/addressable_authz_conformance.go new file mode 100644 index 0000000000..7043e86b08 --- /dev/null +++ b/vendor/knative.dev/eventing/test/rekt/features/authz/addressable_authz_conformance.go @@ -0,0 +1,237 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package authz + +import ( + "context" + "fmt" + "time" + + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/test/rekt/resources/eventpolicy" + "knative.dev/eventing/test/rekt/resources/pingsource" + "knative.dev/reconciler-test/pkg/environment" + + "knative.dev/eventing/test/rekt/features/featureflags" + + "github.com/cloudevents/sdk-go/v2/test" + "k8s.io/apimachinery/pkg/runtime/schema" + "knative.dev/reconciler-test/pkg/eventshub" + eventassert "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/k8s" +) + +// AddressableAuthZConformance returns a feature set to test all Authorization features for an addressable. +func AddressableAuthZConformance(gvr schema.GroupVersionResource, kind, name string) *feature.FeatureSet { + fs := feature.FeatureSet{ + Name: fmt.Sprintf("%s handles authorization features correctly", kind), + Features: []*feature.Feature{ + addressableRespectsEventPolicyFilters(gvr, kind, name), + }, + } + + fs.Features = append(fs.Features, AddressableAuthZConformanceRequestHandling(gvr, kind, name).Features...) + + return &fs +} + +// AddressableAuthZConformanceRequestHandling returns a FeatureSet to test the basic authorization features. +// This basic feature set contains to allow authorized and reject unauthorized requests. In addition it also +// tests, that the addressable becomes unready in case of a NotReady assigned EventPolicy. 
+func AddressableAuthZConformanceRequestHandling(gvr schema.GroupVersionResource, kind, name string) *feature.FeatureSet { + fs := feature.FeatureSet{ + Name: fmt.Sprintf("%s handles authorization in requests correctly", kind), + Features: []*feature.Feature{ + addressableAllowsAuthorizedRequest(gvr, kind, name), + addressableRejectsUnauthorizedRequest(gvr, kind, name), + addressableBecomesUnreadyOnUnreadyEventPolicy(gvr, kind, name), + }, + } + return &fs +} + +func addressableAllowsAuthorizedRequest(gvr schema.GroupVersionResource, kind, name string) *feature.Feature { + f := feature.NewFeatureNamed(fmt.Sprintf("%s accepts authorized request", kind)) + + f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled()) + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + source := feature.MakeRandomK8sName("source") + eventPolicy := feature.MakeRandomK8sName("eventpolicy") + sourceSubject := feature.MakeRandomK8sName("source-oidc-identity") + + event := test.FullEvent() + + // Install event policy + f.Setup("Install the EventPolicy", func(ctx context.Context, t feature.T) { + namespace := environment.FromContext(ctx).Namespace() + eventpolicy.Install( + eventPolicy, + eventpolicy.WithToRef( + gvr.GroupVersion().WithKind(kind), + name), + eventpolicy.WithFromSubject(fmt.Sprintf("system:serviceaccount:%s:%s", namespace, sourceSubject)), + )(ctx, t) + }) + f.Setup(fmt.Sprintf("EventPolicy for %s %s is ready", kind, name), k8s.IsReady(eventpolicy.GVR(), eventPolicy)) + + // Install source + f.Requirement("install source", eventshub.Install( + source, + eventshub.StartSenderToResourceTLS(gvr, name, nil), + eventshub.InputEvent(event), + eventshub.OIDCSubject(sourceSubject), + )) + + f.Alpha(kind). + Must("event sent", eventassert.OnStore(source).MatchSentEvent(test.HasId(event.ID())).Exact(1)). + Must("get 202 on response", eventassert.OnStore(source).Match(eventassert.MatchStatusCode(202)).AtLeast(1)) + + return f +} + +func addressableRejectsUnauthorizedRequest(gvr schema.GroupVersionResource, kind, name string) *feature.Feature { + f := feature.NewFeatureNamed(fmt.Sprintf("%s rejects unauthorized request", kind)) + + f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled()) + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + source := feature.MakeRandomK8sName("source") + eventPolicy := feature.MakeRandomK8sName("eventpolicy") + + event := test.FullEvent() + + // Install event policy + f.Setup("Install the EventPolicy with from subject that does not match", eventpolicy.Install( + eventPolicy, + eventpolicy.WithToRef( + gvr.GroupVersion().WithKind(kind), + name), + eventpolicy.WithFromSubject("system:serviceaccount:default:unknown-identity"), + )) + f.Setup(fmt.Sprintf("EventPolicy for %s %s is ready", kind, name), k8s.IsReady(eventpolicy.GVR(), eventPolicy)) + + // Install source + f.Requirement("install source", eventshub.Install( + source, + eventshub.StartSenderToResourceTLS(gvr, name, nil), + eventshub.InputEvent(event), + eventshub.InitialSenderDelay(10*time.Second), + )) + + f.Alpha(kind). + Must("event sent", eventassert.OnStore(source).MatchSentEvent(test.HasId(event.ID())).Exact(1)). 
+ Must("get 403 on response", eventassert.OnStore(source).Match(eventassert.MatchStatusCode(403)).AtLeast(1)) + + return f +} + +func addressableRespectsEventPolicyFilters(gvr schema.GroupVersionResource, kind, name string) *feature.Feature { + f := feature.NewFeatureNamed(fmt.Sprintf("%s only admits events that pass the event policy filter", kind)) + + f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled()) + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + eventPolicy := feature.MakeRandomK8sName("eventpolicy") + source1 := feature.MakeRandomK8sName("source") + sourceSubject1 := feature.MakeRandomK8sName("source-oidc-identity") + source2 := feature.MakeRandomK8sName("source") + sourceSubject2 := feature.MakeRandomK8sName("source-oidc-identity") + + event1 := test.FullEvent() + event1.SetType("valid.event.type") + event1.SetID("1") + event2 := test.FullEvent() + event2.SetType("invalid.event.type") + event2.SetID("2") + + // Install event policy + f.Setup("Install the EventPolicy", func(ctx context.Context, t feature.T) { + namespace := environment.FromContext(ctx).Namespace() + eventpolicy.Install( + eventPolicy, + eventpolicy.WithToRef( + gvr.GroupVersion().WithKind(kind), + name), + eventpolicy.WithFromSubject(fmt.Sprintf("system:serviceaccount:%s:%s", namespace, sourceSubject1)), + eventpolicy.WithFromSubject(fmt.Sprintf("system:serviceaccount:%s:%s", namespace, sourceSubject2)), + eventpolicy.WithFilters([]eventingv1.SubscriptionsAPIFilter{ + { + Prefix: map[string]string{ + "type": "valid", + }, + }, + }), + )(ctx, t) + }) + f.Setup(fmt.Sprintf("EventPolicy for %s %s is ready", kind, name), k8s.IsReady(eventpolicy.GVR(), eventPolicy)) + + // Install source + f.Requirement("install source 1", eventshub.Install( + source1, + eventshub.StartSenderToResourceTLS(gvr, name, nil), + eventshub.InputEvent(event1), + eventshub.OIDCSubject(sourceSubject1), + )) + + f.Requirement("install source 2", eventshub.Install( + source2, + eventshub.StartSenderToResourceTLS(gvr, name, nil), + eventshub.InputEvent(event2), + eventshub.OIDCSubject(sourceSubject2), + )) + + f.Alpha(kind). + Must("valid event sent", eventassert.OnStore(source1).MatchSentEvent(test.HasId(event1.ID())).Exact(1)). + Must("get 202 on response", eventassert.OnStore(source1).Match(eventassert.MatchStatusCode(202)).AtLeast(1)) + + f.Alpha(kind). + Must("invalid event sent", eventassert.OnStore(source2).MatchSentEvent(test.HasId(event2.ID())).Exact(1)). 
+ Must("get 403 on response", eventassert.OnStore(source2).Match(eventassert.MatchStatusCode(403)).AtLeast(1)) + + return f +} + +func addressableBecomesUnreadyOnUnreadyEventPolicy(gvr schema.GroupVersionResource, kind, name string) *feature.Feature { + f := feature.NewFeatureNamed(fmt.Sprintf("%s becomes NotReady when EventPolicy is NotReady", kind)) + + f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled()) + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + eventPolicy := feature.MakeRandomK8sName("eventpolicy") + + f.Setup(fmt.Sprintf("%s is ready initially", kind), k8s.IsReady(gvr, name)) + + // Install event policy + f.Requirement("Install the EventPolicy", eventpolicy.Install( + eventPolicy, + eventpolicy.WithToRef( + gvr.GroupVersion().WithKind(kind), + name), + eventpolicy.WithFromRef(pingsource.Gvr().GroupVersion().WithKind("PingSource"), "doesnt-exist", "doesnt-exist"), + )) + f.Requirement(fmt.Sprintf("EventPolicy for %s %s is NotReady", kind, name), k8s.IsNotReady(eventpolicy.GVR(), eventPolicy)) + + f.Alpha(kind).Must("become NotReady with NotReady EventPolicy ", k8s.IsNotReady(gvr, name)) + + return f +} diff --git a/vendor/knative.dev/eventing/test/rekt/resources/eventpolicy/eventpolicy.go b/vendor/knative.dev/eventing/test/rekt/resources/eventpolicy/eventpolicy.go new file mode 100644 index 0000000000..6d44293bd1 --- /dev/null +++ b/vendor/knative.dev/eventing/test/rekt/resources/eventpolicy/eventpolicy.go @@ -0,0 +1,192 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eventpolicy + +import ( + "context" + "embed" + "encoding/json" + "strings" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "knative.dev/reconciler-test/pkg/feature" + "knative.dev/reconciler-test/pkg/k8s" + "knative.dev/reconciler-test/pkg/manifest" + "sigs.k8s.io/yaml" + + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" +) + +//go:embed *.yaml +var yamlEmbed embed.FS + +func GVR() schema.GroupVersionResource { + return schema.GroupVersionResource{Group: "eventing.knative.dev", Version: "v1alpha1", Resource: "eventpolicies"} +} + +// Install will create an EventPolicy resource, augmented with the config fn options. 
+func Install(name string, opts ...manifest.CfgFn) feature.StepFn { + cfg := map[string]interface{}{ + "name": name, + } + for _, fn := range opts { + fn(cfg) + } + return func(ctx context.Context, t feature.T) { + if _, err := manifest.InstallYamlFS(ctx, yamlEmbed, cfg); err != nil { + t.Fatal(err) + } + } +} + +func WithToRef(gvk schema.GroupVersionKind, name string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, set := cfg["to"]; !set { + cfg["to"] = []map[string]interface{}{} + } + + res := cfg["to"].([]map[string]interface{}) + + to := map[string]interface{}{ + "ref": map[string]interface{}{ + "apiVersion": gvk.GroupVersion().String(), + "kind": gvk.Kind, + "name": name, + }} + + res = append(res, to) + + cfg["to"] = res + } +} + +func WithToSelector(gvk schema.GroupVersionKind, labelSelector *metav1.LabelSelector) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, set := cfg["to"]; !set { + cfg["to"] = []map[string]interface{}{} + } + + res := cfg["to"].([]map[string]interface{}) + + selector := labelSelectorToStringMap(labelSelector) + selector["apiVersion"] = gvk.GroupVersion().String() + selector["kind"] = gvk.Kind + + to := map[string]interface{}{ + "selector": selector, + } + + res = append(res, to) + + cfg["to"] = res + } +} + +func WithFromRef(gvk schema.GroupVersionKind, name, namespace string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, set := cfg["from"]; !set { + cfg["from"] = []map[string]interface{}{} + } + + res := cfg["from"].([]map[string]interface{}) + + from := map[string]interface{}{ + "ref": map[string]interface{}{ + "apiVersion": gvk.GroupVersion().String(), + "kind": gvk.Kind, + "name": name, + "namespace": namespace, + }} + + res = append(res, from) + + cfg["from"] = res + } +} + +func WithFromSubject(subject string) manifest.CfgFn { + return func(cfg map[string]interface{}) { + if _, set := cfg["from"]; !set { + cfg["from"] = []map[string]interface{}{} + } + + res := cfg["from"].([]map[string]interface{}) + + from := map[string]interface{}{ + "sub": subject, + } + + res = append(res, from) + + cfg["from"] = res + } +} + +func WithFilters(filters []eventingv1.SubscriptionsAPIFilter) manifest.CfgFn { + jsonBytes, err := json.Marshal(filters) + if err != nil { + panic(err) + } + + yamlBytes, err := yaml.JSONToYAML(jsonBytes) + if err != nil { + panic(err) + } + + filtersYaml := string(yamlBytes) + + lines := strings.Split(filtersYaml, "\n") + out := make([]string, 0, len(lines)) + for i := range lines { + out = append(out, " "+lines[i]) + } + + return func(m map[string]interface{}) { + m["filters"] = strings.Join(out, "\n") + } +} + +// IsReady tests to see if an EventPolicy becomes ready within the time given. +func IsReady(name string, timing ...time.Duration) feature.StepFn { + return k8s.IsReady(GVR(), name, timing...) 
+}
+
+func labelSelectorToStringMap(selector *metav1.LabelSelector) map[string]interface{} {
+	if selector == nil {
+		return map[string]interface{}{}
+	}
+
+	r := map[string]interface{}{}
+
+	r["matchLabels"] = selector.MatchLabels
+
+	if selector.MatchExpressions != nil {
+		me := []map[string]interface{}{}
+		for _, ml := range selector.MatchExpressions {
+			me = append(me, map[string]interface{}{
+				"key":      ml.Key,
+				"operator": ml.Operator,
+				"values":   ml.Values,
+			})
+		}
+		r["matchExpressions"] = me
+	}
+
+	return r
+}
diff --git a/vendor/knative.dev/eventing/test/rekt/resources/eventpolicy/eventpolicy.yaml b/vendor/knative.dev/eventing/test/rekt/resources/eventpolicy/eventpolicy.yaml
new file mode 100644
index 0000000000..84925531e1
--- /dev/null
+++ b/vendor/knative.dev/eventing/test/rekt/resources/eventpolicy/eventpolicy.yaml
@@ -0,0 +1,74 @@
+# Copyright 2020 The Knative Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: eventing.knative.dev/v1alpha1
+kind: EventPolicy
+metadata:
+  name: {{ .name }}
+  namespace: {{ .namespace }}
+spec:
+  {{ if .to }}
+  to:
+    {{ range $to := .to }}
+    {{ if $to.ref }}
+    - ref:
+        apiVersion: {{ $to.ref.apiVersion }}
+        kind: {{ $to.ref.kind }}
+        name: {{ $to.ref.name }}
+    {{ end }} #end if $to.ref
+
+    {{ if $to.selector }}
+    - selector:
+        apiVersion: {{ $to.selector.apiVersion }}
+        kind: {{ $to.selector.kind }}
+        {{ if $to.selector.matchLabels }}
+        matchLabels:
+          {{ range $key, $value := $to.selector.matchLabels }}
+          {{ $key }}: {{ $value }}
+          {{ end }}
+        {{ end }} #end if to.matchLabels
+
+        {{ if $to.selector.matchExpressions }}
+        matchExpressions:
+          {{ range $expr := $to.selector.matchExpressions }}
+          - key: {{ $expr.key }}
+            operator: {{ $expr.operator }}
+            values:
+              {{ range $exprValue := $expr.values }}
+              - {{ $exprValue }}
+              {{ end }}
+          {{ end }} #end matchExpressions range
+        {{ end }} # end if matchExpressions
+    {{ end }} #end if $to.selector
+    {{ end }} #end "range $to"
+  {{ end }} #end "if .to"
+
+  from:
+    {{ range $from := .from }}
+    {{ if $from.ref }}
+    - ref:
+        apiVersion: {{ $from.ref.apiVersion }}
+        kind: {{ $from.ref.kind }}
+        name: {{ $from.ref.name }}
+        namespace: {{ $from.ref.namespace }}
+    {{ end }}
+    {{ if $from.sub }}
+    - sub: {{ $from.sub }}
+    {{ end }}
+    {{ end }}
+
+  {{ if .filters }}
+  filters:
+{{ .filters }}
+  {{ end }}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 839234bff1..177489b7c3 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -1302,6 +1302,7 @@ knative.dev/eventing/test/lib/recordevents/sender
 knative.dev/eventing/test/lib/resources
 knative.dev/eventing/test/lib/sender
 knative.dev/eventing/test/rekt/features
+knative.dev/eventing/test/rekt/features/authz
 knative.dev/eventing/test/rekt/features/broker
 knative.dev/eventing/test/rekt/features/channel
 knative.dev/eventing/test/rekt/features/featureflags
@@ -1317,6 +1318,7 @@ knative.dev/eventing/test/rekt/resources/channel
 knative.dev/eventing/test/rekt/resources/channel_impl
 knative.dev/eventing/test/rekt/resources/containersource
 knative.dev/eventing/test/rekt/resources/delivery
+knative.dev/eventing/test/rekt/resources/eventpolicy
 knative.dev/eventing/test/rekt/resources/eventtype
 knative.dev/eventing/test/rekt/resources/pingsource
 knative.dev/eventing/test/rekt/resources/subscription
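
For context, a minimal usage sketch of how the vendored eventpolicy helpers added in this patch might be composed in a reconciler-test feature. The broker GroupVersionKind literal, the resource name "my-broker", and the OIDC subject below are illustrative assumptions, not values taken from this patch; only the helper signatures (Install, WithToRef, WithFromSubject, IsReady) come from the code above.

package example

import (
	"k8s.io/apimachinery/pkg/runtime/schema"
	"knative.dev/reconciler-test/pkg/feature"

	"knative.dev/eventing/test/rekt/resources/eventpolicy"
)

// brokerAcceptsOnlyAllowedSubject sketches how the new helpers compose:
// install an EventPolicy that targets a broker, allow a single OIDC subject,
// and wait for the policy to become ready.
func brokerAcceptsOnlyAllowedSubject() *feature.Feature {
	f := feature.NewFeature()

	policy := feature.MakeRandomK8sName("eventpolicy")
	brokerGVK := schema.GroupVersionKind{Group: "eventing.knative.dev", Version: "v1", Kind: "Broker"}

	f.Setup("install EventPolicy", eventpolicy.Install(
		policy,
		eventpolicy.WithToRef(brokerGVK, "my-broker"),              // hypothetical broker name
		eventpolicy.WithFromSubject("system:serviceaccount:ns:sa"), // hypothetical OIDC subject
	))
	f.Setup("EventPolicy is ready", eventpolicy.IsReady(policy))

	return f
}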