Skip to content

Commit

Permalink
Support for EventPolicy filters (#4086)
Browse files Browse the repository at this point in the history
* Move filters from dispatcher to core module

* Move contract filter converter from ConsumerVerticleBuilder to filter package

* Update benchmark module to reflect filter package movements

* Run update-codegen

* AuthenticationHandler -> AuthHandler

* Move OIDC discovery from Receiver to TokenVerifierImpl

* Rework tokenVerifier AuthZ to read the cloudevent only once from the request when policies claims matched

* Rename TokenVerifier to AuthVerifier

* Provision EventPolicy filters in contract

* Add e2e test

* Run hack/update-codegen.sh

* Fix: Always pass cloudevent (can be null) to ingresshandler
  • Loading branch information
creydr authored Aug 28, 2024
1 parent 268a574 commit 241e6a7
Show file tree
Hide file tree
Showing 55 changed files with 977 additions and 463 deletions.
4 changes: 4 additions & 0 deletions control-plane/pkg/core/config/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func EventPoliciesFromAppliedEventPoliciesStatus(status duck.AppliedEventPolicie
}
}

for _, filter := range policy.Spec.Filters {
contractPolicy.Filters = append(contractPolicy.Filters, contract.FromSubscriptionFilter(filter))
}

eventPolicies = append(eventPolicies, contractPolicy)
}

Expand Down
76 changes: 76 additions & 0 deletions control-plane/pkg/core/config/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"
"time"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"

"google.golang.org/protobuf/encoding/protojson"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/apis/feature"
Expand Down Expand Up @@ -618,6 +620,80 @@ func TestEventPoliciesFromAppliedEventPoliciesStatus(t *testing.T) {
},
},
},
}, {
name: "Multiple policies with filters",
applyingPolicies: []string{
"policy-1",
"policy-2",
},
existingEventPolicies: []*eventingv1alpha1.EventPolicy{
{
ObjectMeta: metav1.ObjectMeta{
Name: "policy-1",
Namespace: "my-ns",
},
Spec: eventingv1alpha1.EventPolicySpec{
Filters: []eventingv1.SubscriptionsAPIFilter{
{
CESQL: "true",
},
},
},
Status: eventingv1alpha1.EventPolicyStatus{
From: []string{
"from-1",
},
},
}, {
ObjectMeta: metav1.ObjectMeta{
Name: "policy-2",
Namespace: "my-ns",
},
Spec: eventingv1alpha1.EventPolicySpec{
Filters: []eventingv1.SubscriptionsAPIFilter{
{
CESQL: "false",
},
},
},
Status: eventingv1alpha1.EventPolicyStatus{
From: []string{
"from-2-*",
},
},
},
},
namespace: "my-ns",
defaultAuthorizationMode: feature.AuthorizationDenyAll,
expected: []*contract.EventPolicy{
{
TokenMatchers: []*contract.TokenMatcher{
exactTokenMatcher("from-1"),
},
Filters: []*contract.DialectedFilter{
{
Filter: &contract.DialectedFilter_Cesql{
Cesql: &contract.CESQL{
Expression: "true",
},
},
},
},
}, {
TokenMatchers: []*contract.TokenMatcher{
prefixTokenMatcher("from-2-"),
},
Filters: []*contract.DialectedFilter{
{
Filter: &contract.DialectedFilter_Cesql{
Cesql: &contract.CESQL{
Expression: "false",
},
},
},
},
},
},
}, {
name: "No applying policies - allow-same-namespace default mode",
applyingPolicies: []string{},
Expand Down
6 changes: 6 additions & 0 deletions data-plane/benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@
<artifactId>dispatcher</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>dev.knative.eventing.kafka.broker</groupId>
<artifactId>core</artifactId>
<version>${project.version}</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.AllFilter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.ExactFilter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.PrefixFilter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.SuffixFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.AllFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.PrefixFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.SuffixFilter;
import io.cloudevents.CloudEvent;
import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.*;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.AnyFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.PrefixFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.SuffixFilter;
import io.cloudevents.CloudEvent;
import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.ExactFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.core.v1.CloudEventV1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import io.cloudevents.CloudEvent;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.ExactFilter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.NotFilter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.PrefixFilter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.SuffixFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.ExactFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.NotFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.PrefixFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.SuffixFilter;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventV1;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.PrefixFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.PrefixFilter;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventV1;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi.SuffixFilter;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.SuffixFilter;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.v1.CloudEventV1;
import java.util.Map;
Expand Down
4 changes: 4 additions & 0 deletions data-plane/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@
<artifactId>vertx-opentelemetry</artifactId>
</dependency>

<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-sql</artifactId>
</dependency>
<dependency>
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-kafka</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter;
package dev.knative.eventing.kafka.broker.core.filter;

import static java.time.format.DateTimeFormatter.ISO_INSTANT;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.v03.CloudEventV03;
import io.cloudevents.core.v1.CloudEventV1;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright © 2018 Knative Authors ([email protected])
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.core.filter;

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
import dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi.*;
import io.cloudevents.CloudEvent;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* This interface provides an abstraction for filtering {@link CloudEvent} instances.
*/
@FunctionalInterface
public interface Filter extends Predicate<CloudEvent> {

/**
* @return noop implementation that always returns true
*/
static Filter noop() {
return ce -> true;
}

static Filter fromContract(DataPlaneContract.DialectedFilter filter) {
return switch (filter.getFilterCase()) {
case EXACT -> new ExactFilter(filter.getExact().getAttributesMap());
case PREFIX -> new PrefixFilter(filter.getPrefix().getAttributesMap());
case SUFFIX -> new SuffixFilter(filter.getSuffix().getAttributesMap());
case NOT -> new NotFilter(fromContract(filter.getNot().getFilter()));
case ANY -> new AnyFilter(filter.getAny().getFiltersList().stream()
.map(Filter::fromContract)
.collect(Collectors.toList()));
case ALL -> new AllFilter(filter.getAll().getFiltersList().stream()
.map(Filter::fromContract)
.collect(Collectors.toList()));
case CESQL -> new CeSqlFilter(filter.getCesql().getExpression());
default -> Filter.noop();
};
}

static Filter fromContract(List<DataPlaneContract.DialectedFilter> filters) {
return new AllFilter(filters.stream().map(Filter::fromContract).collect(Collectors.toList()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.core.filter.Filter;
import io.cloudevents.CloudEvent;
import java.util.List;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.core.filter.Filter;
import io.cloudevents.CloudEvent;
import java.util.List;
import org.slf4j.Logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.core.filter.Filter;
import io.cloudevents.CloudEvent;
import io.cloudevents.sql.EvaluationContext;
import io.cloudevents.sql.EvaluationException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;

import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.AttributesFilter;
import dev.knative.eventing.kafka.broker.core.filter.AttributesFilter;
import java.util.Map;

public class ExactFilter extends AttributesFilter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;

import dev.knative.eventing.kafka.broker.dispatcher.Filter;
import dev.knative.eventing.kafka.broker.core.filter.Filter;
import io.cloudevents.CloudEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;

import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.AttributesFilter;
import dev.knative.eventing.kafka.broker.core.filter.AttributesFilter;
import java.util.Map;

public class PrefixFilter extends AttributesFilter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.knative.eventing.kafka.broker.dispatcher.impl.filter.subscriptionsapi;
package dev.knative.eventing.kafka.broker.core.filter.subscriptionsapi;

import dev.knative.eventing.kafka.broker.dispatcher.impl.filter.AttributesFilter;
import dev.knative.eventing.kafka.broker.core.filter.AttributesFilter;
import java.util.Map;

public class SuffixFilter extends AttributesFilter {
Expand Down
Loading

0 comments on commit 241e6a7

Please sign in to comment.