Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add a load generator to the examples #304

Merged
merged 2 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions examples/cache/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,14 @@ repositories {
dependencies {
implementation("software.momento.java:sdk:1.0.0")

// Guava supplies the RateLimiter imported by the load generator example.
// Per the PR author's review note, this is the version already pulled in
// transitively by the SDK's protos.
implementation("com.google.guava:guava:31.1-android")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the version of guava transitively pulled in by our protos.


// Logging framework to log and enable logging in the Momento client.
implementation("ch.qos.logback:logback-classic:1.4.7")

// Histogram for collecting stats in the load generator
implementation("org.hdrhistogram:HdrHistogram:2.1.12")

// Use JUnit Jupiter for testing.
testImplementation("org.junit.jupiter:junit-jupiter:5.9.2")
}
Expand Down Expand Up @@ -68,6 +73,12 @@ task("dictionary", JavaExec::class) {
mainClass.set("momento.client.example.DictionaryExample")
}

// Registers a "loadgen" JavaExec task that launches
// momento.client.example.LoadGenerator using the main source set's runtime classpath.
task("loadgen", JavaExec::class) {
description = "Run the load generator"
classpath = sourceSets.main.get().runtimeClasspath
mainClass.set("momento.client.example.LoadGenerator")
}

task("sortedSet", JavaExec::class) {
description = "Run the sorted set example"
classpath = sourceSets.main.get().runtimeClasspath
Expand Down
280 changes: 280 additions & 0 deletions examples/cache/src/main/java/momento/client/example/LoadGenerator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
package momento.client.example;

import com.google.common.util.concurrent.RateLimiter;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.function.Function;
import momento.sdk.CacheClient;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.auth.EnvVarCredentialProvider;
import momento.sdk.config.Configurations;
import momento.sdk.exceptions.MomentoErrorCode;
import momento.sdk.exceptions.SdkException;
import momento.sdk.responses.cache.GetResponse;
import momento.sdk.responses.cache.SetResponse;
import org.HdrHistogram.ConcurrentHistogram;
import org.HdrHistogram.Histogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Example load generator for a Momento cache.
 *
 * <p>Constructing an instance starts a fixed number of worker chains that alternate set and get
 * requests against a single cache, throttled by a shared rate limiter. Request counts and latency
 * histograms are accumulated and logged periodically until {@link #shutdown()} is called.
 */
// NOTE(review): the suppression is presumably for Guava's RateLimiter being marked @Beta — confirm.
@SuppressWarnings("UnstableApiUsage")
public class LoadGenerator {
// Environment variable the Momento credential is read from.
private static final String AUTH_TOKEN_ENV_VAR = "MOMENTO_AUTH_TOKEN";
// TTL passed to the cache client builder as its default item TTL.
private static final Duration DEFAULT_ITEM_TTL = Duration.ofSeconds(60);
// Cache that every set and get request targets.
private static final String CACHE_NAME = "java-loadgen";
private static final Logger logger = LoggerFactory.getLogger(LoadGenerator.class);

// Pool that runs the set/get request tasks; sized to the configured concurrency.
private final ScheduledExecutorService executorService;
// Shared limiter every request task acquires a permit from before issuing its request.
private final RateLimiter rateLimiter;
// Separate two-thread pool for housekeeping: the warmup histogram reset and the stats logger.
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

// Configured request rate; kept only so it can be echoed in the stats output.
private final int requestsPerSecond;

// Latency histograms (values recorded in nanoseconds), one per operation type.
private final ConcurrentHistogram setHistogram = new ConcurrentHistogram(3);
private final ConcurrentHistogram getHistogram = new ConcurrentHistogram(3);
// Cumulative counters reported by logInfo; LongAdder because many worker threads update them.
private final LongAdder globalRequestCount = new LongAdder();
private final LongAdder globalSuccessCount = new LongAdder();
private final LongAdder globalUnavailableCount = new LongAdder();
private final LongAdder globalTimeoutCount = new LongAdder();
private final LongAdder globalLimitExceededCount = new LongAdder();

// Payload written by every set request.
private final String cacheValue;
private final CacheClient client;

// Wall-clock time (epoch millis) at which load generation started; used to compute throughput.
private final long startTime;

/**
 * Creates the cache client and cache, then immediately begins generating load.
 *
 * <p>One set request is scheduled per concurrent worker; each completed request schedules the
 * next one for that worker. Two housekeeping tasks are also scheduled: a one-off reset of the
 * latency histograms after the warmup period, and a recurring statistics log entry.
 *
 * @param statsInterval seconds before the first statistics log entry and between later ones
 * @param maxConcurrentRequests number of worker chains to start; also the worker pool size
 * @param requestsPerSecond permit rate given to the rate limiter
 * @param cacheValueLength length, in characters, of the value written by each set request
 * @param warmupTime seconds to wait before the latency histograms are reset
 * @throws SdkException if the credential cannot be loaded from the environment variable
 */
public LoadGenerator(
    int statsInterval,
    int maxConcurrentRequests,
    int requestsPerSecond,
    int cacheValueLength,
    int warmupTime) {
  this.requestsPerSecond = requestsPerSecond;
  this.cacheValue = "x".repeat(cacheValueLength);

  final CredentialProvider credentials;
  try {
    credentials = new EnvVarCredentialProvider(AUTH_TOKEN_ENV_VAR);
  } catch (SdkException loadFailure) {
    logger.error(
        "Unable to load credential from environment variable " + AUTH_TOKEN_ENV_VAR, loadFailure);
    throw loadFailure;
  }

  this.client =
      CacheClient.builder(credentials, Configurations.Laptop.v1(), DEFAULT_ITEM_TTL).build();
  // NOTE(review): the result of createCache is not inspected here — confirm that is intended.
  this.client.createCache(CACHE_NAME);

  this.executorService = Executors.newScheduledThreadPool(maxConcurrentRequests);
  this.rateLimiter = RateLimiter.create(requestsPerSecond);

  // Must be captured before any worker starts so throughput is measured from the first request.
  this.startTime = System.currentTimeMillis();

  // Kick off one self-rescheduling set/get chain per worker.
  for (int workerId = 0; workerId < maxConcurrentRequests; workerId++) {
    scheduleSet(workerId, 0);
  }

  // Discard latencies gathered during warmup.
  final Runnable resetHistograms =
      () -> {
        setHistogram.reset();
        getHistogram.reset();
      };
  scheduler.schedule(resetHistograms, warmupTime, TimeUnit.SECONDS);

  // Periodically print the cumulative statistics.
  scheduler.scheduleAtFixedRate(
      () -> logInfo(), statsInterval, statsInterval, TimeUnit.SECONDS);
}

/**
 * Schedules a set request for the given worker and, once it completes, the matching get.
 *
 * <p>A successful response bumps the success counter; an error response is classified by
 * {@code handleErrorResponse}. Either way the follow-up get for the same operation number is
 * scheduled. Latency is recorded in the set histogram.
 *
 * @param workerId identifier of the worker chain issuing the request
 * @param operationNum sequence number of this operation within the worker chain
 */
private void scheduleSet(int workerId, int operationNum) {
  final Function<String, CompletableFuture<SetResponse>> writeValue =
      cacheKey -> client.set(CACHE_NAME, cacheKey, cacheValue);

  final BiConsumer<SetResponse, Integer> onSetCompleted =
      (setResponse, completedOperationNum) -> {
        if (setResponse instanceof SetResponse.Success) {
          globalSuccessCount.increment();
        } else if (setResponse instanceof SetResponse.Error failure) {
          handleErrorResponse(failure.getErrorCode());
        }
        // Read back what was just written.
        scheduleGet(workerId, completedOperationNum);
      };

  scheduleOperation(workerId, operationNum, writeValue, onSetCompleted, setHistogram);
}

/**
 * Schedules a get request for the given worker and, once it completes, the worker's next set.
 *
 * <p>A hit or a miss both count as a success; an error response is classified by
 * {@code handleErrorResponse}. In every case a set for {@code operationNum + 1} is then scheduled,
 * so each worker alternates set and get requests indefinitely; nothing in this method ends the
 * chain. Latency is recorded in the get histogram. The handler's second argument is not used here
 * because the next operation number is computed up front.
 *
 * @param workerId identifier of the worker chain issuing the request
 * @param operationNum sequence number of this operation within the worker chain
 */
private void scheduleGet(int workerId, int operationNum) {
final int nextOperationNum = operationNum + 1;
scheduleOperation(
workerId,
operationNum,
key -> client.get(CACHE_NAME, key),
(response, operationNumValue) -> {
if (response instanceof GetResponse.Hit || response instanceof GetResponse.Miss) {
globalSuccessCount.increment();
} else if (response instanceof GetResponse.Error error) {
handleErrorResponse(error.getErrorCode());
}
scheduleSet(workerId, nextOperationNum);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like unintended else we'll be in an infinite loop of set-get-set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is intended. It schedules alternating write and read tasks that reschedule themselves forever. The throughput is controlled by the rate limiter blocking when the tasks run too quickly. It is broken by the shutdown method that the main method calls at the end of the benchmark.

},
getHistogram);
}

/**
 * Submits a single rate-limited cache request to the worker pool and times it.
 *
 * <p>The task waits for a rate limiter permit, runs {@code operation} against the key derived
 * from the worker id and operation number, blocks until the returned future completes, then
 * records the elapsed time and hands the response to {@code responseHandler}.
 *
 * @param workerId identifier of the worker chain; part of the cache key
 * @param operationNum sequence number within the worker chain; part of the cache key and passed
 *     through to {@code responseHandler}
 * @param operation issues the request for the given key and returns its pending result
 * @param responseHandler receives the completed response and the operation number
 * @param histogram histogram that receives the request latency in nanoseconds
 * @param <T> response type produced by {@code operation}
 */
private <T> void scheduleOperation(
    int workerId,
    int operationNum,
    Function<String, CompletableFuture<T>> operation,
    BiConsumer<T, Integer> responseHandler,
    ConcurrentHistogram histogram) {
  final String key = "worker" + workerId + "operation" + operationNum;
  executorService.schedule(
      () -> {
        // Blocks until a permit is available; this is what caps the request rate.
        rateLimiter.acquire();
        // Named to avoid shadowing the startTime field.
        final long requestStartNanos = System.nanoTime();
        final T response = operation.apply(key).join();
        final long elapsedNanos = System.nanoTime() - requestStartNanos;

        globalRequestCount.increment();
        // Record the sample before invoking the handler: the handler schedules follow-up work,
        // which can throw once the pool has been shut down and would otherwise drop the sample.
        histogram.recordValue(elapsedNanos);

        responseHandler.accept(response, operationNum);
      },
      0,
      TimeUnit.MILLISECONDS);
}

/**
 * Increments the counter that corresponds to the given error code.
 *
 * <p>Server-unavailable, timeout, and limit-exceeded errors each have a dedicated counter that is
 * reported in the statistics output. Any other error code is not counted individually.
 *
 * @param errorCode error code taken from an error response
 */
private void handleErrorResponse(MomentoErrorCode errorCode) {
  switch (errorCode) {
    case SERVER_UNAVAILABLE -> globalUnavailableCount.increment();
    case TIMEOUT_ERROR -> globalTimeoutCount.increment();
    case LIMIT_EXCEEDED_ERROR -> globalLimitExceededCount.increment();
    default -> {
      // No dedicated counter for other error codes.
    }
  }
}

/**
 * Logs a snapshot of the cumulative statistics at info level.
 *
 * <p>The entry contains the total request count with measured and configured throughput, the
 * per-outcome counts with their share of all requests, and the write and read latency
 * histograms.
 */
private void logInfo() {
  final long totalRequests = globalRequestCount.sum();

  final String throughputLine =
      String.format(
          "%18s: %d (%.2f) tps, limited to %d tps",
          "total requests", totalRequests, formatTps(totalRequests), requestsPerSecond);

  final String report =
      "\nCumulative stats:\n"
          + throughputLine
          + '\n'
          + formatStat("success", totalRequests, globalSuccessCount.sum())
          + '\n'
          + formatStat("server unavailable", totalRequests, globalUnavailableCount.sum())
          + '\n'
          + formatStat("timeout", totalRequests, globalTimeoutCount.sum())
          + '\n'
          + formatStat("limit exceeded", totalRequests, globalLimitExceededCount.sum())
          + '\n'
          + "\nCumulative write latencies:\n"
          + formatHistogram(setHistogram)
          + "\nCumulative read latencies:\n"
          + formatHistogram(getHistogram);

  logger.info(report);
}

/**
 * Formats one statistics line: a right-aligned label, a count, and that count as a percentage of
 * all requests.
 *
 * @param name label for the line, right-aligned in an 18-character column
 * @param totalRequests total number of requests; the percentage is reported as 0 when this is 0
 * @param requests number of requests in this category
 * @return the formatted line, without a trailing newline
 */
private String formatStat(String name, long totalRequests, long requests) {
  // Avoid dividing by zero before any request has completed.
  final double share = (totalRequests == 0) ? 0.0 : (double) requests / totalRequests * 100;
  return String.format("%18s: %d (%.2f)", name, requests, share);
}

/**
 * Computes the average throughput since load generation started.
 *
 * @param totalRequests number of requests completed so far
 * @return requests per second over the elapsed wall-clock time, or 0 when no time has elapsed
 */
private double formatTps(long totalRequests) {
  final long elapsedMillis = System.currentTimeMillis() - startTime;
  // The wall clock may not have advanced yet (or may have been adjusted backwards); report zero
  // rather than Infinity, NaN, or a negative rate.
  if (elapsedMillis <= 0) {
    return 0.0;
  }
  return totalRequests * 1000.0 / elapsedMillis;
}

/**
 * Renders a latency histogram as a multi-line summary.
 *
 * <p>The summary lists the sample count followed by the minimum, a fixed set of percentiles, and
 * the maximum. Every value is divided by 1,000,000 before printing; since callers in this class
 * record nanoseconds, the printed figures are milliseconds.
 *
 * @param histogram histogram to summarize
 * @return the summary, one newline-terminated line per statistic
 */
private String formatHistogram(Histogram histogram) {
  final double nanosPerMilli = 1_000_000.0;
  final String valueLine = "%5s: %.2f\n";
  final String[] labels = {"p50", "p90", "p95", "p96", "p97", "p98", "p99", "p99.9"};
  final double[] percentiles = {50.0, 90.0, 95.0, 96.0, 97.0, 98.0, 99.0, 99.9};

  final StringBuilder summary = new StringBuilder();
  summary.append(String.format("%5s: %d\n", "count", histogram.getTotalCount()));
  summary.append(String.format(valueLine, "min", histogram.getMinValue() / nanosPerMilli));
  for (int i = 0; i < labels.length; i++) {
    final double value = histogram.getValueAtPercentile(percentiles[i]) / nanosPerMilli;
    summary.append(String.format(valueLine, labels[i], value));
  }
  summary.append(String.format(valueLine, "max", histogram.getMaxValue() / nanosPerMilli));
  return summary.toString();
}

/**
 * Stops load generation and releases the cache client.
 *
 * <p>Both executors are asked to shut down, which stops new request and housekeeping tasks from
 * being accepted, and the call then blocks until their running tasks have finished. The client is
 * closed even if the wait is interrupted.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting for the
 *     executors to terminate
 */
@SuppressWarnings("ResultOfMethodCallIgnored")
public void shutdown() throws InterruptedException {
  executorService.shutdown();
  scheduler.shutdown();
  try {
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
  } finally {
    // Always release the client, including when the wait above is interrupted.
    client.close();
  }
}

/**
 * Runs the load generator with the settings defined below for a fixed duration, then shuts it
 * down.
 *
 * @param args unused
 * @throws InterruptedException if the main thread is interrupted while waiting
 */
public static void main(String[] args) throws InterruptedException {
  //
  // Each time this amount of time in seconds has passed, statistics about throughput and latency
  // will be printed.
  //
  final int showStatsInterval = 5;
  //
  // Controls the size of the payload that will be used for the cache items in
  // the load test. Smaller payloads will generally provide lower latencies than
  // larger payloads.
  //
  final int cacheItemPayloadBytes = 100;
  //
  // Controls the number of concurrent requests that will be made (via asynchronous
  // function calls) by the load test. Increasing this number may improve throughput,
  // but it will also increase CPU consumption. As CPU usage increases and there
  // is more contention between the concurrent function calls, client-side latencies
  // may increase.
  //
  final int numberOfConcurrentRequests = 50;
  //
  // Sets an upper bound on how many requests per second will be sent to the server.
  // Momento caches have a default throttling limit of 100 requests per second,
  // so if you raise this, you may observe throttled requests. Contact
  // support@momentohq.com to inquire about raising your limits.
  //
  final int maxRequestsPerSecond = 50;
  //
  // Controls how long the load test will run.
  //
  final int howLongToRunSeconds = 60;
  //
  // Controls how long the load generator will run before resetting the histogram.
  // Removes outlier times due to client connection or code loading/jit.
  //
  final int warmupTimeSeconds = 10;

  final LoadGenerator loadGenerator =
      new LoadGenerator(
          showStatsInterval,
          numberOfConcurrentRequests,
          maxRequestsPerSecond,
          cacheItemPayloadBytes,
          warmupTimeSeconds);

  try {
    // Wait for the desired time. TimeUnit converts to milliseconds in long arithmetic, so a
    // large duration cannot overflow the way an int multiplication by 1000 would.
    TimeUnit.SECONDS.sleep(howLongToRunSeconds);
  } finally {
    // The workers reschedule themselves forever; without this the JVM would never exit if the
    // wait above were interrupted.
    loadGenerator.shutdown();
  }

  // NOTE(review): trailing pause kept from the original; presumably lets final log output
  // flush before the JVM exits — confirm whether it is still needed.
  Thread.sleep(5000);
}
}
Loading