Merge branch 'main' into capture-and-replay-v0.1.0
sumobrian committed Sep 12, 2023
2 parents ded26c9 + 8ef119d commit 67c8af0
Showing 29 changed files with 515 additions and 252 deletions.
25 changes: 25 additions & 0 deletions SECURITY.md
@@ -0,0 +1,25 @@
## Reporting a Vulnerability

If you discover a potential security issue in this project, we ask that you notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/) or directly via email to [email protected]. Please do **not** create a public GitHub issue.

## While using the Traffic Capture & Replay tools

We strive to make these tools reliable and secure. However, before you begin using them with production or potentially sensitive data, we'd like to highlight a few points you should be aware of.

### Requests replayed with more permissive credentials

When configuring the Traffic Replayer, you can provide a set of credentials that will be attached to the replayed requests. An attacker could send a series of requests that they are not authorized to execute against the source cluster. Those requests would be denied on the source cluster, but replayed against the target cluster with the configured credentials, which may have higher privileges and allow the requests to be executed. The attacker could not directly read data this way, but any side effects of the requests (e.g. deleting data) would take effect.

### Denial of Service against the source cluster

An attacker could send a series of large and potentially malformed requests to the source cluster via the capture proxy. These messages would be relayed to the Kafka cluster and, for mutating requests (PUT/POST/UPDATE), would block forwarding of the request to the source cluster until the message had finished sending. If an attacker were able to use this strategy to tie up the proxy and/or the Kafka cluster, all other incoming mutating requests to the source cluster would be blocked.

We have partially mitigated this by preventing the proxy from blocking for more than a fixed period of time (10 seconds by default, configurable [here](./TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/Main.java#L182)); however, the flow of messages to Kafka could still be disrupted.
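To make the bound concrete, here is a minimal sketch of the bounded-blocking idea (illustrative only, with hypothetical names; it is not the proxy's actual code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedOffloadWait {
    // Default cap described above; the real value is configurable.
    private static final long MAX_BLOCK_SECONDS = 10;

    static void awaitOffload(CompletableFuture<Void> offloadFuture) throws Exception {
        try {
            // Block the request path only up to the cap.
            offloadFuture.get(MAX_BLOCK_SECONDS, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            // Past the cap, stop blocking and let traffic flow; the offload
            // continues in the background, so capture data may lag or be lost.
        }
    }
}
```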

If you are concerned about this scenario, we recommend fully mitigating it by putting a load-shedder in front of the proxy.
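A load shedder can be as simple as a concurrency cap that rejects excess requests instead of letting them queue. The sketch below uses hypothetical names and is only one way to do it:

```java
import java.util.concurrent.Semaphore;

class LoadShedder {
    private final Semaphore inFlight;

    LoadShedder(int maxConcurrentRequests) {
        this.inFlight = new Semaphore(maxConcurrentRequests);
    }

    // Returns false when the request should be rejected (e.g. with HTTP 429/503)
    // rather than allowed to pile up in front of the proxy.
    boolean tryHandle(Runnable requestHandler) {
        if (!inFlight.tryAcquire()) {
            return false;
        }
        try {
            requestHandler.run();
            return true;
        } finally {
            inFlight.release();
        }
    }
}
```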

### Credentials are available in the output tuples

The output tuples, available on the shared EFS volume via the Migration Console, contain the exact requests and responses received from both the source and target clusters, including the headers and bodies of the messages. The Authorization header is present on SigV4-signed requests and on those using basic authorization; with basic authorization, the credentials can be extracted from the header value. These values are often essential for debugging, so they are not redacted from the output.

If you use basic authorization credentials, ensure that access to your output tuples is protected as carefully as the credentials themselves.
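To see why these values are sensitive, note that basic authorization is only base64-encoded, not encrypted, so anyone who can read a captured Authorization header can recover the credentials. An illustrative snippet (the header value is an example only):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class BasicAuthDecodeDemo {
    public static void main(String[] args) {
        // Example value of the kind found in a captured tuple.
        String headerValue = "Basic dXNlcjpwYXNzd29yZA==";
        String decoded = new String(
                Base64.getDecoder().decode(headerValue.substring("Basic ".length())),
                StandardCharsets.UTF_8);
        System.out.println(decoded); // prints "user:password"
    }
}
```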
@@ -74,8 +74,8 @@ public IChannelConnectionCaptureSerializer createOffloader(String connectionId)
producer.send(record, handleProducerRecordSent(cf, recordId));
metricsLogger.atSuccess()
.addKeyValue("channelId", connectionId)
.addKeyValue("topic-name", topicNameForTraffic)
.addKeyValue("size", record.value().length)
.addKeyValue("topicName", topicNameForTraffic)
.addKeyValue("sizeInBytes", record.value().length)
.addKeyValue("diagnosticId", recordId)
.setMessage("Sent message to Kafka").log();
// Note that ordering is not guaranteed to be preserved here
@@ -85,7 +85,7 @@ public IChannelConnectionCaptureSerializer createOffloader(String connectionId)
} catch (Exception e) {
metricsLogger.atError(e)
.addKeyValue("channelId", connectionId)
.addKeyValue("topic-name", topicNameForTraffic)
.addKeyValue("topicName", topicNameForTraffic)
.setMessage("Sending message to Kafka failed.").log();
throw new RuntimeException(e);
}
22 changes: 11 additions & 11 deletions TrafficCapture/coreUtilities/build.gradle
@@ -18,22 +18,22 @@ buildscript {

plugins {
id 'org.opensearch.migrations.java-library-conventions'
id "com.github.spotbugs" version "4.7.3"
id 'checkstyle'
// id "com.github.spotbugs" version "4.7.3"
// id 'checkstyle'
id 'org.owasp.dependencycheck' version '8.2.1'
id "io.freefair.lombok" version "8.0.1"
}

- spotbugs {
- includeFilter = new File(rootDir, 'config/spotbugs/spotbugs-include.xml')
- }
+ //spotbugs {
+ // includeFilter = new File(rootDir, 'config/spotbugs/spotbugs-include.xml')
+ //}

- checkstyle {
- toolVersion = '10.12.3'
- configFile = new File(rootDir, 'config/checkstyle/checkstyle.xml')
- System.setProperty('checkstyle.cache.file', String.format('%s/%s',
- buildDir, 'checkstyle.cachefile'))
- }
+ //checkstyle {
+ // toolVersion = '10.12.3'
+ // configFile = new File(rootDir, 'config/checkstyle/checkstyle.xml')
+ // System.setProperty('checkstyle.cache.file', String.format('%s/%s',
+ // buildDir, 'checkstyle.cachefile'))
+ //}

repositories {
mavenCentral()
@@ -50,4 +50,8 @@ public LoggingEventBuilder atError(Throwable cause) {
public LoggingEventBuilder atError() {
return logger.makeLoggingEventBuilder(Level.ERROR);
}
+
+ public LoggingEventBuilder atTrace() {
+ return logger.makeLoggingEventBuilder(Level.TRACE);
+ }
}
@@ -3,7 +3,7 @@ FROM ubuntu:jammy
ENV DEBIAN_FRONTEND noninteractive

RUN apt-get update && \
- apt-get install -y --no-install-recommends python3.9 python3-pip python3-dev gcc libc-dev git curl vim && \
+ apt-get install -y --no-install-recommends python3.9 python3-pip python3-dev gcc libc-dev git curl vim jq && \
pip3 install urllib3==1.25.11 opensearch-benchmark==1.1.0 awscurl tqdm

COPY runTestBenchmarks.sh /root/
24 changes: 12 additions & 12 deletions TrafficCapture/testUtilities/build.gradle
@@ -20,29 +20,29 @@ buildscript {

plugins {
id 'org.opensearch.migrations.java-library-conventions'
id "com.github.spotbugs" version "4.7.3"
id 'checkstyle'
// id "com.github.spotbugs" version "4.7.3"
// id 'checkstyle'
id 'org.owasp.dependencycheck' version '8.2.1'
id "io.freefair.lombok" version "8.0.1"
}

- spotbugs {
- includeFilter = new File(rootDir, 'config/spotbugs/spotbugs-include.xml')
- }
+ //spotbugs {
+ // includeFilter = new File(rootDir, 'config/spotbugs/spotbugs-include.xml')
+ //}

- checkstyle {
- toolVersion = '10.12.3'
- configFile = new File(rootDir, 'config/checkstyle/checkstyle.xml')
- System.setProperty('checkstyle.cache.file', String.format('%s/%s',
- buildDir, 'checkstyle.cachefile'))
- }
+ //checkstyle {
+ // toolVersion = '10.12.3'
+ // configFile = new File(rootDir, 'config/checkstyle/checkstyle.xml')
+ // System.setProperty('checkstyle.cache.file', String.format('%s/%s',
+ // buildDir, 'checkstyle.cachefile'))
+ //}

repositories {
mavenCentral()
}

dependencies {
- spotbugs 'com.github.spotbugs:spotbugs:4.7.3'
+ // spotbugs 'com.github.spotbugs:spotbugs:4.7.3'

implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.15.0'
implementation group: 'com.google.guava', name: 'guava', version: '32.0.1-jre'
@@ -5,6 +5,7 @@
import io.netty.util.concurrent.GenericFutureListener;
import lombok.Getter;
import lombok.NonNull;
+ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
@@ -64,6 +65,27 @@ public static class Stats {
@Getter
Duration totalWaitTimeForCallers = Duration.ZERO;

+ public Stats() {}
+
+ public Stats(long nItemsCreated,
+ long nItemsExpired,
+ long nHotGets,
+ long nColdGets,
+ Duration totalDurationBuildingItems,
+ Duration totalWaitTimeForCallers) {
+ this.nItemsCreated = nItemsCreated;
+ this.nItemsExpired = nItemsExpired;
+ this.nHotGets = nHotGets;
+ this.nColdGets = nColdGets;
+ this.totalDurationBuildingItems = totalDurationBuildingItems;
+ this.totalWaitTimeForCallers = totalWaitTimeForCallers;
+ }
+
+ public Stats(Stats o) {
+ this(o.nItemsCreated, o.nItemsExpired, o.nHotGets, o.nColdGets,
+ o.totalDurationBuildingItems, o.totalWaitTimeForCallers);
+ }
+
@Override
public String toString() {
return new StringJoiner(", ", Stats.class.getSimpleName() + "[", "]")
@@ -130,8 +152,7 @@ private void addExpiredItem() {
private final EventLoop eventLoop;
private Duration inactivityTimeout;
private GenericFutureListener<F> shuffleInProgressToReady;
- @Getter
- private Stats stats = new Stats();
+ private final Stats stats;
private int poolSize;

public ExpiringSubstitutableItemPool(@NonNull Duration inactivityTimeout,
@@ -153,6 +174,7 @@ public ExpiringSubstitutableItemPool(@NonNull Duration inactivityTimeout,
this.eventLoop = eventLoop;
this.inactivityTimeout = inactivityTimeout;
this.onExpirationConsumer = onExpirationConsumer;
+ this.stats = new Stats();
this.itemSupplier = () -> {
var startTime = Instant.now();
var rval = itemSupplier.get();
@@ -177,6 +199,19 @@ public ExpiringSubstitutableItemPool(@NonNull Duration inactivityTimeout,
};
}

+ @SneakyThrows
+ public Stats getStats() {
+ // make a copy on the original thread making changes, which will be up to date at the time of capture and
+ // immutable for future accessors, making it thread-safe
+ var copiedStats = eventLoop.submit(()->{
+ log.atTrace().setMessage(()->"copying stats ("+System.identityHashCode(stats)+")="+stats).log();
+ return new Stats(stats);
+ }).get();
+ log.atTrace()
+ .setMessage(()->"Got copied value of (" + System.identityHashCode(copiedStats) + ")="+copiedStats).log();
+ return copiedStats;
+ }
+
public int reduceCapacity(int delta) {
assert delta >= 0 : "expected capacity delta to be >= 0";
poolSize -= delta;
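The `getStats()` change above follows a standard Netty idiom: state that is mutated only on a single event loop can be read safely from any thread by performing the copy on that same loop. A stripped-down sketch of the idiom, with illustrative names (not this project's code):

```java
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoop;

class EventLoopConfinedCounter {
    private final EventLoop eventLoop = new DefaultEventLoop();
    private long count; // mutated only from eventLoop tasks, so no locking is needed

    void increment() {
        eventLoop.execute(() -> count++);
    }

    long snapshot() throws Exception {
        // Read on the owning thread and hand back an immutable copy;
        // the returned value is then safe to use from any thread.
        return eventLoop.submit(() -> count).get();
    }
}
```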
@@ -119,11 +119,14 @@ void get() throws Exception {
Assertions.assertTrue(pool.getStats().getNItemsExpired() >= 4);

for (int i=1; i<=NUM_POOLED_ITEMS*2; ++i) {
log.info("Pool=" + pool);
Assertions.assertEquals(NUM_POOLED_ITEMS+i, getNextItem(pool));
var nextItemGrabbed = getNextItem(pool);
log.debug("Pool=" + pool + " nextItem="+nextItemGrabbed);
Assertions.assertEquals(NUM_POOLED_ITEMS+i, nextItemGrabbed);
}

- Assertions.assertTrue(pool.getStats().getNItemsCreated() >= 15);
+ var numItemsCreated = pool.getStats().getNItemsCreated();
+ log.debug("numItemsCreated="+numItemsCreated);
+ Assertions.assertTrue(numItemsCreated >= 15);
Assertions.assertEquals(11, pool.getStats().getNHotGets()+pool.getStats().getNColdGets());
Assertions.assertTrue(pool.getStats().getNItemsExpired() >= 4);

@@ -141,7 +144,7 @@ private static Integer getNextItem(ExpiringSubstitutableItemPool<Future<Integer>
private static Integer getIntegerItem(AtomicInteger builtItemCursor,
AtomicReference<Instant> lastCreation,
CountDownLatch countdownLatchToUse) {
log.info("Building item (" +builtItemCursor.hashCode() + ") " + (builtItemCursor.get()+1));
log.debug("Building item (" +builtItemCursor.hashCode() + ") " + (builtItemCursor.get()+1));
countdownLatchToUse.countDown();
lastCreation.set(Instant.now());
return Integer.valueOf(builtItemCursor.incrementAndGet());
3 changes: 1 addition & 2 deletions TrafficCapture/trafficReplayer/build.gradle
@@ -23,8 +23,6 @@ plugins {
id "io.freefair.lombok" version "8.0.1"
}

- import org.opensearch.migrations.common.CommonUtils
-
//spotbugs {
// includeFilter = new File(rootDir, 'config/spotbugs/spotbugs-include.xml')
//}
@@ -45,6 +43,7 @@ dependencies {
def resilience4jVersion = "1.7.0";

implementation project(':captureProtobufs')
+ implementation project(':coreUtilities')

implementation group: 'com.beust', name: 'jcommander', version: '1.82'
implementation group: 'com.bazaarvoice.jolt', name: 'jolt-core', version: '0.1.7'
@@ -1,6 +1,7 @@
package org.opensearch.migrations.replay;

import lombok.extern.slf4j.Slf4j;
+ import org.opensearch.migrations.coreutils.MetricsLogger;
import org.opensearch.migrations.replay.datatypes.UniqueRequestKey;
import org.opensearch.migrations.replay.traffic.expiration.BehavioralPolicy;
import org.opensearch.migrations.replay.traffic.expiration.ExpiringTrafficStreamMap;
@@ -57,6 +58,8 @@ public class CapturedTrafficToHttpTransactionAccumulator {
private final AtomicInteger connectionsExpiredCounter = new AtomicInteger();
private final AtomicInteger requestsTerminatedUponAccumulatorCloseCounter = new AtomicInteger();

+ private final static MetricsLogger metricsLogger = new MetricsLogger("CapturedTrafficToHttpTransactionAccumulator");
+
public CapturedTrafficToHttpTransactionAccumulator(Duration minTimeout,
BiConsumer<UniqueRequestKey,HttpMessageAndTimestamp> requestReceivedHandler,
Consumer<RequestResponsePacketPair> fullDataHandler,
@@ -240,6 +243,10 @@ private boolean rotateAccumulationOnReadIfNecessary(String connectionId, Accumu
private boolean handleEndOfRequest(Accumulation accumulation) {
assert accumulation.state == Accumulation.State.NOTHING_SENT : "state == " + accumulation.state;
var requestPacketBytes = accumulation.rrPair.requestData;
+ metricsLogger.atSuccess()
+ .addKeyValue("requestId", accumulation.getRequestId())
+ .addKeyValue("connectionId", accumulation.getRequestId().connectionId)
+ .setMessage("Full captured source request was accumulated").log();
if (requestPacketBytes != null) {
requestHandler.accept(accumulation.getRequestId(), requestPacketBytes);
accumulation.state = Accumulation.State.REQUEST_SENT;
@@ -254,6 +261,10 @@

private void handleEndOfResponse(Accumulation accumulation) {
assert accumulation.state == Accumulation.State.REQUEST_SENT;
+ metricsLogger.atSuccess()
+ .addKeyValue("requestId", accumulation.getRequestId())
+ .addKeyValue("connectionId", accumulation.getRequestId().connectionId)
+ .setMessage("Full captured source response was accumulated").log();
fullDataHandler.accept(accumulation.rrPair);
accumulation.resetForNextRequest();
}
@@ -36,6 +36,6 @@ public PacketToTransformingHttpHandlerFactory(IJsonTransformer jsonTransformer,
create(UniqueRequestKey requestKey) {
log.trace("creating HttpJsonTransformingConsumer");
return new HttpJsonTransformingConsumer(jsonTransformer, authTransformerFactory,
- new TransformedPacketReceiver(), requestKey.toString());
+ new TransformedPacketReceiver(), requestKey.toString(), requestKey);
}
}
@@ -3,6 +3,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.ScheduledFuture;
import lombok.extern.slf4j.Slf4j;
+ import org.opensearch.migrations.coreutils.MetricsLogger;
import org.opensearch.migrations.replay.datatypes.UniqueRequestKey;
import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture;

@@ -25,6 +26,7 @@ public class ReplayEngine {
private final AtomicLong lastIdleUpdatedTimestampEpochMs;
private final TimeShifter timeShifter;
private final double maxSpeedupFactor;
+ private static final MetricsLogger metricsLogger = new MetricsLogger("ReplayEngine");
/**
* If this proves to be a contention bottleneck, we can move to a scheme with ThreadLocals
* and on a scheduled basis, submit work to each thread to find out if they're idle or not.
@@ -129,6 +131,12 @@ public boolean isWorkOutstanding() {
var start = timeShifter.transformSourceTimeToRealTime(originalStart);
var end = timeShifter.transformSourceTimeToRealTime(originalEnd);
var interval = numPackets > 1 ? Duration.between(start, end).dividedBy(numPackets-1) : Duration.ZERO;
+ metricsLogger.atSuccess()
+ .addKeyValue("requestId", requestKey.toString())
+ .addKeyValue("connectionId", requestKey.connectionId)
+ .addKeyValue("delayFromOriginalToScheduledStartInMs", Duration.between(originalStart, start).toMillis())
+ .addKeyValue("scheduledStartTime", start.toString())
+ .setMessage("Request scheduled to be sent").log();
var sendResult = networkSendOrchestrator.scheduleRequest(requestKey, start, interval, packets);
return hookWorkFinishingUpdates(sendResult, originalStart);
}
@@ -206,7 +206,7 @@ private long getDelayFromNowMs(Instant to) {
getPacketReceiver(UniqueRequestKey requestKey, ChannelFuture channelFuture,
AtomicReference<NettyPacketToHttpConsumer> packetReceiver) {
if (packetReceiver.get() == null) {
- packetReceiver.set(new NettyPacketToHttpConsumer(channelFuture, requestKey.toString()));
+ packetReceiver.set(new NettyPacketToHttpConsumer(channelFuture, requestKey.toString(), requestKey));
}
return packetReceiver.get();
}
@@ -33,7 +33,7 @@ public TransformedTargetRequestAndResponse(TransformedPackets requestPackets,
Throwable exception) {
super(0, Duration.ZERO, null, exception);
this.requestPackets = requestPackets;
- transformationStatus = transformationStatus;
+ this.transformationStatus = transformationStatus;
}

public TransformedTargetRequestAndResponse(TransformedPackets requestPackets,
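The fix above is subtle: the removed line assigned the constructor parameter to itself (a no-op, since the parameter shadows the field), so the field kept its default value. Prefixing with `this.` assigns the field instead. A minimal reproduction with an illustrative class; static analyzers such as SpotBugs flag this pattern:

```java
class SelfAssignmentDemo {
    private int status;

    SelfAssignmentDemo(int status) {
        status = status;         // bug: parameter shadows the field; the field stays 0
        // this.status = status; // fix: qualify with `this.` to assign the field
    }
}
```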
(Diff truncated: remaining changed files not shown.)