Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport capture-and-replay-v0.1.0] MIGRATIONS-1289: Add end of segment message to capture serializer #282

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Timestamp;
import org.opensearch.migrations.trafficcapture.protos.EndOfSegmentsIndication;
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;

Expand All @@ -22,10 +23,11 @@ public static int getSizeOfTimestamp(Instant t) {
}

/**
* This function calculates the maximum bytes needed to store a message ByteBuffer and its associated
* Traffic Stream overhead into a CodedOutputStream. The actual required bytes could be marginally smaller.
* This function calculates the maximum bytes needed to store a message ByteBuffer that needs to be segmented into
* different ReadSegmentObservation or WriteSegmentObservation and its associated Traffic Stream overhead into a
* CodedOutputStream. The actual required bytes could be marginally smaller.
*/
public static int maxBytesNeededForMessage(Instant timestamp, int observationFieldNumber, int dataFieldNumber,
public static int maxBytesNeededForSegmentedMessage(Instant timestamp, int observationFieldNumber, int dataFieldNumber,
int dataCountFieldNumber, int dataCount, ByteBuffer buffer, int flushes) {
// Timestamp closure bytes
int tsContentSize = getSizeOfTimestamp(timestamp);
Expand All @@ -40,10 +42,27 @@ public static int maxBytesNeededForMessage(Instant timestamp, int observationFie
// Observation tag and closure size needed bytes
int observationTagAndClosureSize = CodedOutputStream.computeInt32Size(TrafficStream.SUBSTREAM_FIELD_NUMBER, tsClosureSize + captureClosureSize);

// Size for additional SegmentEndObservation to signify end of segments
int segmentEndBytes = bytesNeededForSegmentEndObservation(timestamp);

// Size for closing index, use arbitrary field to calculate
int indexSize = CodedOutputStream.computeInt32Size(TrafficStream.NUMBER_FIELD_NUMBER, flushes);

return observationTagAndClosureSize + tsClosureSize + captureClosureSize + indexSize;
return observationTagAndClosureSize + tsClosureSize + captureClosureSize + segmentEndBytes + indexSize;
}

/**
 * Computes an upper bound on the bytes required to encode a SegmentEnd observation
 * (an empty EndOfSegmentsIndication plus its timestamp) into a CodedOutputStream.
 *
 * @param timestamp the observation timestamp whose encoded size is included
 * @return the number of bytes needed for the complete SegmentEnd substream entry
 */
public static int bytesNeededForSegmentEndObservation(Instant timestamp) {
    // Nested timestamp message bytes plus the tag/length prefix that wraps it
    int timestampContentBytes = getSizeOfTimestamp(timestamp);
    int timestampFieldBytes =
        CodedOutputStream.computeInt32Size(TrafficObservation.TS_FIELD_NUMBER, timestampContentBytes)
            + timestampContentBytes;

    // Bytes for the (default/empty) EndOfSegmentsIndication message field
    int segmentEndFieldBytes = CodedOutputStream.computeMessageSize(
        TrafficObservation.SEGMENTEND_FIELD_NUMBER, EndOfSegmentsIndication.getDefaultInstance());

    // Tag/length prefix for the enclosing TrafficObservation substream entry
    int substreamTagBytes = CodedOutputStream.computeInt32Size(
        TrafficStream.SUBSTREAM_FIELD_NUMBER, timestampFieldBytes + segmentEndFieldBytes);

    return substreamTagBytes + timestampFieldBytes + segmentEndFieldBytes;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opensearch.migrations.trafficcapture.protos.CloseObservation;
import org.opensearch.migrations.trafficcapture.protos.ConnectionExceptionObservation;
import org.opensearch.migrations.trafficcapture.protos.EndOfMessageIndication;
import org.opensearch.migrations.trafficcapture.protos.EndOfSegmentsIndication;
import org.opensearch.migrations.trafficcapture.protos.ReadObservation;
import org.opensearch.migrations.trafficcapture.protos.ReadSegmentObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
Expand Down Expand Up @@ -253,10 +254,10 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
}

// The message bytes here are not optimizing for space and instead are calculated on the worst case estimate of
// the potentially required bytes for simplicity. This could leave ~5 bytes of unused space in the CodedOutputStream
// when considering the case of a message that does not need segments or the case of a smaller segment created
// the potentially required bytes for simplicity. This could leave ~25 bytes of unused space in the CodedOutputStream
// when considering the case of a message that does not need segments or ~5 for the case of a smaller segment created
// from a much larger message
int messageAndOverheadBytesLeft = CodedOutputStreamSizeUtil.maxBytesNeededForMessage(timestamp,
int messageAndOverheadBytesLeft = CodedOutputStreamSizeUtil.maxBytesNeededForSegmentedMessage(timestamp,
segmentFieldNumber, segmentDataFieldNumber, segmentCountFieldNumber, 2, byteBuffer, numFlushesSoFar + 1);
int trafficStreamOverhead = messageAndOverheadBytesLeft - byteBuffer.capacity();

Expand Down Expand Up @@ -298,6 +299,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
messageAndOverheadBytesLeft = messageAndOverheadBytesLeft - chunkBytes;
}
}
writeEndOfSegmentMessage(timestamp);

}

Expand Down Expand Up @@ -425,4 +427,9 @@ private void writeEndOfHttpMessage(Instant timestamp) throws IOException {
getOrCreateCodedOutputStream().writeInt32(EndOfMessageIndication.FIRSTLINEBYTELENGTH_FIELD_NUMBER, firstLineByteLength);
getOrCreateCodedOutputStream().writeInt32(EndOfMessageIndication.HEADERSBYTELENGTH_FIELD_NUMBER, headersByteLength);
}

/**
 * Appends a SegmentEnd observation at the given timestamp to signal that all
 * segments of the preceding segmented message have been written.
 *
 * @param timestamp the time to stamp on the SegmentEnd observation
 * @throws IOException if writing to the underlying CodedOutputStream fails
 */
private void writeEndOfSegmentMessage(Instant timestamp) throws IOException {
    beginSubstreamObservation(timestamp, TrafficObservation.SEGMENTEND_FIELD_NUMBER, 1);
    var outputStream = getOrCreateCodedOutputStream();
    outputStream.writeMessage(TrafficObservation.SEGMENTEND_FIELD_NUMBER,
        EndOfSegmentsIndication.getDefaultInstance());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.migrations.trafficcapture.protos.CloseObservation;
import org.opensearch.migrations.trafficcapture.protos.ConnectionExceptionObservation;
import org.opensearch.migrations.trafficcapture.protos.EndOfMessageIndication;
import org.opensearch.migrations.trafficcapture.protos.EndOfSegmentsIndication;
import org.opensearch.migrations.trafficcapture.protos.ReadObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
Expand Down Expand Up @@ -129,7 +130,7 @@ public void testBasicDataConsistencyWhenChunking() throws IOException, Execution
var outputBuffersCreated = new ConcurrentLinkedQueue<ByteBuffer>();
// Arbitrarily picking small buffer that can hold the overhead TrafficStream bytes as well as some
// data bytes but not all the data bytes and require chunking
var serializer = createSerializerWithTestHandler(outputBuffersCreated, 55);
var serializer = createSerializerWithTestHandler(outputBuffersCreated, 85);

var bb = Unpooled.wrappedBuffer(packetBytes);
serializer.addWriteEvent(referenceTimestamp, bb);
Expand Down Expand Up @@ -161,7 +162,7 @@ public void testEmptyPacketIsHandledForSmallCodedOutputStream()
var outputBuffersCreated = new ConcurrentLinkedQueue<ByteBuffer>();
// Arbitrarily picking small buffer size that can only hold one empty message
var serializer = createSerializerWithTestHandler(outputBuffersCreated,
TEST_NODE_ID_STRING.length() + 40);
TEST_NODE_ID_STRING.length() + 60);
var bb = Unpooled.buffer(0);
serializer.addWriteEvent(referenceTimestamp, bb);
serializer.addWriteEvent(referenceTimestamp, bb);
Expand Down Expand Up @@ -214,6 +215,75 @@ public void testThatReadCanBeDeserialized() throws IOException, ExecutionExcepti
Assertions.assertEquals(groundTruth, reconstitutedTrafficStream);
}

@Test
public void testEndOfSegmentsIndicationAddedWhenChunking() throws IOException, ExecutionException, InterruptedException {
    // Use long arithmetic: the previous int expression 1686593191*1000 overflowed int
    // before widening to long, yielding a bogus epoch-millis value.
    final var referenceTimestamp = Instant.ofEpochMilli(1686593191L * 1000);
    // 500 * 26 = 13000 bytes of data — large enough to force chunking into segments
    String packetData = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".repeat(500);
    byte[] packetBytes = packetData.getBytes(StandardCharsets.UTF_8);
    var outputBuffersCreated = new ConcurrentLinkedQueue<ByteBuffer>();
    // Arbitrarily picking small buffer that can hold the overhead TrafficStream bytes as well as some
    // data bytes but not all the data bytes and require chunking
    var serializer = createSerializerWithTestHandler(outputBuffersCreated, 85);

    var bb = Unpooled.wrappedBuffer(packetBytes);
    serializer.addWriteEvent(referenceTimestamp, bb);
    var future = serializer.flushCommitAndResetStream(true);
    future.get();
    bb.release();

    // Reassemble all observations across every flushed TrafficStream
    List<TrafficObservation> observations = new ArrayList<>();
    for (ByteBuffer buffer : outputBuffersCreated) {
        var trafficStream = TrafficStream.parseFrom(buffer);
        observations.addAll(trafficStream.getSubStreamList());
    }

    // Exactly one SegmentEnd observation should terminate the segmented message
    int foundEndOfSegments = 0;
    for (TrafficObservation observation : observations) {
        if (observation.hasSegmentEnd()) {
            foundEndOfSegments++;
            EndOfSegmentsIndication endOfSegment = observation.getSegmentEnd();
            Assertions.assertEquals(EndOfSegmentsIndication.getDefaultInstance(), endOfSegment);
        }
    }
    Assertions.assertEquals(1, foundEndOfSegments);
}

@Test
public void testEndOfSegmentsIndicationNotAddedWhenNotChunking() throws IOException, ExecutionException, InterruptedException {
    // Use long arithmetic: the previous int expression 1686593191*1000 overflowed int
    // before widening to long, yielding a bogus epoch-millis value.
    final var referenceTimestamp = Instant.ofEpochMilli(1686593191L * 1000);
    // 10 * 26 = 260 bytes of data — small enough to fit in a single observation
    String packetData = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".repeat(10);
    byte[] packetBytes = packetData.getBytes(StandardCharsets.UTF_8);
    var outputBuffersCreated = new ConcurrentLinkedQueue<ByteBuffer>();
    // Buffer size should be large enough to hold all packetData and overhead
    var serializer = createSerializerWithTestHandler(outputBuffersCreated, 500);

    var bb = Unpooled.wrappedBuffer(packetBytes);
    serializer.addWriteEvent(referenceTimestamp, bb);
    var future = serializer.flushCommitAndResetStream(true);
    future.get();
    bb.release();

    // Reassemble all observations across every flushed TrafficStream
    List<TrafficObservation> observations = new ArrayList<>();
    for (ByteBuffer buffer : outputBuffersCreated) {
        var trafficStream = TrafficStream.parseFrom(buffer);
        observations.addAll(trafficStream.getSubStreamList());
    }

    // An unsegmented write must NOT emit any SegmentEnd observation
    int foundEndOfSegments = 0;
    for (TrafficObservation observation : observations) {
        if (observation.hasSegmentEnd()) {
            foundEndOfSegments++;
        }
    }
    Assertions.assertEquals(0, foundEndOfSegments);
}

private StreamChannelConnectionCaptureSerializer
createSerializerWithTestHandler(ConcurrentLinkedQueue<ByteBuffer> outputBuffers, int bufferSize)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringJoiner;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -529,8 +526,7 @@ private static String formatWorkItem(DiagnosticTrackableCompletableFuture<String

/**
 * Feeds every TrafficStream chunk from the supplied stream into the accumulator.
 *
 * @param trafficChunkStream source of captured traffic chunks
 * @param trafficToHttpTransactionAccumulator sink that reassembles chunks into HTTP transactions
 */
public void runReplay(Stream<TrafficStream> trafficChunkStream,
                      CapturedTrafficToHttpTransactionAccumulator trafficToHttpTransactionAccumulator) {
    trafficChunkStream.forEach(trafficToHttpTransactionAccumulator::accept);
}

}
Loading