From bcea70657d21bb416ac3ef0cb63c6cf22bb62a37 Mon Sep 17 00:00:00 2001 From: Lazar Petrovic Date: Mon, 9 Sep 2024 15:14:30 +0200 Subject: [PATCH] feat: add PBJ support to platform streams (#15400) Signed-off-by: Lazar Petrovic --- .../streams/SerializableDataInputStream.java | 16 +++++++++++++ .../streams/SerializableDataOutputStream.java | 16 +++++++++++++ .../common/io/SelfSerializableTest.java | 24 +++++++++++++++++++ 3 files changed, 56 insertions(+) diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/io/streams/SerializableDataInputStream.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/io/streams/SerializableDataInputStream.java index 597844eea463..cd68a81f9618 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/io/streams/SerializableDataInputStream.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/io/streams/SerializableDataInputStream.java @@ -21,6 +21,8 @@ import static com.swirlds.common.io.streams.SerializableStreamConstants.NULL_VERSION; import static com.swirlds.common.io.streams.SerializableStreamConstants.SERIALIZATION_PROTOCOL_VERSION; +import com.hedera.pbj.runtime.io.ReadableSequentialData; +import com.hedera.pbj.runtime.io.stream.ReadableStreamingData; import com.swirlds.base.function.CheckedFunction; import com.swirlds.common.constructable.ConstructableRegistry; import com.swirlds.common.io.SelfSerializable; @@ -49,6 +51,9 @@ public class SerializableDataInputStream extends AugmentedDataInputStream { private static final int PROTOCOL_VERSION = SERIALIZATION_PROTOCOL_VERSION; + /** A stream used to read PBJ objects */ + private final ReadableSequentialData readableSequentialData; + /** * Creates a stream capable of deserializing serializable objects. * @@ -56,6 +61,17 @@ public class SerializableDataInputStream extends AugmentedDataInputStream { */ public SerializableDataInputStream(final InputStream in) { super(in); + readableSequentialData = new ReadableStreamingData(in); + } + + /** + * While transitioning serialization from {@link SelfSerializable} to protobuf, this stream will support both + * serialization methods by providing a separate instance to deserialize protobuf objects. + * + * @return the readable sequential data stream + */ + public @NonNull ReadableSequentialData getReadableSequentialData() { + return readableSequentialData; } /** diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/io/streams/SerializableDataOutputStream.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/io/streams/SerializableDataOutputStream.java index 28fb1c480c51..e3d90b5f9a11 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/io/streams/SerializableDataOutputStream.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/io/streams/SerializableDataOutputStream.java @@ -24,10 +24,13 @@ import static com.swirlds.common.io.streams.SerializableStreamConstants.SERIALIZATION_PROTOCOL_VERSION; import static com.swirlds.common.io.streams.SerializableStreamConstants.VERSION_BYTES; +import com.hedera.pbj.runtime.io.WritableSequentialData; +import com.hedera.pbj.runtime.io.stream.WritableStreamingData; import com.swirlds.common.io.FunctionalSerialize; import com.swirlds.common.io.SelfSerializable; import com.swirlds.common.io.SerializableDet; import com.swirlds.common.io.SerializableWithKnownLength; +import edu.umd.cs.findbugs.annotations.NonNull; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -40,6 +43,8 @@ * It is designed for use with the SerializableDet interface, and its use is described there. */ public class SerializableDataOutputStream extends AugmentedDataOutputStream { + /** A stream used to write PBJ objects */ + private final WritableSequentialData writableSequentialData; /** * Creates a new data output stream to write data to the specified @@ -53,6 +58,17 @@ public class SerializableDataOutputStream extends AugmentedDataOutputStream { */ public SerializableDataOutputStream(OutputStream out) { super(out); + writableSequentialData = new WritableStreamingData(out); + } + + /** + * While transitioning serialization from {@link SelfSerializable} to protobuf, this stream will support both + * serialization methods by providing a separate instance to serialize protobuf objects. + * + * @return the writable sequential data stream + */ + public @NonNull WritableSequentialData getWritableSequentialData() { + return writableSequentialData; } /** diff --git a/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/io/SelfSerializableTest.java b/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/io/SelfSerializableTest.java index ae91b2dc216f..b6d22ec04eb4 100644 --- a/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/io/SelfSerializableTest.java +++ b/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/io/SelfSerializableTest.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import com.hedera.pbj.runtime.io.buffer.Bytes; import com.swirlds.common.io.exceptions.InvalidVersionException; import com.swirlds.common.test.fixtures.io.InputOutputStream; import com.swirlds.common.test.fixtures.io.SelfSerializableExample; @@ -72,4 +73,27 @@ void deserializeInvalidVersions() throws IOException { assertThrows(InvalidVersionException.class, () -> io2.getInput() .readSerializable(true, SelfSerializableExample::new)); } + + @Test + void pbjSupportTest() throws IOException { + final SelfSerializableExample serializable = new SelfSerializableExample(666, "Not a PBJ object"); + final byte[] byteArray = {1, 2, 3}; + final Bytes bytes = Bytes.wrap(byteArray); + + try (final InputOutputStream io = new InputOutputStream()) { + io.getOutput().writeSerializable(serializable, true); + bytes.writeTo(io.getOutput().getWritableSequentialData()); + io.getOutput().writeSerializable(serializable, false); + + io.startReading(); + + final SelfSerializable readSer1 = io.getInput().readSerializable(true, SelfSerializableExample::new); + final Bytes readBytes = io.getInput().getReadableSequentialData().readBytes(byteArray.length); + final SelfSerializable readSer2 = io.getInput().readSerializable(false, SelfSerializableExample::new); + + assertEquals(serializable, readSer1, "the serializable object should be the same as the one written"); + assertEquals(bytes, readBytes, "the bytes should be the same as the ones written"); + assertEquals(serializable, readSer2, "the serializable object should be the same as the one written"); + } + } }