From ad212a9cfc3f2b2bb9e66f43b827a54e2fcdbe85 Mon Sep 17 00:00:00 2001 From: Jonah <47046556+jwbonner@users.noreply.github.com> Date: Tue, 12 Mar 2024 19:39:01 -0400 Subject: [PATCH 1/2] Add support for RLOG R2 --- docs/RLOG-SPEC.md | 19 ++-- .../junction/rlog/RLOGEncoder.java | 101 +++++++----------- .../junction/rlog/RLOGServer.java | 16 +-- 3 files changed, 61 insertions(+), 75 deletions(-) diff --git a/docs/RLOG-SPEC.md b/docs/RLOG-SPEC.md index 2d1afe9d..5df36bec 100644 --- a/docs/RLOG-SPEC.md +++ b/docs/RLOG-SPEC.md @@ -1,13 +1,13 @@ # Byte Format for Robot Logs (.rlog) -> Note: AdvantageKit v2 (2023) replaced RLOG with WPILOG as its preferred logging format. RLOG continues to be supported by AdvantageScope to interact with older code. - ## Log Revisions The first byte represents the log format revision. The decoding device should always check whether it supports the specified revision before continuing. Below is a list of possible revisions: -- 0x00 = Invalid file. This is the first byte of logs produced before a revision number was included. -- 0x01 = Current revision. The log follows the specification described below. +- R1 = Supported by AdvantageKit v0.0.1-v1.8.1 and v2.2.0-v3.1.1. Uses a predefined set of field types. +- R2 = Supported by AdvantageKit v3.2.0 and newer. Uses string names for field types. + +All values are stored in big endian order. ## Message Types @@ -28,15 +28,20 @@ Each human-readable string key (e.g. "/DriveTrain/LeftPositionRadians") is repre 1. Key ID (short, 2 bytes) 2. Number of bytes in string (short, 2 bytes) 3. String key (UTF-8 encoded) +4. Number of bytes in string (short, 2 bytes) - Only if RLOG R2 +5. String type (UTF-8 encoded) - Only if RLOG R2 ## Field -Field messages represent a change to a single value. If a key is not provided for a given cycle, the robot code should set its type to "null". A field's type can change over the course of a log, though each cycle can only contain a single value for each key (e.g. a field cannot have both an integer and boolean value at the same time). The structure of these messages begins with the following information: +Field messages represent a change to a single value. The structure of these messages begins with the following information: 1. Key ID (short, 2 bytes) -2. Value type (1 byte) +2. Value type (1 byte) - Only if RLOG R1 +2. Value length (short, 2 bytes) - Only if RLOG R2 + +For RLOG R2, the value can follow any format. By default, use the [WPILOG-specified data types](https://github.com/wpilibsuite/allwpilib/blob/main/wpiutil/doc/datalog.adoc#data-types). -The possible value types are listed below along with the format of the value. Null (0x00) does not include more information. +For RLOG R1, the possible value types are listed below along with the format of the value. Null (0x00) does not include more information. ### Boolean (0x01) diff --git a/junction/core/src/org/littletonrobotics/junction/rlog/RLOGEncoder.java b/junction/core/src/org/littletonrobotics/junction/rlog/RLOGEncoder.java index 3abd08f0..95810ef0 100644 --- a/junction/core/src/org/littletonrobotics/junction/rlog/RLOGEncoder.java +++ b/junction/core/src/org/littletonrobotics/junction/rlog/RLOGEncoder.java @@ -23,14 +23,18 @@ import org.littletonrobotics.junction.LogTable; import org.littletonrobotics.junction.LogTable.LogValue; -/** Converts log tables to the RLOG format. */ +/** + * Converts log tables to the RLOG format. Based on RLOG R2 with + * support for custom type strings. + */ class RLOGEncoder { - public static final byte logRevision = (byte) 1; + public static final byte logRevision = (byte) 2; private ByteBuffer nextOutput; private boolean isFirstTable = true; private LogTable lastTable = new LogTable(0); private Map keyIDs = new HashMap<>(); + private Map keyTypes = new HashMap<>(); private short nextKeyID = 0; /** Reads the encoded output of the last encoded table. */ @@ -38,15 +42,6 @@ public ByteBuffer getOutput() { return nextOutput; } - /** - * Encodes a single tables and returns the encoded output. Equivalent to calling - * "encodeTable()" and then "getOutput()" - */ - public ByteBuffer getOutput(LogTable table) { - encodeTable(table); - return nextOutput; - } - /** * Returns data required to start a new receiver (full contents of last table + * all key IDs). @@ -62,7 +57,7 @@ public ByteBuffer getNewcomerData() { // Encode key IDs for (Map.Entry keyID : keyIDs.entrySet()) { - buffers.add(encodeKey(keyID.getValue(), keyID.getKey())); + buffers.add(encodeKey(keyID.getValue(), keyID.getKey(), keyTypes.get(keyID.getKey()))); } // Encode fields @@ -83,14 +78,14 @@ public ByteBuffer getNewcomerData() { } /** Encodes a single table and stores the result. */ - public void encodeTable(LogTable table) { + public void encodeTable(LogTable table, boolean includeRevision) { List buffers = new ArrayList<>(); Map newMap = table.getAll(false); Map oldMap = lastTable.getAll(false); // Encode log revision - if (isFirstTable) { + if (isFirstTable && includeRevision) { buffers.add(ByteBuffer.allocate(1).put(logRevision)); isFirstTable = false; } @@ -109,20 +104,13 @@ public void encodeTable(LogTable table) { // Write new data if (!keyIDs.containsKey(field.getKey())) { keyIDs.put(field.getKey(), nextKeyID); - buffers.add(encodeKey(nextKeyID, field.getKey())); + keyTypes.put(field.getKey(), field.getValue().getWPILOGType()); + buffers.add(encodeKey(nextKeyID, field.getKey(), field.getValue().getWPILOGType())); nextKeyID++; } buffers.add(encodeValue(keyIDs.get(field.getKey()), newValue)); } - // Encode removed fields (no longer supported) - // - // for (Map.Entry field : oldMap.entrySet()) { - // if (!newMap.containsKey(field.getKey())) { - // buffers.add(encodeValue(keyIDs.get(field.getKey()), null)); - // } - // } - // Update last table lastTable = table; @@ -144,14 +132,19 @@ private static ByteBuffer encodeTimestamp(double timestamp) { return buffer; } - private static ByteBuffer encodeKey(short keyID, String key) { + private static ByteBuffer encodeKey(short keyID, String key, String type) { try { byte[] keyBytes = key.getBytes("UTF-8"); - ByteBuffer buffer = ByteBuffer.allocate(1 + Short.BYTES + Short.BYTES + keyBytes.length); + byte[] typeBytes = type.getBytes("UTF-8"); + ByteBuffer buffer = ByteBuffer.allocate( + 1 + Short.BYTES + Short.BYTES + keyBytes.length + Short.BYTES + typeBytes.length + ); buffer.put((byte) 1); buffer.putShort(keyID); buffer.putShort((short) keyBytes.length); buffer.put(keyBytes); + buffer.putShort((short) typeBytes.length); + buffer.put(typeBytes); return buffer; } catch (UnsupportedEncodingException e) { return ByteBuffer.allocate(0); @@ -160,8 +153,8 @@ private static ByteBuffer encodeKey(short keyID, String key) { private static ByteBuffer encodeValue(short keyID, LogValue value) { try { - // Generate key and type buffer - ByteBuffer keyBuffer = ByteBuffer.allocate(1 + Short.BYTES + 1); + // Generate key and length buffer + ByteBuffer keyBuffer = ByteBuffer.allocate(1 + Short.BYTES + Short.BYTES); keyBuffer.put((byte) 2); keyBuffer.putShort(keyID); @@ -169,84 +162,67 @@ private static ByteBuffer encodeValue(short keyID, LogValue value) { ByteBuffer valueBuffer; switch (value.type) { case Raw: - keyBuffer.put((byte) 10); byte[] byteArray = value.getRaw(); - valueBuffer = ByteBuffer.allocate(Short.BYTES + byteArray.length); - valueBuffer.putShort((short) byteArray.length); + valueBuffer = ByteBuffer.allocate(byteArray.length); valueBuffer.put(byteArray); break; case Boolean: - keyBuffer.put((byte) 1); valueBuffer = ByteBuffer.allocate(1).put(value.getBoolean() ? (byte) 1 : (byte) 0); break; - case Integer: // Save as Integer (int32) - keyBuffer.put((byte) 3); - valueBuffer = ByteBuffer.allocate(Integer.BYTES).putInt((int) value.getInteger()); + case Integer: + valueBuffer = ByteBuffer.allocate(Long.BYTES).putLong(value.getInteger()); break; - case Float: // Save as Double - keyBuffer.put((byte) 5); - valueBuffer = ByteBuffer.allocate(Double.BYTES).putDouble(value.getFloat()); + case Float: + valueBuffer = ByteBuffer.allocate(Float.BYTES).putFloat(value.getFloat()); break; case Double: - keyBuffer.put((byte) 5); valueBuffer = ByteBuffer.allocate(Double.BYTES).putDouble(value.getDouble()); break; case String: - keyBuffer.put((byte) 7); String stringValue = value.getString(); byte[] stringBytes = stringValue.getBytes("UTF-8"); - valueBuffer = ByteBuffer.allocate(Short.BYTES + stringBytes.length); - valueBuffer.putShort((short) stringBytes.length); + valueBuffer = ByteBuffer.allocate(stringBytes.length); valueBuffer.put(stringBytes); break; case BooleanArray: - keyBuffer.put((byte) 2); boolean[] booleanArray = value.getBooleanArray(); - valueBuffer = ByteBuffer.allocate(Short.BYTES + booleanArray.length); - valueBuffer.putShort((short) booleanArray.length); + valueBuffer = ByteBuffer.allocate(booleanArray.length); for (boolean i : booleanArray) { valueBuffer.put(i ? (byte) 1 : (byte) 0); } break; - case IntegerArray: // Save as IntegerArray (int32[]) - keyBuffer.put((byte) 4); + case IntegerArray: long[] intArray = value.getIntegerArray(); - valueBuffer = ByteBuffer.allocate(Short.BYTES + (intArray.length * Integer.BYTES)); - valueBuffer.putShort((short) intArray.length); + valueBuffer = ByteBuffer.allocate(intArray.length * Long.BYTES); for (long i : intArray) { - valueBuffer.putInt((int) i); + valueBuffer.putLong(i); } break; - case FloatArray: // Save as DoubleArray - keyBuffer.put((byte) 6); + case FloatArray: float[] floatArray = value.getFloatArray(); - valueBuffer = ByteBuffer.allocate(Short.BYTES + (floatArray.length * Double.BYTES)); - valueBuffer.putShort((short) floatArray.length); + valueBuffer = ByteBuffer.allocate(floatArray.length * Float.BYTES); for (float i : floatArray) { - valueBuffer.putDouble(i); + valueBuffer.putFloat(i); } break; case DoubleArray: - keyBuffer.put((byte) 6); double[] doubleArray = value.getDoubleArray(); - valueBuffer = ByteBuffer.allocate(Short.BYTES + (doubleArray.length * Double.BYTES)); - valueBuffer.putShort((short) doubleArray.length); + valueBuffer = ByteBuffer.allocate(doubleArray.length * Double.BYTES); for (double i : doubleArray) { valueBuffer.putDouble(i); } break; case StringArray: - keyBuffer.put((byte) 8); String[] stringArray = value.getStringArray(); - int capacity = Short.BYTES; + int capacity = Integer.BYTES; for (String i : stringArray) { - capacity += Short.BYTES + i.getBytes("UTF-8").length; + capacity += Integer.BYTES + i.getBytes("UTF-8").length; } valueBuffer = ByteBuffer.allocate(capacity); - valueBuffer.putShort((short) stringArray.length); + valueBuffer.putInt(stringArray.length); for (String i : stringArray) { byte[] bytes = i.getBytes("UTF-8"); - valueBuffer.putShort((short) bytes.length); + valueBuffer.putInt(bytes.length); valueBuffer.put(bytes); } break; @@ -254,6 +230,7 @@ private static ByteBuffer encodeValue(short keyID, LogValue value) { valueBuffer = ByteBuffer.allocate(0); } + keyBuffer.putShort((short) valueBuffer.capacity()); return ByteBuffer.allocate(keyBuffer.capacity() + valueBuffer.capacity()).put(keyBuffer.array()) .put(valueBuffer.array()); } catch (UnsupportedEncodingException e) { diff --git a/junction/core/src/org/littletonrobotics/junction/rlog/RLOGServer.java b/junction/core/src/org/littletonrobotics/junction/rlog/RLOGServer.java index 82fe5309..9968b07f 100644 --- a/junction/core/src/org/littletonrobotics/junction/rlog/RLOGServer.java +++ b/junction/core/src/org/littletonrobotics/junction/rlog/RLOGServer.java @@ -30,6 +30,10 @@ public class RLOGServer implements LogDataReceiver { private final int port; private ServerThread thread; + public RLOGServer() { + this(5800); + } + public RLOGServer(int port) { this.port = port; } @@ -37,7 +41,7 @@ public RLOGServer(int port) { public void start() { thread = new ServerThread(port); thread.start(); - System.out.println("Log server started on port " + Integer.toString(port)); + System.out.println("RLOG server started on port " + Integer.toString(port)); } public void end() { @@ -54,7 +58,7 @@ public void putTable(LogTable table) { } private class ServerThread extends Thread { - private static final double heartbeatTimeoutSecs = 3.0; // Close connection if hearbeat not received for this length + private static final double heartbeatTimeoutSecs = 3.0; // Close connection if heartbeat not received for this length ServerSocket server; RLOGEncoder encoder = new RLOGEncoder(); @@ -63,7 +67,7 @@ private class ServerThread extends Thread { List lastHeartbeats = new ArrayList<>(); public ServerThread(int port) { - super("LogSocketServer"); + super("RLOGServer"); this.setDaemon(true); try { server = new ServerSocket(port); @@ -82,7 +86,7 @@ public void run() { socket.getOutputStream().write(encodeData(encoder.getNewcomerData().array())); sockets.add(socket); lastHeartbeats.add(Logger.getRealTimestamp() / 1000000.0); - System.out.println("Connected to log client - " + socket.getInetAddress().getHostAddress()); + System.out.println("Connected to RLOG client - " + socket.getInetAddress().getHostAddress()); } catch (IOException e) { e.printStackTrace(); } @@ -94,7 +98,7 @@ public void periodic(LogTable table) { return; } - encoder.encodeTable(table); + encoder.encodeTable(table, false); byte[] data = encodeData(encoder.getOutput().array()); for (int i = 0; i < sockets.size(); i++) { Socket socket = sockets.get(i); @@ -139,7 +143,7 @@ private byte[] encodeData(byte[] data) { } private void printDisconnectMessage(Socket socket, String reason) { - System.out.println("Disconnected from log client (" + reason + ") - " + socket.getInetAddress().getHostAddress()); + System.out.println("Disconnected from RLOG client (" + reason + ") - " + socket.getInetAddress().getHostAddress()); } public void close() { From 264fd767f6fcc1507e4043cb49ff13d0a8a94021 Mon Sep 17 00:00:00 2001 From: Jonah <47046556+jwbonner@users.noreply.github.com> Date: Wed, 13 Mar 2024 00:25:32 -0400 Subject: [PATCH 2/2] RLOG improvements --- .../junction/rlog/RLOGServer.java | 103 +++++++++++++----- 1 file changed, 75 insertions(+), 28 deletions(-) diff --git a/junction/core/src/org/littletonrobotics/junction/rlog/RLOGServer.java b/junction/core/src/org/littletonrobotics/junction/rlog/RLOGServer.java index 9968b07f..d2c5288d 100644 --- a/junction/core/src/org/littletonrobotics/junction/rlog/RLOGServer.java +++ b/junction/core/src/org/littletonrobotics/junction/rlog/RLOGServer.java @@ -61,6 +61,7 @@ private class ServerThread extends Thread { private static final double heartbeatTimeoutSecs = 3.0; // Close connection if heartbeat not received for this length ServerSocket server; + Thread heartbeatThread; RLOGEncoder encoder = new RLOGEncoder(); List sockets = new ArrayList<>(); @@ -80,12 +81,65 @@ public void run() { if (server == null) { return; } + + // Check hearbeats periodically + heartbeatThread = new Thread(() -> { + while (true) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + return; + } + + synchronized (this) { + for (int i = 0; i < sockets.size(); i++) { + Socket socket = sockets.get(i); + if (socket.isClosed()) { + continue; + } + + try { + // Read heartbeat + InputStream inputStream = socket.getInputStream(); + if (inputStream.available() > 0) { + inputStream.skip(inputStream.available()); + lastHeartbeats.set(i, Logger.getRealTimestamp() / 1000000.0); + } + + // Close connection if socket timed out + if (Logger.getRealTimestamp() / 1000000.0 - lastHeartbeats.get(i) > heartbeatTimeoutSecs) { + socket.close(); + printDisconnectMessage(socket, "timeout"); + continue; + } + + // Broadcast message to stay alive + socket.getOutputStream().write(new byte[4]); + } catch (IOException e) { + try { + socket.close(); + printDisconnectMessage(socket, "IOException"); + } catch (IOException a) { + a.printStackTrace(); + } + } + } + } + } + }); + heartbeatThread.setName("RLOGServerHeartbeats"); + heartbeatThread.setDaemon(true); + heartbeatThread.start(); + + // Wait for clients while (true) { try { Socket socket = server.accept(); - socket.getOutputStream().write(encodeData(encoder.getNewcomerData().array())); sockets.add(socket); lastHeartbeats.add(Logger.getRealTimestamp() / 1000000.0); + synchronized (this) { + socket.getOutputStream().write(encodeData(encoder.getNewcomerData().array())); + } System.out.println("Connected to RLOG client - " + socket.getInetAddress().getHostAddress()); } catch (IOException e) { e.printStackTrace(); @@ -98,37 +152,27 @@ public void periodic(LogTable table) { return; } - encoder.encodeTable(table, false); - byte[] data = encodeData(encoder.getOutput().array()); - for (int i = 0; i < sockets.size(); i++) { - Socket socket = sockets.get(i); - if (socket.isClosed()) { - continue; - } + synchronized (this) { + // Encode data + encoder.encodeTable(table, false); + byte[] data = encodeData(encoder.getOutput().array()); - try { - // Read heartbeat - InputStream inputStream = socket.getInputStream(); - if (inputStream.available() > 0) { - inputStream.skip(inputStream.available()); - lastHeartbeats.set(i, Logger.getRealTimestamp() / 1000000.0); + // Broadcast data to each socket + for (int i = 0; i < sockets.size(); i++) { + Socket socket = sockets.get(i); + if (socket.isClosed()) { + continue; } - // Close connection if socket timed out - if (Logger.getRealTimestamp() / 1000000.0 - lastHeartbeats.get(i) > heartbeatTimeoutSecs) { - socket.close(); - printDisconnectMessage(socket, "timeout"); - } else { - - // Send new data - socket.getOutputStream().write(data); - } - } catch (IOException e) { try { - socket.close(); - printDisconnectMessage(socket, "IOException"); - } catch (IOException a) { - a.printStackTrace(); + socket.getOutputStream().write(data); + } catch (IOException e) { + try { + socket.close(); + printDisconnectMessage(socket, "IOException"); + } catch (IOException a) { + a.printStackTrace(); + } } } } @@ -155,6 +199,9 @@ public void close() { e.printStackTrace(); } } + if (heartbeatThread != null) { + heartbeatThread.interrupt(); + } this.interrupt(); } }