Skip to content

Commit

Permalink
feat: add sample to primary to secondary missing issue (#1847)
Browse files Browse the repository at this point in the history
Co-authored-by: Sébastien CROCQUESEL <[email protected]>
  • Loading branch information
csviri and scrocquesel authored Apr 5, 2023
1 parent eaddc24 commit d76a47c
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ void readsSecondaryInManyToOneCases() throws InterruptedException {
.isEqualTo(1));
}

Job job() {
public static Job job() {
var job = new Job();
job.setMetadata(new ObjectMetaBuilder()
.withName("job1")
Expand All @@ -48,7 +48,7 @@ Job job() {
return job;
}

Cluster cluster() {
public static Cluster cluster() {
Cluster cluster = new Cluster();
cluster.setMetadata(new ObjectMetaBuilder()
.withName(CLUSTER_NAME)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package io.javaoperatorsdk.operator;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
import io.javaoperatorsdk.operator.sample.primarytosecondary.Cluster;
import io.javaoperatorsdk.operator.sample.primarytosecondary.JobReconciler;

import static io.javaoperatorsdk.operator.PrimaryToSecondaryIT.cluster;
import static io.javaoperatorsdk.operator.PrimaryToSecondaryIT.job;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

/**
* The intention with this IT is to show the use cases why the PrimaryToSecondary Mapper is needed,
* and the situation when it is not working.
*/
class PrimaryToSecondaryMissingIT {

@RegisterExtension
LocallyRunOperatorExtension operator =
LocallyRunOperatorExtension.builder()
.withAdditionalCustomResourceDefinition(Cluster.class)
.withReconciler(new JobReconciler(false))
.build();

@Test
void missingPrimaryToSecondaryCausesIssueAccessingSecondary() throws InterruptedException {
var reconciler = operator.getReconcilerOfType(JobReconciler.class);
operator.create(cluster());
Thread.sleep(300);
operator.create(job());

await().untilAsserted(() -> {
assertThat(reconciler.isErrorOccurred()).isTrue();
assertThat(reconciler.getNumberOfExecutions()).isZero();
});
}

@Test
void accessingDirectlyTheCacheWorksWithoutPToSMapper() throws InterruptedException {
var reconciler = operator.getReconcilerOfType(JobReconciler.class);
reconciler.setGetResourceDirectlyFromCache(true);
operator.create(cluster());
Thread.sleep(300);
operator.create(job());

await().untilAsserted(() -> {
assertThat(reconciler.isErrorOccurred()).isFalse();
assertThat(reconciler.getNumberOfExecutions()).isPositive();
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,49 @@
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;

/**
* This reconciler used in integration tests to show the cases when PrimaryToSecondaryMapper is
* needed, and to show the use cases when some mechanisms would not work without that. It's not
* intended to be a reusable code as it is, rather serves for deeper understanding of the problem.
*/
@ControllerConfiguration()
public class JobReconciler
implements Reconciler<Job>, EventSourceInitializer<Job> {
implements Reconciler<Job>, EventSourceInitializer<Job>, ErrorStatusHandler<Job> {

private static final String JOB_CLUSTER_INDEX = "job-cluster-index";

private final AtomicInteger numberOfExecutions = new AtomicInteger(0);

private final boolean addPrimaryToSecondaryMapper;
private boolean getResourceDirectlyFromCache = false;
private volatile boolean errorOccurred;

public JobReconciler() {
this(true);
}

public JobReconciler(boolean addPrimaryToSecondaryMapper) {
this.addPrimaryToSecondaryMapper = addPrimaryToSecondaryMapper;
}

@Override
public UpdateControl<Job> reconcile(
Job resource, Context<Job> context) {

context.getSecondaryResource(Cluster.class)
.orElseThrow(() -> new IllegalStateException("Secondary resource should be present"));
if (!getResourceDirectlyFromCache) {
// this is only possible when there is primary to secondary mapper
context.getSecondaryResource(Cluster.class)
.orElseThrow(() -> new IllegalStateException("Secondary resource should be present"));
} else {
// reading the resource from cache as alternative, works without primary to secondary mapper
var informerEventSource = (InformerEventSource<Cluster, Job>) context.eventSourceRetriever()
.getResourceEventSourceFor(Cluster.class);
informerEventSource
.get(new ResourceID(resource.getSpec().getClusterName(),
resource.getMetadata().getNamespace()))
.orElseThrow(
() -> new IllegalStateException("Secondary resource cannot be read from cache"));
}
numberOfExecutions.addAndGet(1);
return UpdateControl.noUpdate();
}
Expand All @@ -36,20 +65,22 @@ public Map<String, EventSource> prepareEventSources(EventSourceContext<Job> cont
context.getPrimaryCache().addIndexer(JOB_CLUSTER_INDEX, (job -> List
.of(indexKey(job.getSpec().getClusterName(), job.getMetadata().getNamespace()))));

InformerConfiguration<Cluster> informerConfiguration =
InformerConfiguration.InformerConfigurationBuilder<Cluster> informerConfiguration =
InformerConfiguration.from(Cluster.class, context)
.withSecondaryToPrimaryMapper(cluster -> context.getPrimaryCache()
.byIndex(JOB_CLUSTER_INDEX, indexKey(cluster.getMetadata().getName(),
cluster.getMetadata().getNamespace()))
.stream().map(ResourceID::fromResource).collect(Collectors.toSet()))
.withPrimaryToSecondaryMapper(
(PrimaryToSecondaryMapper<Job>) primary -> Set.of(new ResourceID(
primary.getSpec().getClusterName(), primary.getMetadata().getNamespace())))
.withNamespacesInheritedFromController(context)
.build();
.withNamespacesInheritedFromController(context);

if (addPrimaryToSecondaryMapper) {
informerConfiguration = informerConfiguration.withPrimaryToSecondaryMapper(
(PrimaryToSecondaryMapper<Job>) primary -> Set.of(new ResourceID(
primary.getSpec().getClusterName(), primary.getMetadata().getNamespace())));
}

return EventSourceInitializer
.nameEventSources(new InformerEventSource<>(informerConfiguration, context));
.nameEventSources(new InformerEventSource<>(informerConfiguration.build(), context));
}

private String indexKey(String clusterName, String namespace) {
Expand All @@ -59,4 +90,20 @@ private String indexKey(String clusterName, String namespace) {
public int getNumberOfExecutions() {
return numberOfExecutions.get();
}

@Override
public ErrorStatusUpdateControl<Job> updateErrorStatus(Job resource, Context<Job> context,
Exception e) {
errorOccurred = true;
return ErrorStatusUpdateControl.noStatusUpdate();
}

public boolean isErrorOccurred() {
return errorOccurred;
}

public JobReconciler setGetResourceDirectlyFromCache(boolean getResourceDirectlyFromCache) {
this.getResourceDirectlyFromCache = getResourceDirectlyFromCache;
return this;
}
}

0 comments on commit d76a47c

Please sign in to comment.