Skip to content

Commit

Permalink
feat: use namespaced indexers for eventtypes in receiver (#4031)
Browse files Browse the repository at this point in the history
* feat: use namespaced indexers for eventtypes in receiver

Signed-off-by: Calum Murray <[email protected]>

* fix: abstract data plane tests compile

Signed-off-by: Calum Murray <[email protected]>

---------

Signed-off-by: Calum Murray <[email protected]>
  • Loading branch information
Cali0707 authored Aug 7, 2024
1 parent 6132a34 commit 135c294
Show file tree
Hide file tree
Showing 16 changed files with 160 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
import io.cloudevents.CloudEvent;
import io.fabric8.kubernetes.client.informers.cache.Lister;
import io.vertx.core.Future;

/**
* This interface provides an abstraction for creating {@link EventType} instances from {@link CloudEvent} instances.
*/
@FunctionalInterface
public interface EventTypeCreator {
Future<EventType> create(CloudEvent event, DataPlaneContract.Reference reference);
Future<EventType> create(
CloudEvent event, Lister<EventType> eventTypeLister, DataPlaneContract.Reference reference);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.informers.cache.Lister;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
Expand All @@ -35,19 +36,15 @@ public class EventTypeCreatorImpl implements EventTypeCreator {

private final MixedOperation<EventType, KubernetesResourceList<EventType>, Resource<EventType>> eventTypeClient;

private final EventTypeListerFactory eventTypeListerFactory;

private MessageDigest messageDigest;

private final WorkerExecutor executor;

public EventTypeCreatorImpl(
MixedOperation<EventType, KubernetesResourceList<EventType>, Resource<EventType>> eventTypeClient,
EventTypeListerFactory eventTypeListerFactory,
Vertx vertx)
throws IllegalArgumentException, NoSuchAlgorithmException {
this.eventTypeClient = eventTypeClient;
this.eventTypeListerFactory = eventTypeListerFactory;
this.executor = vertx.createSharedWorkerExecutor("et-creator-worker", 1);
this.messageDigest = MessageDigest.getInstance("MD5");
}
Expand All @@ -64,17 +61,12 @@ private String getName(CloudEvent event, DataPlaneContract.Reference reference)
return name;
}

private EventType eventTypeExists(String etName, DataPlaneContract.Reference reference) {
return this.eventTypeListerFactory
.getForNamespace(reference.getNamespace())
.get(etName);
}

@Override
public Future<EventType> create(CloudEvent event, DataPlaneContract.Reference ownerReference) {
public Future<EventType> create(
CloudEvent event, Lister<EventType> eventTypeLister, DataPlaneContract.Reference ownerReference) {
return this.executor.executeBlocking(() -> {
final var name = this.getName(event, ownerReference);
final var eventType = this.eventTypeExists(name, ownerReference);
final var eventType = eventTypeLister.get(name);
if (eventType != null) {
return eventType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,27 @@

package dev.knative.eventing.kafka.broker.core.eventtype;

import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Lister;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class EventTypeListerFactory {
public class EventTypeListerFactory implements AutoCloseable {
private static final long INFORMER_RESYNC_MS = 30 * 1000;
private final Map<String, Lister<EventType>> listerMap;
private final SharedIndexInformer<EventType> eventTypeInformer;
private final List<SharedIndexInformer<EventType>> informers;
private final MixedOperation<EventType, KubernetesResourceList<EventType>, Resource<EventType>> eventTypeClient;

public EventTypeListerFactory(SharedIndexInformer<EventType> eventTypeInformer) {
if (eventTypeInformer == null) {
throw new IllegalArgumentException("you must provide a non null eventtype informer");
}
this.eventTypeInformer = eventTypeInformer;
public EventTypeListerFactory(
MixedOperation<EventType, KubernetesResourceList<EventType>, Resource<EventType>> eventTypeClient) {
this.eventTypeClient = eventTypeClient;
this.listerMap = new HashMap<>();
this.informers = new LinkedList<>();
}

public Lister<EventType> getForNamespace(String namespace) {
Expand All @@ -41,8 +47,16 @@ public Lister<EventType> getForNamespace(String namespace) {
}

private Lister<EventType> createListerForNamespace(String namespace) {
final var lister = new Lister<>(this.eventTypeInformer.getIndexer(), namespace);
final var informer = this.eventTypeClient.inNamespace(namespace).runnableInformer(INFORMER_RESYNC_MS);
informer.start();
this.informers.add(informer);
final var lister = new Lister<>(informer.getIndexer(), namespace);
this.listerMap.put(namespace, lister);
return lister;
}

@Override
public void close() {
this.informers.forEach(SharedIndexInformer::close);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public class EventTypeCreatorImplTest {
@Test
public void testCreate(Vertx vertx, VertxTestContext vertxTestContext) throws NoSuchAlgorithmException {
final var eventTypeClient = kubernetesClient.resources(EventType.class);
final var informer = kubernetesClient.informers().sharedIndexInformerFor(EventType.class, 100L);
var eventTypeCreator = new EventTypeCreatorImpl(eventTypeClient, new EventTypeListerFactory(informer), vertx);
final var listerFactory = new EventTypeListerFactory(eventTypeClient);
var eventTypeCreator = new EventTypeCreatorImpl(eventTypeClient, vertx);
var event = new CloudEventBuilder()
.withType("example.event.type")
.withSource(URI.create("/example/source"))
Expand All @@ -61,16 +61,16 @@ public void testCreate(Vertx vertx, VertxTestContext vertxTestContext) throws No
.setUuid("12345")
.build();
eventTypeCreator
.create(event, reference)
.create(event, listerFactory.getForNamespace("default"), reference)
.onFailure((exception) -> {
informer.close();
listerFactory.close();
vertxTestContext.failNow(exception);
})
.onSuccess((et -> {
KubernetesResourceList<EventType> eventTypeList =
eventTypeClient.inNamespace("default").list();

informer.close();
listerFactory.close();
Assertions.assertNotNull(eventTypeList);
Assertions.assertEquals(1, eventTypeList.getItems().size());
var eventType = eventTypeList.getItems().get(0);
Expand All @@ -96,9 +96,9 @@ public void testCreate(Vertx vertx, VertxTestContext vertxTestContext) throws No
@Test
public void testCreatesOnlyOnce(Vertx vertx, VertxTestContext vertxTestContext) throws NoSuchAlgorithmException {
final var eventTypeClient = kubernetesClient.resources(EventType.class);
final var informer = kubernetesClient.informers().sharedIndexInformerFor(EventType.class, 100L);
informer.run();
var eventTypeCreator = new EventTypeCreatorImpl(eventTypeClient, new EventTypeListerFactory(informer), vertx);
final var listerFactory = new EventTypeListerFactory(eventTypeClient);

var eventTypeCreator = new EventTypeCreatorImpl(eventTypeClient, vertx);
var event = new CloudEventBuilder()
.withType("example.event.type")
.withSource(URI.create("/example/source"))
Expand All @@ -113,9 +113,9 @@ public void testCreatesOnlyOnce(Vertx vertx, VertxTestContext vertxTestContext)
.setUuid("12345")
.build();
eventTypeCreator
.create(event, reference)
.create(event, listerFactory.getForNamespace("default"), reference)
.onFailure((exception) -> {
informer.close();
listerFactory.close();
vertxTestContext.failNow(exception);
})
.onSuccess((et -> {
Expand Down Expand Up @@ -145,20 +145,21 @@ public void testCreatesOnlyOnce(Vertx vertx, VertxTestContext vertxTestContext)
// make sure that the informer has time to resync, the webhook doens't seem to run in the tests
Thread.sleep(2000);
} catch (InterruptedException e) {
listerFactory.close();
vertxTestContext.failNow(e);
}
return eventTypeCreator
.create(event, reference)
.create(event, listerFactory.getForNamespace("default"), reference)
.onFailure((exception) -> {
logger.warn("failure occurred, closing informer", exception);
informer.close();
listerFactory.close();
vertxTestContext.failNow(exception);
})
.onSuccess((et -> {
KubernetesResourceList<EventType> eventTypeList =
eventTypeClient.inNamespace("default").list();

informer.close();
listerFactory.close();

Assertions.assertNotNull(eventTypeList);
Assertions.assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer;
import dev.knative.eventing.kafka.broker.core.eventtype.EventType;
import io.cloudevents.CloudEvent;
import io.fabric8.kubernetes.client.informers.cache.Lister;
import io.vertx.core.Future;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
Expand Down Expand Up @@ -58,6 +60,11 @@ default boolean isEventTypeAutocreateEnabled() {
return false;
}

/**
* @return the Lister for eventtypes in the correct namespace for this producer
*/
Lister<EventType> getEventTypeLister();

/**
* @return the OIDC audience for the ingress.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import dev.knative.eventing.kafka.broker.contract.DataPlaneContract;
import dev.knative.eventing.kafka.broker.core.AsyncCloseable;
import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer;
import dev.knative.eventing.kafka.broker.core.eventtype.EventType;
import dev.knative.eventing.kafka.broker.core.eventtype.EventTypeListerFactory;
import dev.knative.eventing.kafka.broker.core.metrics.Metrics;
import dev.knative.eventing.kafka.broker.core.reconciler.IngressReconcilerListener;
import dev.knative.eventing.kafka.broker.core.security.AuthProvider;
Expand All @@ -29,6 +31,7 @@
import io.cloudevents.core.message.Encoding;
import io.cloudevents.jackson.JsonFormat;
import io.cloudevents.kafka.CloudEventSerializer;
import io.fabric8.kubernetes.client.informers.cache.Lister;
import io.vertx.core.Future;
import java.util.Map;
import java.util.Objects;
Expand All @@ -50,6 +53,7 @@ public class IngressProducerReconcilableStore implements IngressReconcilerListen
private final Properties producerConfigs;
private final Function<Properties, ReactiveKafkaProducer<String, CloudEvent>> producerFactory;
private final AuthProvider authProvider;
private final EventTypeListerFactory eventTypeListerFactory;

// ingress uuid -> IngressInfo
// This map is used to resolve the ingress info in the reconciler listener
Expand All @@ -67,14 +71,16 @@ public class IngressProducerReconcilableStore implements IngressReconcilerListen
public IngressProducerReconcilableStore(
final AuthProvider authProvider,
final Properties producerConfigs,
final Function<Properties, ReactiveKafkaProducer<String, CloudEvent>> producerFactory) {
final Function<Properties, ReactiveKafkaProducer<String, CloudEvent>> producerFactory,
EventTypeListerFactory eventTypeListerFactory) {

Objects.requireNonNull(producerConfigs, "provide producerConfigs");
Objects.requireNonNull(producerFactory, "provide producerCreator");

this.authProvider = authProvider;
this.producerConfigs = producerConfigs;
this.producerFactory = producerFactory;
this.eventTypeListerFactory = eventTypeListerFactory;

this.ingressInfos = new ConcurrentHashMap<>();
this.producerReferences = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -159,7 +165,9 @@ private Future<Void> onNewIngress(
ingress.getPath(),
ingress.getHost(),
producerProps,
ingress.getEnableAutoCreateEventTypes());
ingress.getEnableAutoCreateEventTypes(),
this.eventTypeListerFactory.getForNamespace(
resource.getReference().getNamespace()));

if (isRootPath(ingress.getPath()) && Strings.isNullOrEmpty(ingress.getHost())) {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -270,14 +278,16 @@ private static class IngressProducerImpl implements IngressProducer {
private final String audience;

private final boolean eventTypeAutocreateEnabled;
private final Lister<EventType> eventTypeLister;

IngressProducerImpl(
final ReactiveKafkaProducer<String, CloudEvent> producer,
final DataPlaneContract.Resource resource,
final String path,
final String host,
final Properties producerProperties,
final boolean eventTypeAutocreateEnabled) {
final boolean eventTypeAutocreateEnabled,
Lister<EventType> eventTypeLister) {
this.producer = producer;
this.topic = resource.getTopics(0);
this.reference = resource.getReference();
Expand All @@ -286,6 +296,7 @@ private static class IngressProducerImpl implements IngressProducer {
this.host = host;
this.producerProperties = producerProperties;
this.eventTypeAutocreateEnabled = eventTypeAutocreateEnabled;
this.eventTypeLister = eventTypeLister;
}

@Override
Expand Down Expand Up @@ -320,6 +331,11 @@ Properties getProducerProperties() {
return producerProperties;
}

@Override
public Lister<EventType> getEventTypeLister() {
return this.eventTypeLister;
}

@Override
public boolean isEventTypeAutocreateEnabled() {
return eventTypeAutocreateEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,10 @@ public void handle(final RequestContext requestContext, final IngressProducer pr
.compose((recordMetadata) -> {
if (producer.isEventTypeAutocreateEnabled()) {
return this.eventTypeCreator
.create(record.value(), producer.getReference())
.create(
record.value(),
producer.getEventTypeLister(),
producer.getReference())
.compose(
et -> {
logger.debug("successfully created eventtype {}", et);
Expand Down
Loading

0 comments on commit 135c294

Please sign in to comment.