Add DataGenerator tool (#1059)
peternied authored Oct 13, 2024
1 parent 0b08287 commit b50a596
Showing 24 changed files with 1,185 additions and 57 deletions.
93 changes: 93 additions & 0 deletions DataGenerator/README.md
@@ -0,0 +1,93 @@
# Data Generator

This tool is used to generate data for testing a search cluster. The workloads are similar to those of [OpenSearch Benchmark](https://github.com/opensearch-project/OpenSearch-Benchmark).

> **⚠️ Test Infrastructure**
> This tool is for test infrastructure. Features may change without notice, and backward compatibility is not guaranteed.

- [Data Generator](#data-generator)
- [Workloads](#workloads)
- [HttpLogs](#httplogs)
- [Geonames](#geonames)
- [Nested](#nested)
- [NycTaxis](#nyctaxis)
- [Run Data Generator](#run-data-generator)
- [Run workloads programmatically](#run-workloads-programmatically)
- [Generate data via Gradle](#generate-data-via-gradle)

## Workloads

The following workloads are supported and can be selected with the `--workloads [workload1] [workload2] [...]` parameter: `HttpLogs`, `Geonames`, `Nested`, `NycTaxis`.

### HttpLogs

Multiple indices with HTTP request log file entries that include client IP, timestamp, and request details.

### Geonames

A single index containing a list of geographic features from all over the world.

### Nested

A single index of Stack Overflow questions with user IDs and timestamps, along with answers containing user IDs and timestamps, using the nested mapping type.

### NycTaxis

A single index of taxi trip record data, with an entry from every time a taxi dropped off a fare in the New York City area.

## Run Data Generator

This tool can be used from the command line and programmatically. The programmatic approach is recommended.

### Run workloads programmatically

Insert the following code into a test case:

```java
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.data.WorkloadGenerator;
import org.opensearch.migrations.data.WorkloadOptions;

// Create or use an existing OpenSearchClient
var client = new OpenSearchClient(...);

// Create an instance
var generator = new WorkloadGenerator(client);

// Pass workload options to the generate method, in this case using the defaults
generator.generate(new WorkloadOptions());
```
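
Building on the example above, the defaults can be adjusted before calling `generate`. A minimal sketch using the public fields of `WorkloadOptions` (the values shown are illustrative):

```java
import java.util.List;

import org.opensearch.migrations.data.workloads.Workloads;

var options = new WorkloadOptions();
options.workloads = List.of(Workloads.HttpLogs);  // run a single workload
options.totalDocs = 500;                          // documents per workload
options.maxBulkBatchSize = 25;                    // documents per bulk request
generator.generate(options);
```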

### Generate data via Gradle

To upload data onto a test cluster, use the following command:

```shell
./gradlew DataGenerator:run --args='--target-host http://hostname:9200'
```
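
The flags defined by `DataGeneratorArgs` and `WorkloadOptions` can be appended to the same `--args` string, for example `--workloads HttpLogs Geonames`, `--docs-per-workload-count 1000`, or `--max-bulk-request-batch-count 50`, as the output below demonstrates.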

<details>
<summary>
Example command output
</summary>

```
$ ./gradlew DataGenerator:run --args=' --target-host https://172.18.0.1:19200 --target-insecure --target-username admin --target-password admin --docs-per-workload-count 1000'
> Task :DataGenerator:run
2024-10-10 17:33:01,247 INFO o.o.m.u.ProcessHelpers [main] getNodeInstanceName()=generated_d0bf496d-1b80-4316-bf38-e3315321a3ef
2024-10-10 17:33:01,249 INFO o.o.m.DataGenerator [main] Starting DataGenerator with workerId=generated_d0bf496d-1b80-4316-bf38-e3315321a3ef
2024-10-10 17:33:01,552 INFO o.o.m.d.WorkloadGenerator [main] Starting document creation
2024-10-10 17:33:02,858 INFO o.o.m.d.WorkloadGenerator [main] All documents queued
2024-10-10 17:33:02,981 INFO o.o.m.d.WorkloadGenerator [main] All documents completed
2024-10-10 17:33:02,981 INFO o.o.m.DataGenerator [main] Generation complete, took 1,429.00ms
Deprecated Gradle features were used in this build, making it incompatible with Gradle 9.0.
You can use '--warning-mode all' to show the individual deprecation warnings and determine if they come from your own scripts or plugins.
See https://docs.gradle.org/8.0.2/userguide/command_line_interface.html#sec:command_line_warnings
BUILD SUCCESSFUL in 4s
25 actionable tasks: 1 executed, 24 up-to-date
```
</details>
40 changes: 40 additions & 0 deletions DataGenerator/build.gradle
@@ -0,0 +1,40 @@
plugins {
    id 'application'
    id 'java'
    id 'io.freefair.lombok'
}

java.sourceCompatibility = JavaVersion.VERSION_11
java.targetCompatibility = JavaVersion.VERSION_11

dependencies {
    implementation project(":coreUtilities")
    implementation project(":RFS")

    implementation group: 'org.jcommander', name: 'jcommander'
    implementation group: 'org.slf4j', name: 'slf4j-api'
    implementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl'
    implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind'
    implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core'

    implementation platform('io.projectreactor:reactor-bom:2023.0.5')
    implementation 'io.projectreactor.netty:reactor-netty-core'
    implementation 'io.projectreactor.netty:reactor-netty-http'

    testImplementation testFixtures(project(':RFS'))
    testImplementation testFixtures(project(':testHelperFixtures'))
    testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core'
    testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl'
    testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-api'
    testImplementation group: 'org.junit.jupiter', name: 'junit-jupiter-params'
    testImplementation group: 'org.mockito', name: 'mockito-core'
    testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter'
    testImplementation group: 'org.hamcrest', name: 'hamcrest'
    testImplementation group: 'org.testcontainers', name: 'testcontainers'

    testRuntimeOnly group: 'org.junit.jupiter', name: 'junit-jupiter-engine'
}

application {
    mainClassName = 'org.opensearch.migrations.DataGenerator'
}
54 changes: 54 additions & 0 deletions DataGenerator/src/main/java/org/opensearch/migrations/DataGenerator.java
@@ -0,0 +1,54 @@
package org.opensearch.migrations;

import java.text.NumberFormat;

import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.data.WorkloadGenerator;
import org.opensearch.migrations.utils.ProcessHelpers;

import com.beust.jcommander.JCommander;
import lombok.extern.slf4j.Slf4j;

/** Command line tool to generate data on a search cluster */
@Slf4j
public class DataGenerator {

    public static void main(String[] args) {
        var workerId = ProcessHelpers.getNodeInstanceName();
        log.info("Starting DataGenerator with workerId=" + workerId);

        var arguments = new DataGeneratorArgs();
        var jCommander = JCommander.newBuilder()
            .addObject(arguments)
            .build();
        jCommander.parse(args);

        if (arguments.help) {
            jCommander.usage();
            return;
        }

        var dataGenerator = new DataGenerator();
        dataGenerator.run(arguments);
    }

    public void run(DataGeneratorArgs arguments) {
        var connectionContext = arguments.targetArgs.toConnectionContext();
        var client = new OpenSearchClient(connectionContext);

        var startTimeMillis = System.currentTimeMillis();
        var workloadGenerator = new WorkloadGenerator(client);
        workloadGenerator.generate(arguments.workloadOptions);
        var generateTimeMillis = System.currentTimeMillis() - startTimeMillis;

        log.info("Generation complete, took {}ms", formatMillis(generateTimeMillis));
    }

    private String formatMillis(long millis) {
        var numberFormat = NumberFormat.getInstance();
        numberFormat.setMinimumFractionDigits(2);
        numberFormat.setMaximumFractionDigits(2);
        return numberFormat.format(millis);
    }
}
18 changes: 18 additions & 0 deletions DataGenerator/src/main/java/org/opensearch/migrations/DataGeneratorArgs.java
@@ -0,0 +1,18 @@
package org.opensearch.migrations;

import org.opensearch.migrations.bulkload.common.http.ConnectionContext;
import org.opensearch.migrations.data.WorkloadOptions;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParametersDelegate;

public class DataGeneratorArgs {
    @Parameter(names = {"--help", "-h"}, help = true, description = "Displays information about how to use this tool")
    public boolean help;

    @ParametersDelegate
    public ConnectionContext.TargetArgs targetArgs = new ConnectionContext.TargetArgs();

    @ParametersDelegate
    public WorkloadOptions workloadOptions = new WorkloadOptions();
}
24 changes: 24 additions & 0 deletions DataGenerator/src/main/java/org/opensearch/migrations/data/FieldBuilders.java
@@ -0,0 +1,24 @@
package org.opensearch.migrations.data;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

/** Shared ways to build fields for index mappings */
public class FieldBuilders {
    private static final ObjectMapper mapper = new ObjectMapper();

    public static ObjectNode createField(String type) {
        var field = mapper.createObjectNode();
        field.put("type", type);
        return field;
    }

    public static ObjectNode createFieldTextRawKeyword() {
        var fieldNode = mapper.createObjectNode();
        fieldNode.put("type", "text");
        var fieldsNode = mapper.createObjectNode();
        fieldsNode.set("raw", createField("keyword"));
        fieldNode.set("fields", fieldsNode);
        return fieldNode;
    }
}
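
For reference, calling `createFieldTextRawKeyword()` yields a `text` field with a `raw` keyword sub-field; the JSON shown in the comment is reconstructed from the code above:

```java
var field = FieldBuilders.createFieldTextRawKeyword();
// field.toString() produces:
// {"type":"text","fields":{"raw":{"type":"keyword"}}}
```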
16 changes: 16 additions & 0 deletions DataGenerator/src/main/java/org/opensearch/migrations/data/IndexOptions.java
@@ -0,0 +1,16 @@
package org.opensearch.migrations.data;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

/** Options for index configuration */
public class IndexOptions {
    private static final ObjectMapper mapper = new ObjectMapper();

    /** Future improvement: make these values configurable */
    public ObjectNode indexSettings = mapper.createObjectNode()
        .put("index.number_of_shards", 5)
        .put("index.number_of_replicas", 0)
        .put("index.queries.cache.enabled", false)
        .put("index.requests.cache.enable", false);
}
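
For reference, these defaults serialize to the flat-keyed settings body below (reconstructed from the code above); `WorkloadGenerator` passes a deep copy of this node when creating each index:

```java
var indexOptions = new IndexOptions();
// indexOptions.indexSettings.toString() produces:
// {"index.number_of_shards":5,"index.number_of_replicas":0,"index.queries.cache.enabled":false,"index.requests.cache.enable":false}
```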
38 changes: 38 additions & 0 deletions DataGenerator/src/main/java/org/opensearch/migrations/data/RandomDataBuilders.java
@@ -0,0 +1,38 @@
package org.opensearch.migrations.data;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Random;

import lombok.experimental.UtilityClass;

/** Shared ways to build random data */
@UtilityClass
public class RandomDataBuilders {
    private static final ZoneId UTC_ZONE = ZoneId.of("UTC");
    private static final DateTimeFormatter SIMPLE_DATE_PATTERN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static final int ONE_DAY_IN_MILLIS = 24 * 60 * 60 * 1000;

    public static long randomTime(long timeFrom, Random random) {
        return timeFrom - random.nextInt(ONE_DAY_IN_MILLIS);
    }

    public static String randomTimeISOString(long timeFrom, Random random) {
        var timeMillis = randomTime(timeFrom, random);
        var timeInstant = Instant.ofEpochMilli(timeMillis).atZone(UTC_ZONE);
        return SIMPLE_DATE_PATTERN.format(timeInstant);
    }

    public static double randomDouble(Random random, double min, double max) {
        return min + (max - min) * random.nextDouble();
    }

    public static String randomElement(String[] elements, Random random) {
        return elements[random.nextInt(elements.length)];
    }

    public static int randomElement(int[] elements, Random random) {
        return elements[random.nextInt(elements.length)];
    }
}
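
A usage sketch (the seed and bounds are illustrative): `randomTime` picks an instant up to 24 hours before the supplied epoch-millis value, and `randomTimeISOString` formats it with the `yyyy-MM-dd HH:mm:ss` pattern in UTC.

```java
import java.util.Random;

var random = new Random(42); // fixed seed makes generated data reproducible
var now = System.currentTimeMillis();
var timestamp = RandomDataBuilders.randomTimeISOString(now, random); // some instant in the past day
var fare = RandomDataBuilders.randomDouble(random, 2.50, 75.00);     // uniform in [2.50, 75.00)
```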
62 changes: 62 additions & 0 deletions DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java
@@ -0,0 +1,62 @@
package org.opensearch.migrations.data;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.opensearch.migrations.bulkload.common.DocumentReindexer;
import org.opensearch.migrations.bulkload.common.OpenSearchClient;
import org.opensearch.migrations.data.workloads.Workload;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@AllArgsConstructor
public class WorkloadGenerator {

    private final OpenSearchClient client;

    public void generate(WorkloadOptions options) {
        log.info("Starting document creation");

        // This workload creates ALL documents in memory, schedules them, and waits for completion.
        // If larger scale is needed, remove the toList() calls and stream all data.
        var allDocs = new ArrayList<CompletableFuture<?>>();
        for (var workload : options.workloads) {
            var workloadInstance = workload.getNewInstance().get();
            var docs = workloadInstance
                .indexNames()
                .stream()
                .map(indexName -> generateDocs(indexName, workloadInstance, options))
                .flatMap(List::stream)
                .collect(Collectors.toList());
            allDocs.addAll(docs);
        }

        log.info("All documents queued");
        CompletableFuture.allOf(allDocs.toArray(new CompletableFuture[0])).join();
        log.info("All documents completed");
    }

    private List<CompletableFuture<?>> generateDocs(String indexName, Workload workload, WorkloadOptions options) {
        // This happens inline to be sure the index exists before docs are indexed into it
        client.createIndex(indexName, workload.createIndex(options.index.indexSettings.deepCopy()), null);

        var docIdCounter = new AtomicInteger(0);
        var allDocs = workload.createDocs(options.totalDocs)
            .map(doc -> new DocumentReindexer.BulkDocSection(indexName + "_" + docIdCounter.incrementAndGet(), doc.toString()))
            .collect(Collectors.toList());

        var bulkDocGroups = new ArrayList<List<DocumentReindexer.BulkDocSection>>();
        for (int i = 0; i < allDocs.size(); i += options.maxBulkBatchSize) {
            bulkDocGroups.add(allDocs.subList(i, Math.min(i + options.maxBulkBatchSize, allDocs.size())));
        }

        return bulkDocGroups.stream()
            .map(docs -> client.sendBulkRequest(indexName, docs, null).toFuture())
            .collect(Collectors.toList());
    }
}
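
With the default options defined in `WorkloadOptions` below (`totalDocs = 1000`, `maxBulkBatchSize = 50`), each index's documents are grouped into 20 bulk requests of 50 documents each; `generate` then joins on all of those futures across every index before reporting completion.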
21 changes: 21 additions & 0 deletions DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java
@@ -0,0 +1,21 @@
package org.opensearch.migrations.data;

import java.util.Arrays;
import java.util.List;

import org.opensearch.migrations.data.workloads.Workloads;

import com.beust.jcommander.Parameter;

public class WorkloadOptions {
    @Parameter(names = { "--workloads", "-w" }, description = "The list of workloads to run, defaults to all available workloads.", required = false)
    public List<Workloads> workloads = Arrays.asList(Workloads.values());

    @Parameter(names = { "--docs-per-workload-count" }, description = "The number of documents per workload")
    public int totalDocs = 1000;

    @Parameter(names = { "--max-bulk-request-batch-count" }, description = "The maximum batch count for bulk requests")
    public int maxBulkBatchSize = 50;

    public IndexOptions index = new IndexOptions();
}