Skip to content

Commit

Permalink
Backend Alerting: improve tests using Clock (#2432)
Browse files Browse the repository at this point in the history
* Update Alerting to use Timer and Clock to improve testing
* Alerting implement DebugLoggable

---------

Co-authored-by: Kai J <[email protected]>
  • Loading branch information
da-Kai and Kai J authored Nov 28, 2023
1 parent d72cdab commit 8700a6f
Show file tree
Hide file tree
Showing 15 changed files with 1,112 additions and 323 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package io.openems.backend.alerting;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
Expand All @@ -17,10 +20,12 @@
import org.slf4j.LoggerFactory;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.JsonElement;

import io.openems.backend.alerting.handler.OfflineEdgeHandler;
import io.openems.backend.alerting.scheduler.Scheduler;
import io.openems.backend.common.component.AbstractOpenemsBackendComponent;
import io.openems.backend.common.debugcycle.DebugLoggable;
import io.openems.backend.common.metadata.Edge;
import io.openems.backend.common.metadata.Mailer;
import io.openems.backend.common.metadata.Metadata;
Expand All @@ -36,12 +41,18 @@
Edge.Events.ON_SET_ONLINE, //
Metadata.Events.AFTER_IS_INITIALIZED //
})
public class Alerting extends AbstractOpenemsBackendComponent implements EventHandler {
public class Alerting extends AbstractOpenemsBackendComponent implements EventHandler, DebugLoggable {

// Maximum number of messages constructed at the same time
private static final byte THREAD_POOL_SIZE = 2;
// Queue size from which warnings are issued
private static final byte THREAD_QUEUE_WARNING_THRESHOLD = 50;

protected final Scheduler scheduler;
private static final ThreadPoolExecutor createDefaultExecutorService() {
final var threadFactory = new ThreadFactoryBuilder()
.setNameFormat(Alerting.class.getSimpleName() + ".EventHandler-%d").build();
return (ThreadPoolExecutor) Executors.newFixedThreadPool(THREAD_POOL_SIZE, threadFactory);
}

private final Logger log = LoggerFactory.getLogger(Alerting.class);
private final ThreadPoolExecutor executor;
Expand All @@ -52,52 +63,66 @@ public class Alerting extends AbstractOpenemsBackendComponent implements EventHa
@Reference
protected Mailer mailer;

protected Handler<?>[] handlers = {};
private final Scheduler scheduler;

protected Alerting(Scheduler scheduler) {
super("Alerting");
protected final List<Handler<?>> handler = new ArrayList<>(1);

protected Alerting(Scheduler scheduler, ThreadPoolExecutor executor) {
super("Alerting");
this.executor = executor;
this.scheduler = scheduler;
this.executor = new ThreadPoolExecutor(0, THREAD_POOL_SIZE, 1, TimeUnit.HOURS, new LinkedBlockingQueue<>(), //
new ThreadFactoryBuilder().setNameFormat(Alerting.class.getSimpleName() + ".EventHandler-%d").build());
}

public Alerting() {
this(new Scheduler());
this(new Scheduler(), Alerting.createDefaultExecutorService());
}

@Activate
protected void activate(Config config) {
this.logInfo(this.log, "Activate");
this.scheduler.start();

this.handlers = new Handler[] {
new OfflineEdgeHandler(this.scheduler, this.mailer, this.metadata, config.initialDelay()) };
var handler = new OfflineEdgeHandler(this.scheduler, this.scheduler, this.mailer, this.metadata, //
config.initialDelay());
this.handler.add(handler);
}

@Deactivate
protected void deactivate() {
this.logInfo(this.log, "Deactivate");

for (var handler : this.handlers) {
handler.stop();
}
this.handlers = new Handler<?>[0];
this.handler.forEach(Handler::stop);
this.handler.clear();
this.scheduler.stop();
}

@Override
public void handleEvent(Event event) {
var reader = new EventReader(event);
for (var handler : this.handlers) {
var task = handler.getEventHandler(reader);
for (var h : this.handler) {
var task = h.getEventHandler(reader.getTopic());
if (task != null) {
this.executor.execute(task);
this.execute(task, reader);
}
}
}

private void execute(Consumer<EventReader> consumer, EventReader reader) {
this.executor.execute(() -> consumer.accept(reader));
}

@Override
public String debugLog() {
int queueSize = this.executor.getQueue().size();
if (queueSize > 0 && queueSize % THREAD_QUEUE_WARNING_THRESHOLD == 0) {
this.logWarn(this.log, queueSize + " tasks in the EventHandlerQueue!");
if (queueSize >= THREAD_QUEUE_WARNING_THRESHOLD) {
return "%d tasks in the EventHandlerQueue!".formatted(queueSize);
} else {
return null;
}
}

@Override
public Map<String, JsonElement> debugMetrics() {
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.time.ZonedDateTime;
import java.util.List;
import java.util.function.Consumer;

import io.openems.common.event.EventReader;

Expand Down Expand Up @@ -29,9 +30,9 @@ public interface Handler<T extends Message> {

/**
* Handle given event.
*
* @param event to handle
* @return Runnable to be scheduled in executor
*
* @param eventTopic to handle
* @return {@link Consumer} to be scheduled in executor
*/
public Runnable getEventHandler(EventReader event);
public Consumer<EventReader> getEventHandler(String eventTopic);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
public abstract class Message implements Comparable<Message> {
private final String id;

public Message(String messageId) {
this.id = messageId;
protected Message(String id) {
this.id = id;
}

/**
Expand All @@ -24,10 +24,6 @@ public String getId() {
return this.id;
}

public boolean isValid() {
return this.id != null && this.getNotifyStamp() != null;
}

/**
* Returns the time stamp at which this message is supposed to be sent.
*
Expand All @@ -44,19 +40,18 @@ public boolean isValid() {

@Override
public int hashCode() {
return Objects.hash(this.id);
return Objects.hash(this.getId());
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || this.getClass() != obj.getClass()) {
} else if (obj instanceof Message other) {
return Objects.equals(this.getId(), other.getId());
} else {
return false;
}
var other = (Message) obj;
return Objects.equals(this.id, other.id);
}

@Override
Expand Down
Loading

0 comments on commit 8700a6f

Please sign in to comment.