Skip to content

Commit

Permalink
Fix RLOG server hang
Browse files Browse the repository at this point in the history
  • Loading branch information
jwbonner committed Mar 14, 2024
1 parent 577884f commit 3b11c3b
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,5 @@ public default void end() {
*
* @param table A copy of the data to save.
*/
public void putTable(LogTable table);
public void putTable(LogTable table) throws InterruptedException;
}
145 changes: 68 additions & 77 deletions junction/core/src/org/littletonrobotics/junction/rlog/RLOGServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

import org.littletonrobotics.junction.LogDataReceiver;
import org.littletonrobotics.junction.LogTable;
Expand All @@ -29,6 +31,7 @@
public class RLOGServer implements LogDataReceiver {
private final int port;
private ServerThread thread;
private RLOGEncoder encoder = new RLOGEncoder();

public RLOGServer() {
this(5800);
Expand All @@ -51,19 +54,33 @@ public void end() {
}
}

public void putTable(LogTable table) {
if (thread != null) {
thread.periodic(table);
public void putTable(LogTable table) throws InterruptedException {
if (thread != null && thread.broadcastQueue.remainingCapacity() > 0) {
// If broadcast is behind, drop this cycle and encode changes in the next cycle
byte[] data;
synchronized (thread) {
encoder.encodeTable(table, false);
data = encodeData(encoder.getOutput().array());
}
thread.broadcastQueue.put(data);
}
}

private byte[] encodeData(byte[] data) {
byte[] lengthBytes = ByteBuffer.allocate(Integer.BYTES).putInt(data.length).array();
byte[] fullData = new byte[lengthBytes.length + data.length];
System.arraycopy(lengthBytes, 0, fullData, 0, lengthBytes.length);
System.arraycopy(data, 0, fullData, lengthBytes.length, data.length);
return fullData;
}

private class ServerThread extends Thread {
private static final double heartbeatTimeoutSecs = 3.0; // Close connection if heartbeat not received for this length

ServerSocket server;
Thread heartbeatThread;
RLOGEncoder encoder = new RLOGEncoder();

Thread broadcastThread;

ArrayBlockingQueue<byte[]> broadcastQueue = new ArrayBlockingQueue<>(500);
List<Socket> sockets = new ArrayList<>();
List<Double> lastHeartbeats = new ArrayList<>();

Expand All @@ -82,90 +99,72 @@ public void run() {
return;
}

// Check hearbeats periodically
heartbeatThread = new Thread(() -> {
while (true) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
return;
}

synchronized (this) {
for (int i = 0; i < sockets.size(); i++) {
Socket socket = sockets.get(i);
if (socket.isClosed()) {
continue;
}

try {
// Read heartbeat
InputStream inputStream = socket.getInputStream();
if (inputStream.available() > 0) {
inputStream.skip(inputStream.available());
lastHeartbeats.set(i, Logger.getRealTimestamp() / 1000000.0);
}

// Close connection if socket timed out
if (Logger.getRealTimestamp() / 1000000.0 - lastHeartbeats.get(i) > heartbeatTimeoutSecs) {
socket.close();
printDisconnectMessage(socket, "timeout");
continue;
}

// Broadcast message to stay alive
socket.getOutputStream().write(new byte[4]);
} catch (IOException e) {
try {
socket.close();
printDisconnectMessage(socket, "IOException");
} catch (IOException a) {
a.printStackTrace();
}
}
}
}
}
});
heartbeatThread.setName("RLOGServerHeartbeats");
heartbeatThread.setDaemon(true);
heartbeatThread.start();
// Start broadcast thread
broadcastThread = new Thread(this::runBroadcast);
broadcastThread.setName("RLOGServerBroadcast");
broadcastThread.setDaemon(true);
broadcastThread.start();

// Wait for clients
while (true) {
try {
Socket socket = server.accept();
sockets.add(socket);
lastHeartbeats.add(Logger.getRealTimestamp() / 1000000.0);
byte[] data;
synchronized (this) {
socket.getOutputStream().write(encodeData(encoder.getNewcomerData().array()));
data = encodeData(encoder.getNewcomerData().array());
}
socket.getOutputStream().write(data);
sockets.add(socket);
lastHeartbeats.add(Logger.getRealTimestamp() / 1000000.0);
System.out.println("Connected to RLOG client - " + socket.getInetAddress().getHostAddress());
} catch (IOException e) {
e.printStackTrace();
}
}
}

public void periodic(LogTable table) {
if (server == null) {
return;
}
public void runBroadcast() {
while (true) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
return;
}

synchronized (this) {
// Encode data
encoder.encodeTable(table, false);
byte[] data = encodeData(encoder.getOutput().array());
// Get queue data
List<byte[]> broadcastData = new ArrayList<>();
broadcastQueue.drainTo(broadcastData);

// Broadcast data to each socket
// Broadcast to each client
for (int i = 0; i < sockets.size(); i++) {
Socket socket = sockets.get(i);
if (socket.isClosed()) {
continue;
}

try {
socket.getOutputStream().write(data);
// Read heartbeat
InputStream inputStream = socket.getInputStream();
if (inputStream.available() > 0) {
inputStream.skip(inputStream.available());
lastHeartbeats.set(i, Logger.getRealTimestamp() / 1000000.0);
}

// Close connection if socket timed out
if (Logger.getRealTimestamp() / 1000000.0 - lastHeartbeats.get(i) > heartbeatTimeoutSecs) {
socket.close();
printDisconnectMessage(socket, "timeout");
continue;
}

// Send message to stay alive
var outputStream = socket.getOutputStream();
outputStream.write(new byte[4]);

// Send broadcast data
for (byte[] data : broadcastData) {
outputStream.write(data);
}
} catch (IOException e) {
try {
socket.close();
Expand All @@ -178,14 +177,6 @@ public void periodic(LogTable table) {
}
}

private byte[] encodeData(byte[] data) {
byte[] lengthBytes = ByteBuffer.allocate(Integer.BYTES).putInt(data.length).array();
byte[] fullData = new byte[lengthBytes.length + data.length];
System.arraycopy(lengthBytes, 0, fullData, 0, lengthBytes.length);
System.arraycopy(data, 0, fullData, lengthBytes.length, data.length);
return fullData;
}

private void printDisconnectMessage(Socket socket, String reason) {
System.out.println("Disconnected from RLOG client (" + reason + ") - " + socket.getInetAddress().getHostAddress());
}
Expand All @@ -199,8 +190,8 @@ public void close() {
e.printStackTrace();
}
}
if (heartbeatThread != null) {
heartbeatThread.interrupt();
if (broadcastThread != null) {
broadcastThread.interrupt();
}
this.interrupt();
}
Expand Down

0 comments on commit 3b11c3b

Please sign in to comment.