Skip to content

Commit

Permalink
Update for STOMP
Browse files Browse the repository at this point in the history
  • Loading branch information
bdw429s committed Oct 11, 2024
1 parent f80572a commit edf2500
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 59 deletions.
2 changes: 1 addition & 1 deletion gradle/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
5.0.6-0de55e3b7a2f4c8dd5b33ef11edc7a90a39506e7-96ea216b7cfb6883a6af15c191a94953a0596b9c
5.0.7-0de55e3b7a2f4c8dd5b33ef11edc7a90a39506e7-96ea216b7cfb6883a6af15c191a94953a0596b9c
12 changes: 11 additions & 1 deletion src/main/java/runwar/undertow/WebsocketHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ public WebsocketHandler(final HttpHandler next, ServerOptions serverOptions, Sit
this.next = next;
this.serverOptions = serverOptions;
this.siteOptions = siteOptions;

Set<Handshake> handshakes = new HashSet<>();
handshakes.add(new Hybi13Handshake(Set.of("v12.stomp", "v11.stomp", "v10.stomp"), false));
handshakes.add(new Hybi08Handshake(Set.of("v12.stomp", "v11.stomp", "v10.stomp"), false));
handshakes.add(new Hybi07Handshake(Set.of("v12.stomp", "v11.stomp", "v10.stomp"), false));
this.webSocketProtocolHandshakeHandler = new WebSocketProtocolHandshakeHandler(
handshakes,
new WebSocketConnectionCallback() {
@Override
public void onConnect(WebSocketHttpExchange WSexchange, WebSocketChannel channel) {
Expand Down Expand Up @@ -88,7 +94,11 @@ public void sendMessage(WebSocketChannel channel, String message) {
if (channel == null || !channel.isOpen()) {
return;
}
WebSockets.sendText(message, channel, null);
// I'm not clear if Undertow handles this, but just to be safe only send one
// message to a channel at once to avoid threading issues
synchronized (channel) {
WebSockets.sendText(message, channel, null);
}
}

public void broadcastMessage(String message) {
Expand Down
138 changes: 81 additions & 57 deletions src/main/java/runwar/undertow/WebsocketReceiveListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class WebsocketReceiveListener extends AbstractReceiveListener {
*/
public WebsocketReceiveListener(HttpServerExchange exchange, HttpHandler next, ServerOptions serverOptions,
SiteOptions siteOptions, WebSocketChannel channel) {
this.exchange = exchange;
this.initialExchange = exchange;
this.next = next;
this.serverOptions = serverOptions;
this.siteOptions = siteOptions;
Expand All @@ -97,7 +97,6 @@ public WebsocketReceiveListener(HttpServerExchange exchange, HttpHandler next, S
* @param channel the WebSocketChannel
*/
public void onClose(AbstractFramedChannel channel) {
// System.out.println("dispatching onClose");
dispatchRequest("onClose", List.of(channel));
}

Expand All @@ -112,8 +111,21 @@ public void onClose(AbstractFramedChannel channel) {
@Override
protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message)
throws IOException {
// System.out.println("dispatching onFullTextMessage");
dispatchRequest("onFullTextMessage", List.of(message, channel));
dispatchRequest("onFullTextMessage", List.of(message.getData(), channel));
}

protected void onFullBinaryMessage(final WebSocketChannel channel, BufferedBinaryMessage message)
throws IOException {
// turn the message into a string
Pooled<ByteBuffer[]> pbb = message.getData();
ByteBuffer[] bb = pbb.getResource();
StringBuilder sb = new StringBuilder();
for (ByteBuffer b : bb) {
sb.append(new String(b.array(), "UTF-8"));
}
dispatchRequest("onFullBinaryMessage", List.of(sb.toString(), channel));

message.getData().free();
}

public void dispatchRequest(String method, List<Object> requestDetails) {
Expand All @@ -122,62 +134,62 @@ public void dispatchRequest(String method, List<Object> requestDetails) {
|| Server.getServerState().equals(Server.ServerState.STOPPED)) {
return;
}
try {
String newUri = siteOptions.webSocketListener() + "?method=onProcess&WSMethod=" + method;

String newUri = siteOptions.webSocketListener() + "?method=onProcess&WSMethod=" + method;
// System.out.println( "dispatching request: " + newUri );

final DefaultByteBufferPool bufferPool = new DefaultByteBufferPool(false, 1024, 0, 0);
MockServerConnection connection = new MockServerConnection(bufferPool);
final DefaultByteBufferPool bufferPool = new DefaultByteBufferPool(false, 1024, 0, 0);
MockServerConnection connection = new MockServerConnection(bufferPool, initialExchange);

// Create a new HttpServerExchange for the new request
HttpServerExchange newExchange = new HttpServerExchange(connection);
newExchange.putAttachment(SiteDeploymentManager.SITE_DEPLOYMENT_KEY,
exchange.getAttachment(SiteDeploymentManager.SITE_DEPLOYMENT_KEY));
// Create a new HttpServerExchange for the new request
HttpServerExchange newExchange = new HttpServerExchange(connection);
newExchange.putAttachment(SiteDeploymentManager.SITE_DEPLOYMENT_KEY,
initialExchange.getAttachment(SiteDeploymentManager.SITE_DEPLOYMENT_KEY));

// Put the details on the new exchange so we can access them in our CF code
newExchange.putAttachment(WEBSOCKET_REQUEST_DETAILS, requestDetails);
// Put the details on the new exchange so we can access them in our CF code
newExchange.putAttachment(WEBSOCKET_REQUEST_DETAILS, requestDetails);

// copy headers (like cookies) any from the original request (except Upgrade)
this.exchange.getRequestHeaders().forEach(header -> {
if (!header.getHeaderName().toString().equalsIgnoreCase("Upgrade")) {
newExchange.getRequestHeaders().add(header.getHeaderName(), header.getFirst());
// copy headers (like cookies) any from the original request (except Upgrade)
this.initialExchange.getRequestHeaders().forEach(header -> {
if (!header.getHeaderName().toString().equalsIgnoreCase("Upgrade")) {
newExchange.getRequestHeaders().add(header.getHeaderName(), header.getFirst());
}
});
newExchange.setRequestMethod(this.initialExchange.getRequestMethod());
newExchange.setProtocol(this.initialExchange.getProtocol());
newExchange.setRequestScheme(this.initialExchange.getRequestScheme());
newExchange.setSourceAddress(this.initialExchange.getSourceAddress());
newExchange.setDestinationAddress(this.initialExchange.getDestinationAddress());

final StringBuilder sb = new StringBuilder();
try {
Connectors.setExchangeRequestPath(newExchange, newUri, sb);
} catch (ParameterLimitException | BadRequestException e) {
e.printStackTrace();
}
});
newExchange.setRequestMethod(this.exchange.getRequestMethod());
newExchange.setProtocol(this.exchange.getProtocol());
newExchange.setRequestScheme(this.exchange.getRequestScheme());
newExchange.setSourceAddress(this.exchange.getSourceAddress());
newExchange.setDestinationAddress(this.exchange.getDestinationAddress());

final StringBuilder sb = new StringBuilder();
try {
Connectors.setExchangeRequestPath(newExchange, newUri, sb);
} catch (ParameterLimitException e) {
e.printStackTrace();
}

// This sets the requestpath, relativepath, querystring, and parses the query
// parameters
newExchange.setRequestURI(this.exchange.getRequestScheme() + "://" + this.exchange.getHostAndPort() + newUri,
true);
// This sets the requestpath, relativepath, querystring, and parses the query
// parameters
newExchange.setRequestURI(
this.initialExchange.getRequestScheme() + "://" + this.initialExchange.getHostAndPort() + newUri,
true);

// Call the handler for the new URI
try {
// Call the handler for the new URI
HttpHandler exchangeSetter = new HttpHandler() {

@Override
public void handleRequest(final HttpServerExchange exchange) throws Exception {
HttpServerExchange currentExchange = Server.getCurrentExchange();
String currentDeployKey = Server.getCurrentDeploymentKey();

try {
// This allows the exchange to be available to the thread.
Server.setCurrentExchange(newExchange);
Server.setCurrentExchange(exchange);
Server.setCurrentDeploymentKey(currentDeployKey);
next.handleRequest(newExchange);
next.handleRequest(exchange);
} catch (Exception e) {
e.printStackTrace();
} finally {
// Clean up after
Server.setCurrentExchange(currentExchange);
Server.setCurrentDeploymentKey(currentDeployKey);
Server.setCurrentExchange(null);
Server.setCurrentDeploymentKey(null);
}
}

Expand All @@ -186,13 +198,12 @@ public String toString() {
return "Websocket Exchange Setter Handler";
}
};
if (exchange.isInIoThread()) {
exchange.dispatch(exchangeSetter);
} else {
exchangeSetter.handleRequest(exchange);
}

// We are in an IO thread, so we can dispatch the request directly to the worker
// pool
newExchange.dispatch(exchangeSetter);
} catch (Exception e) {
System.out.println("Error dispatching request: " + e.getMessage());
System.out.println("Error dispatching websocket request: " + e.getMessage());
e.printStackTrace();
}

Expand All @@ -204,9 +215,11 @@ private static class MockServerConnection extends ServerConnection {
private final ByteBufferPool bufferPool;
private SSLSessionInfo sslSessionInfo;
private XnioBufferPoolAdaptor poolAdaptor;
private HttpServerExchange initialExchange;

private MockServerConnection(ByteBufferPool bufferPool) {
private MockServerConnection(ByteBufferPool bufferPool, HttpServerExchange initialExchange) {
this.bufferPool = bufferPool;
this.initialExchange = initialExchange;
}

@Override
Expand All @@ -225,7 +238,7 @@ public ByteBufferPool getByteBufferPool() {

@Override
public XnioWorker getWorker() {
return null;
return initialExchange.getConnection().getWorker();
}

@Override
Expand Down Expand Up @@ -328,12 +341,12 @@ public StreamConnection upgradeChannel() {

@Override
public ConduitStreamSinkChannel getSinkChannel() {
return new ConduitStreamSinkChannel(Configurable.EMPTY, new DummyStreamSinkConduit());
return new ConduitStreamSinkChannel(Configurable.EMPTY, new DummyStreamSinkConduit(initialExchange));
}

@Override
public ConduitStreamSourceChannel getSourceChannel() {
return new ConduitStreamSourceChannel(Configurable.EMPTY, new DummyStreamSourceConduit());
return new ConduitStreamSourceChannel(Configurable.EMPTY, new DummyStreamSourceConduit(initialExchange));
}

@Override
Expand Down Expand Up @@ -382,6 +395,12 @@ public boolean isRequestTrailerFieldsSupported() {

private static class DummyStreamSinkConduit implements StreamSinkConduit {

private HttpServerExchange initialExchange;

public DummyStreamSinkConduit(HttpServerExchange initialExchange) {
this.initialExchange = initialExchange;
}

@Override
public int write(ByteBuffer src) throws IOException {
// Ignore all input
Expand Down Expand Up @@ -434,8 +453,7 @@ public XnioIoThread getWriteThread() {

@Override
public XnioWorker getWorker() {
// Return a dummy value or null
return null;
return initialExchange.getConnection().getWorker();
}

@Override
Expand Down Expand Up @@ -504,6 +522,12 @@ public void awaitWritable(long time, java.util.concurrent.TimeUnit timeUnit) thr

private static class DummyStreamSourceConduit implements StreamSourceConduit {

private HttpServerExchange initialExchange;

public DummyStreamSourceConduit(HttpServerExchange initialExchange) {
this.initialExchange = initialExchange;
}

@Override
public long transferTo(long position, long count, FileChannel target) throws IOException {
// Mock implementation: return 0 to indicate no bytes transferred
Expand Down Expand Up @@ -582,7 +606,7 @@ public void setReadReadyHandler(ReadReadyHandler handler) {

@Override
public XnioWorker getWorker() {
return null;
return initialExchange.getConnection().getWorker();
}
}
}

0 comments on commit edf2500

Please sign in to comment.