feat: Add a load generator to the examples (#304)
* feat: Add a load generator to the examples

Add a load generator example in the style of the load generators we've made
for other SDKs. It schedules a configurable number of concurrent workers that
alternate write and read tasks, limiting the total TPS with a Guava RateLimiter.
nand4011 authored Jul 25, 2023
1 parent 879fcfb commit 8a8ffd6
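
Below is a minimal, self-contained sketch of the throttling mechanism the generator relies on: Guava's RateLimiter hands out permits at a fixed rate, and acquire() blocks until one is available. The println loop stands in for the real cache calls and the class name is illustrative; this sketch is not part of the commit.

import com.google.common.util.concurrent.RateLimiter;

@SuppressWarnings("UnstableApiUsage")
public class RateLimiterSketch {
  public static void main(String[] args) {
    // One limiter shared by every worker caps the whole process at 50 permits per second.
    final RateLimiter rateLimiter = RateLimiter.create(50);
    for (int i = 0; i < 200; i++) {
      rateLimiter.acquire(); // blocks until the next permit is available
      System.out.println("request " + i + " at " + System.currentTimeMillis());
    }
  }
}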
Showing 2 changed files with 291 additions and 0 deletions.
11 changes: 11 additions & 0 deletions examples/cache/build.gradle.kts
@@ -19,9 +19,14 @@ repositories {
dependencies {
    implementation("software.momento.java:sdk:1.0.0")

    implementation("com.google.guava:guava:31.1-android")

    // Logging framework to log and enable logging in the Momento client.
    implementation("ch.qos.logback:logback-classic:1.4.7")

    // Histogram for collecting stats in the load generator
    implementation("org.hdrhistogram:HdrHistogram:2.1.12")

    // Use JUnit Jupiter for testing.
    testImplementation("org.junit.jupiter:junit-jupiter:5.9.2")
}
@@ -68,6 +73,12 @@ task("dictionary", JavaExec::class) {
mainClass.set("momento.client.example.DictionaryExample")
}

task("loadgen", JavaExec::class) {
description = "Run the load generator"
classpath = sourceSets.main.get().runtimeClasspath
mainClass.set("momento.client.example.LoadGenerator")
}

task("sortedSet", JavaExec::class) {
description = "Run the sorted set example"
classpath = sourceSets.main.get().runtimeClasspath
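With the loadgen task in place, the generator can presumably be started from the examples/cache directory with ./gradlew loadgen, provided the MOMENTO_AUTH_TOKEN environment variable holds a valid Momento auth token; LoadGenerator.java below reads its credential from that variable.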
280 changes: 280 additions & 0 deletions examples/cache/src/main/java/momento/client/example/LoadGenerator.java
@@ -0,0 +1,280 @@
package momento.client.example;

import com.google.common.util.concurrent.RateLimiter;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiConsumer;
import java.util.function.Function;
import momento.sdk.CacheClient;
import momento.sdk.auth.CredentialProvider;
import momento.sdk.auth.EnvVarCredentialProvider;
import momento.sdk.config.Configurations;
import momento.sdk.exceptions.MomentoErrorCode;
import momento.sdk.exceptions.SdkException;
import momento.sdk.responses.cache.GetResponse;
import momento.sdk.responses.cache.SetResponse;
import org.HdrHistogram.ConcurrentHistogram;
import org.HdrHistogram.Histogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("UnstableApiUsage")
public class LoadGenerator {
  private static final String AUTH_TOKEN_ENV_VAR = "MOMENTO_AUTH_TOKEN";
  private static final Duration DEFAULT_ITEM_TTL = Duration.ofSeconds(60);
  private static final String CACHE_NAME = "java-loadgen";
  private static final Logger logger = LoggerFactory.getLogger(LoadGenerator.class);

  private final ScheduledExecutorService executorService;
  private final RateLimiter rateLimiter;
  private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

  private final int requestsPerSecond;

  private final ConcurrentHistogram setHistogram = new ConcurrentHistogram(3);
  private final ConcurrentHistogram getHistogram = new ConcurrentHistogram(3);
  private final LongAdder globalRequestCount = new LongAdder();
  private final LongAdder globalSuccessCount = new LongAdder();
  private final LongAdder globalUnavailableCount = new LongAdder();
  private final LongAdder globalTimeoutCount = new LongAdder();
  private final LongAdder globalLimitExceededCount = new LongAdder();

  private final String cacheValue;
  private final CacheClient client;

  private final long startTime;

  public LoadGenerator(
      int statsInterval,
      int maxConcurrentRequests,
      int requestsPerSecond,
      int cacheValueLength,
      int warmupTime) {
    cacheValue = "x".repeat(cacheValueLength);

    final CredentialProvider credentialProvider;
    try {
      credentialProvider = new EnvVarCredentialProvider(AUTH_TOKEN_ENV_VAR);
    } catch (SdkException e) {
      logger.error("Unable to load credential from environment variable " + AUTH_TOKEN_ENV_VAR, e);
      throw e;
    }
    client =
        CacheClient.builder(credentialProvider, Configurations.Laptop.v1(), DEFAULT_ITEM_TTL)
            .build();

    client.createCache(CACHE_NAME);

    executorService = Executors.newScheduledThreadPool(maxConcurrentRequests);
    rateLimiter = RateLimiter.create(requestsPerSecond);
    this.requestsPerSecond = requestsPerSecond;

    startTime = System.currentTimeMillis();

    // Schedule initial tasks
    for (int i = 0; i < maxConcurrentRequests; ++i) {
      scheduleSet(i, 0);
    }

    // Schedule a histogram reset after the warmup
    scheduler.schedule(
        () -> {
          setHistogram.reset();
          getHistogram.reset();
        },
        warmupTime,
        TimeUnit.SECONDS);

    // Schedule a task to print the stats
    scheduler.scheduleAtFixedRate(this::logInfo, statsInterval, statsInterval, TimeUnit.SECONDS);
  }

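  // Each set is followed by a get of the same key, and the get in turn schedules the next set, so
  // every worker alternates writes and reads for as long as the generator runs.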
  private void scheduleSet(int workerId, int operationNum) {
    scheduleOperation(
        workerId,
        operationNum,
        key -> client.set(CACHE_NAME, key, cacheValue),
        (response, operationNumValue) -> {
          if (response instanceof SetResponse.Success) {
            globalSuccessCount.increment();
          } else if (response instanceof SetResponse.Error error) {
            handleErrorResponse(error.getErrorCode());
          }
          scheduleGet(workerId, operationNumValue);
        },
        setHistogram);
  }

  private void scheduleGet(int workerId, int operationNum) {
    final int nextOperationNum = operationNum + 1;
    scheduleOperation(
        workerId,
        operationNum,
        key -> client.get(CACHE_NAME, key),
        (response, operationNumValue) -> {
          if (response instanceof GetResponse.Hit || response instanceof GetResponse.Miss) {
            globalSuccessCount.increment();
          } else if (response instanceof GetResponse.Error error) {
            handleErrorResponse(error.getErrorCode());
          }
          scheduleSet(workerId, nextOperationNum);
        },
        getHistogram);
  }

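  // Submits one operation to the worker pool: wait for a rate limiter permit, invoke the async
  // cache call, block until it completes, and record the elapsed time in the supplied histogram.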
  private <T> void scheduleOperation(
      int workerId,
      int operationNum,
      Function<String, CompletableFuture<T>> operation,
      BiConsumer<T, Integer> responseHandler,
      ConcurrentHistogram histogram) {
    final String key = "worker" + workerId + "operation" + operationNum;
    executorService.schedule(
        () -> {
          rateLimiter.acquire();
          final long startTime = System.nanoTime();
          T response = operation.apply(key).join();
          final long endTime = System.nanoTime();

          globalRequestCount.increment();
          responseHandler.accept(response, operationNum);

          histogram.recordValue(endTime - startTime);
        },
        0,
        TimeUnit.MILLISECONDS);
  }

  private void handleErrorResponse(MomentoErrorCode errorCode) {
    switch (errorCode) {
      case TIMEOUT_ERROR -> globalTimeoutCount.increment();
      case LIMIT_EXCEEDED_ERROR -> globalLimitExceededCount.increment();
    }
  }

  private void logInfo() {
    final StringBuilder builder = new StringBuilder();
    builder.append("\nCumulative stats:\n");
    final long requestCount = globalRequestCount.sum();
    builder
        .append(
            String.format(
                "%18s: %d (%.2f) tps, limited to %d tps",
                "total requests", requestCount, formatTps(requestCount), requestsPerSecond))
        .append('\n');

    final long successCount = globalSuccessCount.sum();
    builder.append(formatStat("success", requestCount, successCount)).append('\n');

    final long unavailableCount = globalUnavailableCount.sum();
    builder.append(formatStat("server unavailable", requestCount, unavailableCount)).append('\n');

    final long timeoutCount = globalTimeoutCount.sum();
    builder.append(formatStat("timeout", requestCount, timeoutCount)).append('\n');

    final long limitExceededCount = globalLimitExceededCount.sum();
    builder.append(formatStat("limit exceeded", requestCount, limitExceededCount)).append('\n');

    builder.append("\nCumulative write latencies:\n");
    builder.append(formatHistogram(setHistogram));

    builder.append("\nCumulative read latencies:\n");
    builder.append(formatHistogram(getHistogram));

    logger.info(builder.toString());
  }

  private String formatStat(String name, long totalRequests, long requests) {
    final double requestPercentage;
    if (totalRequests == 0) {
      requestPercentage = 0.0;
    } else {
      requestPercentage = (double) requests / totalRequests * 100;
    }
    return String.format("%18s: %d (%.2f)", name, requests, requestPercentage);
  }

  private double formatTps(long totalRequests) {
    final long elapsedTime = System.currentTimeMillis() - startTime;
    return totalRequests * 1000.0 / elapsedTime;
  }

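  // Latencies are recorded in nanoseconds; dividing by 1_000_000 reports them in milliseconds.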
  private String formatHistogram(Histogram histogram) {
    return String.format("%5s: %d\n", "count", histogram.getTotalCount())
        + String.format("%5s: %.2f\n", "min", histogram.getMinValue() / 1_000_000.0)
        + String.format("%5s: %.2f\n", "p50", histogram.getValueAtPercentile(50.0) / 1_000_000.0)
        + String.format("%5s: %.2f\n", "p90", histogram.getValueAtPercentile(90.0) / 1_000_000.0)
        + String.format("%5s: %.2f\n", "p95", histogram.getValueAtPercentile(95.0) / 1_000_000.0)
        + String.format("%5s: %.2f\n", "p96", histogram.getValueAtPercentile(96.0) / 1_000_000.0)
        + String.format("%5s: %.2f\n", "p97", histogram.getValueAtPercentile(97.0) / 1_000_000.0)
        + String.format("%5s: %.2f\n", "p98", histogram.getValueAtPercentile(98.0) / 1_000_000.0)
        + String.format("%5s: %.2f\n", "p99", histogram.getValueAtPercentile(99.0) / 1_000_000.0)
        + String.format("%5s: %.2f\n", "p99.9", histogram.getValueAtPercentile(99.9) / 1_000_000.0)
        + String.format("%5s: %.2f\n", "max", histogram.getMaxValue() / 1_000_000.0);
  }

  @SuppressWarnings("ResultOfMethodCallIgnored")
  public void shutdown() throws InterruptedException {
    executorService.shutdown();
    scheduler.shutdown();
    executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    scheduler.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    client.close();
  }

  public static void main(String[] args) throws InterruptedException {
    //
    // Each time this amount of time in seconds has passed, statistics about throughput and latency
    // will be printed.
    //
    final int showStatsInterval = 5;
    //
    // Controls the size of the payload that will be used for the cache items in
    // the load test. Smaller payloads will generally provide lower latencies than
    // larger payloads.
    //
    final int cacheItemPayloadBytes = 100;
    //
    // Controls the number of concurrent requests that will be made (via asynchronous
    // function calls) by the load test. Increasing this number may improve throughput,
    // but it will also increase CPU consumption. As CPU usage increases and there
    // is more contention between the concurrent function calls, client-side latencies
    // may increase.
    //
    final int numberOfConcurrentRequests = 50;
    //
    // Sets an upper bound on how many requests per second will be sent to the server.
    // Momento caches have a default throttling limit of 100 requests per second,
    // so if you raise this, you may observe throttled requests. Contact
    // [email protected] to inquire about raising your limits.
    //
    final int maxRequestsPerSecond = 50;
    //
    // Controls how long the load test will run.
    //
    final int howLongToRunSeconds = 60;
    //
    // Controls how long the load generator will run before resetting the histogram.
    // Removes outlier times due to client connection or code loading/jit.
    final int warmupTimeSeconds = 10;

    final LoadGenerator loadGenerator =
        new LoadGenerator(
            showStatsInterval,
            numberOfConcurrentRequests,
            maxRequestsPerSecond,
            cacheItemPayloadBytes,
            warmupTimeSeconds);

    // Wait for the desired time
    Thread.sleep(howLongToRunSeconds * 1000);

    loadGenerator.shutdown();

    Thread.sleep(5000);
  }
}
