diff --git a/junction/core/src/org/littletonrobotics/junction/LogDataReceiver.java b/junction/core/src/org/littletonrobotics/junction/LogDataReceiver.java index 10dca54..eb6fe3f 100644 --- a/junction/core/src/org/littletonrobotics/junction/LogDataReceiver.java +++ b/junction/core/src/org/littletonrobotics/junction/LogDataReceiver.java @@ -40,5 +40,5 @@ public default void end() { * * @param table A copy of the data to save. */ - public void putTable(LogTable table); + public void putTable(LogTable table) throws InterruptedException; } diff --git a/junction/core/src/org/littletonrobotics/junction/rlog/RLOGServer.java b/junction/core/src/org/littletonrobotics/junction/rlog/RLOGServer.java index d2c5288..348a7a7 100644 --- a/junction/core/src/org/littletonrobotics/junction/rlog/RLOGServer.java +++ b/junction/core/src/org/littletonrobotics/junction/rlog/RLOGServer.java @@ -20,6 +20,8 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; import org.littletonrobotics.junction.LogDataReceiver; import org.littletonrobotics.junction.LogTable; @@ -29,6 +31,7 @@ public class RLOGServer implements LogDataReceiver { private final int port; private ServerThread thread; + private RLOGEncoder encoder = new RLOGEncoder(); public RLOGServer() { this(5800); @@ -51,19 +54,33 @@ public void end() { } } - public void putTable(LogTable table) { - if (thread != null) { - thread.periodic(table); + public void putTable(LogTable table) throws InterruptedException { + if (thread != null && thread.broadcastQueue.remainingCapacity() > 0) { + // If broadcast is behind, drop this cycle and encode changes in the next cycle + byte[] data; + synchronized (thread) { + encoder.encodeTable(table, false); + data = encodeData(encoder.getOutput().array()); + } + thread.broadcastQueue.put(data); } } + private byte[] encodeData(byte[] data) { + byte[] lengthBytes = ByteBuffer.allocate(Integer.BYTES).putInt(data.length).array(); + byte[] fullData = new byte[lengthBytes.length + data.length]; + System.arraycopy(lengthBytes, 0, fullData, 0, lengthBytes.length); + System.arraycopy(data, 0, fullData, lengthBytes.length, data.length); + return fullData; + } + private class ServerThread extends Thread { private static final double heartbeatTimeoutSecs = 3.0; // Close connection if heartbeat not received for this length ServerSocket server; - Thread heartbeatThread; - RLOGEncoder encoder = new RLOGEncoder(); - + Thread broadcastThread; + + ArrayBlockingQueue broadcastQueue = new ArrayBlockingQueue<>(500); List sockets = new ArrayList<>(); List lastHeartbeats = new ArrayList<>(); @@ -82,64 +99,23 @@ public void run() { return; } - // Check hearbeats periodically - heartbeatThread = new Thread(() -> { - while (true) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - return; - } - - synchronized (this) { - for (int i = 0; i < sockets.size(); i++) { - Socket socket = sockets.get(i); - if (socket.isClosed()) { - continue; - } - - try { - // Read heartbeat - InputStream inputStream = socket.getInputStream(); - if (inputStream.available() > 0) { - inputStream.skip(inputStream.available()); - lastHeartbeats.set(i, Logger.getRealTimestamp() / 1000000.0); - } - - // Close connection if socket timed out - if (Logger.getRealTimestamp() / 1000000.0 - lastHeartbeats.get(i) > heartbeatTimeoutSecs) { - socket.close(); - printDisconnectMessage(socket, "timeout"); - continue; - } - - // Broadcast message to stay alive - socket.getOutputStream().write(new byte[4]); - } catch (IOException e) { - try { - socket.close(); - printDisconnectMessage(socket, "IOException"); - } catch (IOException a) { - a.printStackTrace(); - } - } - } - } - } - }); - heartbeatThread.setName("RLOGServerHeartbeats"); - heartbeatThread.setDaemon(true); - heartbeatThread.start(); + // Start broadcast thread + broadcastThread = new Thread(this::runBroadcast); + broadcastThread.setName("RLOGServerBroadcast"); + broadcastThread.setDaemon(true); + broadcastThread.start(); // Wait for clients while (true) { try { Socket socket = server.accept(); - sockets.add(socket); - lastHeartbeats.add(Logger.getRealTimestamp() / 1000000.0); + byte[] data; synchronized (this) { - socket.getOutputStream().write(encodeData(encoder.getNewcomerData().array())); + data = encodeData(encoder.getNewcomerData().array()); } + socket.getOutputStream().write(data); + sockets.add(socket); + lastHeartbeats.add(Logger.getRealTimestamp() / 1000000.0); System.out.println("Connected to RLOG client - " + socket.getInetAddress().getHostAddress()); } catch (IOException e) { e.printStackTrace(); @@ -147,17 +123,19 @@ public void run() { } } - public void periodic(LogTable table) { - if (server == null) { - return; - } + public void runBroadcast() { + while (true) { + try { + Thread.sleep(20); + } catch (InterruptedException e) { + return; + } - synchronized (this) { - // Encode data - encoder.encodeTable(table, false); - byte[] data = encodeData(encoder.getOutput().array()); + // Get queue data + List broadcastData = new ArrayList<>(); + broadcastQueue.drainTo(broadcastData); - // Broadcast data to each socket + // Broadcast to each client for (int i = 0; i < sockets.size(); i++) { Socket socket = sockets.get(i); if (socket.isClosed()) { @@ -165,7 +143,28 @@ public void periodic(LogTable table) { } try { - socket.getOutputStream().write(data); + // Read heartbeat + InputStream inputStream = socket.getInputStream(); + if (inputStream.available() > 0) { + inputStream.skip(inputStream.available()); + lastHeartbeats.set(i, Logger.getRealTimestamp() / 1000000.0); + } + + // Close connection if socket timed out + if (Logger.getRealTimestamp() / 1000000.0 - lastHeartbeats.get(i) > heartbeatTimeoutSecs) { + socket.close(); + printDisconnectMessage(socket, "timeout"); + continue; + } + + // Send message to stay alive + var outputStream = socket.getOutputStream(); + outputStream.write(new byte[4]); + + // Send broadcast data + for (byte[] data : broadcastData) { + outputStream.write(data); + } } catch (IOException e) { try { socket.close(); @@ -178,14 +177,6 @@ public void periodic(LogTable table) { } } - private byte[] encodeData(byte[] data) { - byte[] lengthBytes = ByteBuffer.allocate(Integer.BYTES).putInt(data.length).array(); - byte[] fullData = new byte[lengthBytes.length + data.length]; - System.arraycopy(lengthBytes, 0, fullData, 0, lengthBytes.length); - System.arraycopy(data, 0, fullData, lengthBytes.length, data.length); - return fullData; - } - private void printDisconnectMessage(Socket socket, String reason) { System.out.println("Disconnected from RLOG client (" + reason + ") - " + socket.getInetAddress().getHostAddress()); } @@ -199,8 +190,8 @@ public void close() { e.printStackTrace(); } } - if (heartbeatThread != null) { - heartbeatThread.interrupt(); + if (broadcastThread != null) { + broadcastThread.interrupt(); } this.interrupt(); }