Skip to content

Commit

Permalink
feat: add PBJ support to platform streams (#15400)
Browse files Browse the repository at this point in the history
Signed-off-by: Lazar Petrovic <[email protected]>
  • Loading branch information
lpetrovic05 authored Sep 9, 2024
1 parent 1668e85 commit bcea706
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static com.swirlds.common.io.streams.SerializableStreamConstants.NULL_VERSION;
import static com.swirlds.common.io.streams.SerializableStreamConstants.SERIALIZATION_PROTOCOL_VERSION;

import com.hedera.pbj.runtime.io.ReadableSequentialData;
import com.hedera.pbj.runtime.io.stream.ReadableStreamingData;
import com.swirlds.base.function.CheckedFunction;
import com.swirlds.common.constructable.ConstructableRegistry;
import com.swirlds.common.io.SelfSerializable;
Expand Down Expand Up @@ -49,13 +51,27 @@ public class SerializableDataInputStream extends AugmentedDataInputStream {

private static final int PROTOCOL_VERSION = SERIALIZATION_PROTOCOL_VERSION;

/** A stream used to read PBJ objects */
private final ReadableSequentialData readableSequentialData;

/**
* Creates a stream capable of deserializing serializable objects.
*
* @param in the specified input stream
*/
public SerializableDataInputStream(final InputStream in) {
super(in);
readableSequentialData = new ReadableStreamingData(in);
}

/**
* While transitioning serialization from {@link SelfSerializable} to protobuf, this stream will support both
* serialization methods by providing a separate instance to deserialize protobuf objects.
*
* @return the readable sequential data stream
*/
public @NonNull ReadableSequentialData getReadableSequentialData() {
return readableSequentialData;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import static com.swirlds.common.io.streams.SerializableStreamConstants.SERIALIZATION_PROTOCOL_VERSION;
import static com.swirlds.common.io.streams.SerializableStreamConstants.VERSION_BYTES;

import com.hedera.pbj.runtime.io.WritableSequentialData;
import com.hedera.pbj.runtime.io.stream.WritableStreamingData;
import com.swirlds.common.io.FunctionalSerialize;
import com.swirlds.common.io.SelfSerializable;
import com.swirlds.common.io.SerializableDet;
import com.swirlds.common.io.SerializableWithKnownLength;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -40,6 +43,8 @@
* It is designed for use with the SerializableDet interface, and its use is described there.
*/
public class SerializableDataOutputStream extends AugmentedDataOutputStream {
/** A stream used to write PBJ objects */
private final WritableSequentialData writableSequentialData;

/**
* Creates a new data output stream to write data to the specified
Expand All @@ -53,6 +58,17 @@ public class SerializableDataOutputStream extends AugmentedDataOutputStream {
*/
public SerializableDataOutputStream(OutputStream out) {
super(out);
writableSequentialData = new WritableStreamingData(out);
}

/**
* While transitioning serialization from {@link SelfSerializable} to protobuf, this stream will support both
* serialization methods by providing a separate instance to serialize protobuf objects.
*
* @return the writable sequential data stream
*/
public @NonNull WritableSequentialData getWritableSequentialData() {
return writableSequentialData;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.swirlds.common.io.exceptions.InvalidVersionException;
import com.swirlds.common.test.fixtures.io.InputOutputStream;
import com.swirlds.common.test.fixtures.io.SelfSerializableExample;
Expand Down Expand Up @@ -72,4 +73,27 @@ void deserializeInvalidVersions() throws IOException {
assertThrows(InvalidVersionException.class, () -> io2.getInput()
.readSerializable(true, SelfSerializableExample::new));
}

@Test
void pbjSupportTest() throws IOException {
final SelfSerializableExample serializable = new SelfSerializableExample(666, "Not a PBJ object");
final byte[] byteArray = {1, 2, 3};
final Bytes bytes = Bytes.wrap(byteArray);

try (final InputOutputStream io = new InputOutputStream()) {
io.getOutput().writeSerializable(serializable, true);
bytes.writeTo(io.getOutput().getWritableSequentialData());
io.getOutput().writeSerializable(serializable, false);

io.startReading();

final SelfSerializable readSer1 = io.getInput().readSerializable(true, SelfSerializableExample::new);
final Bytes readBytes = io.getInput().getReadableSequentialData().readBytes(byteArray.length);
final SelfSerializable readSer2 = io.getInput().readSerializable(false, SelfSerializableExample::new);

assertEquals(serializable, readSer1, "the serializable object should be the same as the one written");
assertEquals(bytes, readBytes, "the bytes should be the same as the ones written");
assertEquals(serializable, readSer2, "the serializable object should be the same as the one written");
}
}
}

0 comments on commit bcea706

Please sign in to comment.