Skip to content

Commit

Permalink
Backend: fix subscribe system log (#2357)
Browse files Browse the repository at this point in the history
System log was only sent to one UI when mulitple UIs of the same user were opened. 

- added unique id for each websocket connection
- replaced subscribe with token to ui websocket id

Co-authored-by: Michael Grill <[email protected]>
  • Loading branch information
michaelgrill authored Sep 12, 2023
1 parent 1002bbe commit 6a44190
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Set;
import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import org.osgi.annotation.versioning.ProviderType;
Expand Down Expand Up @@ -45,15 +46,15 @@ public CompletableFuture<JsonrpcResponseSuccess> send(String edgeId, User user,
/**
* Handles a {@link SubscribeSystemLogRequest}.
*
* @param edgeId the Edge-ID
* @param user the {@link User}
* @param token the UI session token
* @param request the {@link SubscribeSystemLogRequest}
* @param edgeId the Edge-ID
* @param user the {@link User}
* @param websocketId the id of the UI websocket connection
* @param request the {@link SubscribeSystemLogRequest}
* @return a reply
* @throws OpenemsNamedException on error
*/
public CompletableFuture<JsonrpcResponseSuccess> handleSubscribeSystemLogRequest(String edgeId, User user,
String token, SubscribeSystemLogRequest request) throws OpenemsNamedException;
UUID websocketId, SubscribeSystemLogRequest request) throws OpenemsNamedException;

/**
* Gets the latest values for the given ChannelAddresses.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.openems.backend.common.uiwebsocket;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

import org.osgi.annotation.versioning.ProviderType;
Expand All @@ -17,22 +18,22 @@ public interface UiWebsocket {
* Send a JSON-RPC Request to a UI session via WebSocket and expect a JSON-RPC
* Response.
*
* @param token the UI token
* @param websocketId the id of the UI websocket connection
* @param request the JsonrpcRequest
* @return the JSON-RPC Success Response Future
* @throws OpenemsNamedException on error
*/
public CompletableFuture<JsonrpcResponseSuccess> send(String token, JsonrpcRequest request)
public CompletableFuture<JsonrpcResponseSuccess> send(UUID websocketId, JsonrpcRequest request)
throws OpenemsNamedException;

/**
* Send a JSON-RPC Notification to a UI session.
*
* @param token the UI token
* @param websocketId the id of the UI websocket connection
* @param notification the JsonrpcNotification
* @throws OpenemsNamedException on error
*/
public void send(String token, JsonrpcNotification notification) throws OpenemsNamedException;
public void send(UUID websocketId, JsonrpcNotification notification) throws OpenemsNamedException;

/**
* Send a JSON-RPC Notification broadcast to all UI sessions with a given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -258,8 +259,8 @@ protected void logError(Logger log, String message) {

@Override
public CompletableFuture<JsonrpcResponseSuccess> handleSubscribeSystemLogRequest(String edgeId, User user,
String token, SubscribeSystemLogRequest request) throws OpenemsNamedException {
return this.systemLogHandler.handleSubscribeSystemLogRequest(edgeId, user, token, request);
UUID websocketId, SubscribeSystemLogRequest request) throws OpenemsNamedException {
return this.systemLogHandler.handleSubscribeSystemLogRequest(edgeId, user, websocketId, request);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class SystemLogHandler {
/**
* Edge-ID to Session-Token.
*/
private final ConcurrentHashMap<String, Set<String>> subscriptions = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Set<UUID>> subscriptions = new ConcurrentHashMap<>();

public SystemLogHandler(EdgeWebsocketImpl parent) {
this.parent = parent;
Expand All @@ -36,27 +36,27 @@ public SystemLogHandler(EdgeWebsocketImpl parent) {
/**
* Handles a {@link SubscribeSystemLogRequest}.
*
* @param edgeId the Edge-ID
* @param user the {@link User}
* @param token the UI session token
* @param request the {@link SubscribeSystemLogRequest}
* @param edgeId the Edge-ID
* @param user the {@link User}
* @param websocketId the id of the UI websocket connection
* @param request the {@link SubscribeSystemLogRequest}
* @return a reply
* @throws OpenemsNamedException on error
*/
public CompletableFuture<JsonrpcResponseSuccess> handleSubscribeSystemLogRequest(String edgeId, User user,
String token, SubscribeSystemLogRequest request) throws OpenemsNamedException {
UUID websocketId, SubscribeSystemLogRequest request) throws OpenemsNamedException {
if (request.isSubscribe()) {
// Add subscription
this.addToken(edgeId, token);
this.addSubscriptionId(edgeId, websocketId);

// Always forward subscribe to Edge
return this.parent.send(edgeId, user, request);

} else {
// Remove subscription
this.removeToken(edgeId, token);
this.removeSubscriptionId(edgeId, websocketId);

if (this.getTokens(edgeId) != null) {
if (this.getSubscribedWebsocketIds(edgeId) != null) {
// Remaining Tokens left for this Edge -> announce success
return CompletableFuture.completedFuture(new GenericJsonrpcResponseSuccess(request.getId()));

Expand All @@ -75,9 +75,9 @@ public CompletableFuture<JsonrpcResponseSuccess> handleSubscribeSystemLogRequest
* @param notification the {@link SystemLogNotification}
*/
public void handleSystemLogNotification(String edgeId, SystemLogNotification notification) {
var tokens = this.getTokens(edgeId);
final var ids = this.getSubscribedWebsocketIds(edgeId);

if (tokens == null) {
if (ids == null) {
// No Tokens exist, but we still receive Notification? -> send unsubscribe
try {
var dummyGuestUser = new User("internal", "UnsubscribeSystemLogNotification",
Expand All @@ -93,18 +93,20 @@ public void handleSystemLogNotification(String edgeId, SystemLogNotification not
}

// Forward Notification to each Session token
for (String token : tokens) {
for (var id : ids) {
try {
// TODO use events
this.parent.uiWebsocket.send(token, new EdgeRpcNotification(edgeId, notification));
if (this.parent.uiWebsocket != null) {
this.parent.uiWebsocket.send(id, new EdgeRpcNotification(edgeId, notification));
}

} catch (OpenemsNamedException | NullPointerException e) {
this.parent.logWarn(this.log, edgeId, "Unable to handle SystemLogNotification: " + e.getMessage());
// error -> send unsubscribe
try {
var dummyGuestUser = new User("internal", "UnsubscribeSystemLogNotification",
UUID.randomUUID().toString(), Language.EN, Role.GUEST, false);
this.handleSubscribeSystemLogRequest(edgeId, dummyGuestUser, token,
this.handleSubscribeSystemLogRequest(edgeId, dummyGuestUser, id,
SubscribeSystemLogRequest.unsubscribe());

} catch (OpenemsNamedException e1) {
Expand All @@ -117,33 +119,33 @@ public void handleSystemLogNotification(String edgeId, SystemLogNotification not
/**
* Adds a subscription Token for the given Edge-ID.
*
* @param edgeId the Edge-ID
* @param token the Token
* @param edgeId the Edge-ID
* @param websocketId the id of the UI websocket connection
*/
protected void addToken(String edgeId, String token) {
protected void addSubscriptionId(String edgeId, UUID websocketId) {
this.subscriptions.compute(edgeId, (key, tokens) -> {
if (tokens == null) {
// Create new Set for this Edge-ID
tokens = new HashSet<>();
}
tokens.add(token);
tokens.add(websocketId);
return tokens;
});
}

/**
* Removes a subscription Token from the given Edge-ID.
*
* @param edgeId the Edge-ID
* @param token the Token
* @param edgeId the Edge-ID
* @param websocketId the id of the UI websocket connection
*/
protected void removeToken(String edgeId, String token) {
protected void removeSubscriptionId(String edgeId, UUID websocketId) {
this.subscriptions.compute(edgeId, (key, tokens) -> {
if (tokens == null) {
// There was no entry for this Edge-ID
return null;
}
tokens.remove(token);
tokens.remove(websocketId);
if (tokens.isEmpty()) {
return null;
}
Expand All @@ -157,7 +159,7 @@ protected void removeToken(String edgeId, String token) {
* @param edgeId the Edge-ID
* @return a Set of Tokens; or null
*/
protected Set<String> getTokens(String edgeId) {
protected Set<UUID> getSubscribedWebsocketIds(String edgeId) {
return this.subscriptions.get(edgeId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,9 @@ private CompletableFuture<JsonrpcResponseSuccess> handleSubscribeEdgesRequest(Ws
private CompletableFuture<JsonrpcResponseSuccess> handleSubscribeSystemLogRequest(WsData wsData, String edgeId,
User user, SubscribeSystemLogRequest request) throws OpenemsNamedException {
user.assertEdgeRoleIsAtLeast(SubscribeSystemLogRequest.METHOD, edgeId, Role.OWNER);
var token = wsData.assertToken();

// Forward to Edge
return this.parent.edgeWebsocket.handleSubscribeSystemLogRequest(edgeId, user, token, request);
return this.parent.edgeWebsocket.handleSubscribeSystemLogRequest(edgeId, user, wsData.getId(), request);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -33,6 +34,7 @@
import io.openems.backend.common.uiwebsocket.UiWebsocket;
import io.openems.common.exceptions.OpenemsError;
import io.openems.common.exceptions.OpenemsError.OpenemsNamedException;
import io.openems.common.exceptions.OpenemsException;
import io.openems.common.jsonrpc.base.AbstractJsonrpcRequest;
import io.openems.common.jsonrpc.base.JsonrpcNotification;
import io.openems.common.jsonrpc.base.JsonrpcRequest;
Expand Down Expand Up @@ -132,15 +134,15 @@ protected void logError(Logger log, String message) {
}

@Override
public void send(String token, JsonrpcNotification notification) throws OpenemsNamedException {
var wsData = this.getWsDataForTokenOrError(token);
public void send(UUID websocketId, JsonrpcNotification notification) throws OpenemsNamedException {
var wsData = this.getWsDataForIdOrError(websocketId);
wsData.send(notification);
}

@Override
public CompletableFuture<JsonrpcResponseSuccess> send(String token, JsonrpcRequest request)
public CompletableFuture<JsonrpcResponseSuccess> send(UUID websocketId, JsonrpcRequest request)
throws OpenemsNamedException {
var wsData = this.getWsDataForTokenOrError(token);
var wsData = this.getWsDataForIdOrError(websocketId);
return wsData.send(request);
}

Expand All @@ -166,20 +168,22 @@ public void sendBroadcast(String edgeId, JsonrpcNotification notification) throw
/**
* Gets the WebSocket connection attachment for a UI token.
*
* @param token the UI token
* @param websocketId the id of the websocket connection
* @return the WsData
* @throws OpenemsNamedException if there is no connection with this token
*/
private WsData getWsDataForTokenOrError(String token) throws OpenemsNamedException {
private WsData getWsDataForIdOrError(UUID websocketId) throws OpenemsNamedException {
if (this.server == null) {
throw new OpenemsException("Server is not yet fully initialized");
}
var connections = this.server.getConnections();
for (var websocket : connections) {
WsData wsData = websocket.getAttachment();
var thisToken = wsData.getToken();
if (thisToken.isPresent() && thisToken.get().equals(token)) {
if (wsData.getId().equals(websocketId)) {
return wsData;
}
}
throw OpenemsError.BACKEND_NO_UI_WITH_TOKEN.exception(token);
throw OpenemsError.BACKEND_NO_UI_WITH_TOKEN.exception(websocketId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -69,6 +70,8 @@ protected void dispose() {

private final Logger log = LoggerFactory.getLogger(WsData.class);

private final UUID id = UUID.randomUUID();

private final WebsocketServer parent;
private final SubscribedChannels subscribedChannels = new SubscribedChannels();
private Optional<String> userId = Optional.empty();
Expand Down Expand Up @@ -225,4 +228,8 @@ public boolean isEdgeSubscribed(String edgeId) {
return this.subscribedEdges.contains(edgeId);
}

public UUID getId() {
return this.id;
}

}

0 comments on commit 6a44190

Please sign in to comment.