diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/ElasticsearchType.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/ElasticsearchType.java new file mode 100644 index 000000000..b9c2a9780 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/ElasticsearchType.java @@ -0,0 +1,23 @@ +package org.opensearch.migrations.data; + +public enum ElasticsearchType { + DATE("date"), + GEO_POINT("geo_point"), + INTEGER("integer"), + KEYWORD("keyword"), + LONG("long"), + TEXT("text"), + SCALED_FLOAT("scaled_float"), + IP("ip"), + NESTED("nested"); + + private final String value; + + ElasticsearchType(String value) { + this.value = value; + } + + public String getValue() { + return value; + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/FieldBuilders.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/FieldBuilders.java deleted file mode 100644 index b2a922232..000000000 --- a/DataGenerator/src/main/java/org/opensearch/migrations/data/FieldBuilders.java +++ /dev/null @@ -1,24 +0,0 @@ -package org.opensearch.migrations.data; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; - -/** Shared ways to build fields for index mappings */ -public class FieldBuilders { - private static final ObjectMapper mapper = new ObjectMapper(); - - public static ObjectNode createField(String type) { - var field = mapper.createObjectNode(); - field.put("type", type); - return field; - } - - public static ObjectNode createFieldTextRawKeyword() { - var fieldNode = mapper.createObjectNode(); - fieldNode.put("type", "text"); - var fieldsNode = mapper.createObjectNode(); - fieldsNode.set("raw", createField("keyword")); - fieldNode.set("fields", fieldsNode); - return fieldNode; - } -} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/IFieldCreator.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/IFieldCreator.java new file mode 100644 index 000000000..8ee1fb9c9 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/IFieldCreator.java @@ -0,0 +1,41 @@ +package org.opensearch.migrations.data; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +/** + * Helpers to build fields for index mappings. + */ +public interface IFieldCreator { + ObjectMapper mapper = new ObjectMapper(); + + default ObjectNode createField(ElasticsearchType type) { + return mapper.createObjectNode().put("type", type.getValue()); + } + + default ObjectNode fieldGeoPoint() { return createField(ElasticsearchType.GEO_POINT); } + default ObjectNode fieldInt() { return createField(ElasticsearchType.INTEGER); } + default ObjectNode fieldIP() { return createField(ElasticsearchType.IP); } + default ObjectNode fieldKeyword() { return createField(ElasticsearchType.KEYWORD); } + default ObjectNode fieldLong() { return createField(ElasticsearchType.LONG); } + default ObjectNode fieldNested() { return createField(ElasticsearchType.NESTED); } + default ObjectNode fieldText() { return createField(ElasticsearchType.TEXT); } + + default ObjectNode fieldRawTextKeyword() { + return mapper.createObjectNode() + .put("type", "text") + .set("fields", mapper.createObjectNode() + .set("raw", createField(ElasticsearchType.KEYWORD))); + } + + default ObjectNode fieldScaledFloat(int scalingFactor) { + return createField(ElasticsearchType.SCALED_FLOAT) + .put("scaling_factor", scalingFactor); + } + default ObjectNode fieldScaledFloat() { return fieldScaledFloat(100); } + + default ObjectNode fieldDate() { return createField(ElasticsearchType.DATE); } + default ObjectNode fieldDateISO() { + return fieldDate().put("format", "yyyy-MM-dd HH:mm:ss"); + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/IRandomDataBuilders.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/IRandomDataBuilders.java new file mode 100644 index 000000000..f3c582503 --- /dev/null +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/IRandomDataBuilders.java @@ -0,0 +1,35 @@ +package org.opensearch.migrations.data; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Random; + +/** Shared ways to build random data */ +public interface IRandomDataBuilders { + ZoneId UTC_ZONE = ZoneId.of("UTC"); + DateTimeFormatter SIMPLE_DATE_PATTERN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + int ONE_DAY_IN_MILLIS = 24 * 60 * 60 * 1000; + + default long randomTime(long timeFrom, Random random) { + return timeFrom - random.nextInt(ONE_DAY_IN_MILLIS); + } + + default String randomTimeISOString(long timeFrom, Random random) { + var timeMillis = randomTime(timeFrom, random); + var timeInstant = Instant.ofEpochMilli(timeMillis).atZone(UTC_ZONE); + return SIMPLE_DATE_PATTERN.format(timeInstant); + } + + default double randomDouble(Random random, double min, double max) { + return min + (max - min) * random.nextDouble(); + } + + default String randomElement(String[] elements, Random random) { + return elements[random.nextInt(elements.length)]; + } + + default int randomElement(int[] elements, Random random) { + return elements[random.nextInt(elements.length)]; + } +} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/IndexOptions.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/IndexOptions.java index 9866cca92..910a483dc 100644 --- a/DataGenerator/src/main/java/org/opensearch/migrations/data/IndexOptions.java +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/IndexOptions.java @@ -8,7 +8,7 @@ public class IndexOptions { private static final ObjectMapper mapper = new ObjectMapper(); /** Improvement to add more flexibility with these values */ - public ObjectNode indexSettings = mapper.createObjectNode() + public final ObjectNode indexSettings = mapper.createObjectNode() .put("index.number_of_shards", 5) .put("index.number_of_replicas", 0) .put("index.queries.cache.enabled", false) diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/RandomDataBuilders.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/RandomDataBuilders.java deleted file mode 100644 index 6ccfe5ea3..000000000 --- a/DataGenerator/src/main/java/org/opensearch/migrations/data/RandomDataBuilders.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.opensearch.migrations.data; - -import java.time.Instant; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; -import java.util.Random; - -import lombok.experimental.UtilityClass; - -/** Shared ways to build random data */ -@UtilityClass -public class RandomDataBuilders { - private static final ZoneId UTC_ZONE = ZoneId.of("UTC"); - private static final DateTimeFormatter SIMPLE_DATE_PATTERN = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); - private static final int ONE_DAY_IN_MILLIS = 24 * 60 * 60 * 1000; - - public static long randomTime(long timeFrom, Random random) { - return timeFrom - random.nextInt(ONE_DAY_IN_MILLIS); - } - - public static String randomTimeISOString(long timeFrom, Random random) { - var timeMillis = randomTime(timeFrom, random); - var timeInstant = Instant.ofEpochMilli(timeMillis).atZone(UTC_ZONE); - return SIMPLE_DATE_PATTERN.format(timeInstant); - } - - public static double randomDouble(Random random, double min, double max) { - return min + (max - min) * random.nextDouble(); - } - - public static String randomElement(String[] elements, Random random) { - return elements[random.nextInt(elements.length)]; - } - - public static int randomElement(int[] elements, Random random) { - return elements[random.nextInt(elements.length)]; - } -} diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java index f1257b37e..661b6c540 100644 --- a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java @@ -10,6 +10,8 @@ import org.opensearch.migrations.bulkload.common.OpenSearchClient; import org.opensearch.migrations.data.workloads.Workload; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -43,11 +45,25 @@ public void generate(WorkloadOptions options) { private List> generateDocs(String indexName, Workload workload, WorkloadOptions options) { // This happens inline to be sure the index exists before docs are indexed on it - client.createIndex(indexName, workload.createIndex(options.index.indexSettings.deepCopy()), null); + var indexRequestDoc = workload.createIndex(options.index.indexSettings.deepCopy()); + log.atInfo().setMessage("Creating index {} with {}").addArgument(indexName).addArgument(indexRequestDoc).log(); + client.createIndex(indexName, indexRequestDoc, null); var docIdCounter = new AtomicInteger(0); var allDocs = workload.createDocs(options.totalDocs) - .map(doc -> new DocumentReindexer.BulkDocSection(indexName + "_ " + docIdCounter.incrementAndGet(), doc.toString())) + .map(doc -> { + log.atTrace().setMessage("Created doc for index {}: {}") + .addArgument(indexName) + .addArgument(() -> { + try { + return new ObjectMapper().writeValueAsString(n); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + ).log(); + return new DocumentReindexer.BulkDocSection(indexName + "_" + docIdCounter.incrementAndGet(), doc.toString()); + }) .collect(Collectors.toList()); var bulkDocGroups = new ArrayList>(); diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java index 6335d8a26..241818853 100644 --- a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java @@ -17,5 +17,5 @@ public class WorkloadOptions { @Parameter(names = { "--max-bulk-request-batch-count" }, description = "The maximum batch count for bulk requests") public int maxBulkBatchSize = 50; - public IndexOptions index = new IndexOptions(); + public final IndexOptions index = new IndexOptions(); } diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Geonames.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Geonames.java index 1728849dc..89cb72435 100644 --- a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Geonames.java +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Geonames.java @@ -5,20 +5,18 @@ import java.util.stream.IntStream; import java.util.stream.Stream; +import org.opensearch.migrations.data.IFieldCreator; +import org.opensearch.migrations.data.IRandomDataBuilders; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import static org.opensearch.migrations.data.FieldBuilders.createField; -import static org.opensearch.migrations.data.FieldBuilders.createFieldTextRawKeyword; -import static org.opensearch.migrations.data.RandomDataBuilders.randomDouble; -import static org.opensearch.migrations.data.RandomDataBuilders.randomElement; - /** * Workload based off of Geonames * https://github.com/opensearch-project/opensearch-benchmark-workloads/tree/main/geonames */ -public class Geonames implements Workload { +public class Geonames implements Workload, IFieldCreator, IRandomDataBuilders { private static final ObjectMapper mapper = new ObjectMapper(); private static final String[] COUNTRY_CODES = { "US", "DE", "FR", "GB", "CN", "IN", "BR" }; @@ -34,36 +32,29 @@ public List indexNames() { */ @Override public ObjectNode createIndex(ObjectNode defaultSettings) { - var properties = mapper.createObjectNode(); - properties.set("geonameid", createField("long")); - properties.set("name", createFieldTextRawKeyword()); - properties.set("asciiname", createFieldTextRawKeyword()); - properties.set("alternatenames", createFieldTextRawKeyword()); - properties.set("feature_class", createFieldTextRawKeyword()); - properties.set("feature_code", createFieldTextRawKeyword()); - properties.set("cc2", createFieldTextRawKeyword()); - properties.set("admin1_code", createFieldTextRawKeyword()); - properties.set("admin2_code", createFieldTextRawKeyword()); - properties.set("admin3_code", createFieldTextRawKeyword()); - properties.set("admin4_code", createFieldTextRawKeyword()); - properties.set("elevation", createField("integer")); - properties.set("population", createField("long")); - properties.set("dem", createFieldTextRawKeyword()); - properties.set("timezone", createFieldTextRawKeyword()); - properties.set("location", createField("geo_point")); - - var countryCodeField = createFieldTextRawKeyword(); - countryCodeField.put("fielddata", true); - properties.set("country_code", countryCodeField); - - var mappings = mapper.createObjectNode(); - mappings.put("dynamic", "strict"); - mappings.set("properties", properties); - - var index = mapper.createObjectNode(); - index.set("mappings", mappings); - index.set("settings", defaultSettings); - return index; + return mapper.createObjectNode() + .set("mappings", mapper.createObjectNode() + .put("dynamic", "strict") + .set("properties", mapper.createObjectNode() + .set("geonameid", fieldLong()) + .set("name", fieldRawTextKeyword()) + .set("asciiname", fieldRawTextKeyword()) + .set("alternatenames", fieldRawTextKeyword()) + .set("feature_class", fieldRawTextKeyword()) + .set("feature_code", fieldRawTextKeyword()) + .set("cc2", fieldRawTextKeyword()) + .set("admin1_code", fieldRawTextKeyword()) + .set("admin2_code", fieldRawTextKeyword()) + .set("admin3_code", fieldRawTextKeyword()) + .set("admin4_code", fieldRawTextKeyword()) + .set("elevation", fieldInt()) + .set("population", fieldLong()) + .set("dem", fieldRawTextKeyword()) + .set("timezone", fieldRawTextKeyword()) + .set("location", fieldGeoPoint()) + .set("country_code", fieldRawTextKeyword() + .put("fielddata", true)))) + .set("settings", defaultSettings); } /** @@ -94,33 +85,32 @@ public Stream createDocs(int numDocs) { // These documents are have a low degree of uniqueness, // there is an opportunity to augment them by using Random more. var random = new Random(i); - var doc = mapper.createObjectNode(); - doc.put("geonameid", i + 1000); - doc.put("name", "City" + (i + 1)); - doc.put("asciiname", "City" + (i + 1)); - doc.put("alternatenames", "City" + (i + 1)); - doc.put("feature_class", "FCl" + (i + 1)); - doc.put("feature_code", "FCo" + (i + 1)); - doc.put("country_code", randomCountryCode(random)); - doc.put("cc2", "cc2" + (i + 1)); - doc.put("admin1_code", "admin" + (i + 1)); - doc.put("population", random.nextInt(1000)); - doc.put("dem", random.nextInt(1000) + ""); - doc.put("timezone", "TZ" + (i + 1)); - doc.set("location", randomLocation(random)); - return doc; + return mapper.createObjectNode() + .put("geonameid", i + 1000) + .put("name", "City" + (i + 1)) + .put("asciiname", "City" + (i + 1)) + .put("alternatenames", "City" + (i + 1)) + .put("feature_class", "FCl" + (i + 1)) + .put("feature_code", "FCo" + (i + 1)) + .put("country_code", randomCountryCode(random)) + .put("cc2", "cc2" + (i + 1)) + .put("admin1_code", "admin" + (i + 1)) + .put("population", random.nextInt(1000)) + .put("dem", random.nextInt(1000) + "") + .put("timezone", "TZ" + (i + 1)) + .set("location", randomLocation(random)); } ); } - private static ArrayNode randomLocation(Random random) { + private ArrayNode randomLocation(Random random) { var location = mapper.createArrayNode(); location.add(randomDouble(random, -180, 180)); // Longitude location.add(randomDouble(random, -90, 90)); // Latitude return location; } - private static String randomCountryCode(Random random) { + private String randomCountryCode(Random random) { return randomElement(COUNTRY_CODES, random); } } diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/HttpLogs.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/HttpLogs.java index 182a6287e..c20543040 100644 --- a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/HttpLogs.java +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/HttpLogs.java @@ -5,19 +5,17 @@ import java.util.stream.IntStream; import java.util.stream.Stream; +import org.opensearch.migrations.data.IFieldCreator; +import org.opensearch.migrations.data.IRandomDataBuilders; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import static org.opensearch.migrations.data.FieldBuilders.createField; -import static org.opensearch.migrations.data.FieldBuilders.createFieldTextRawKeyword; -import static org.opensearch.migrations.data.RandomDataBuilders.randomElement; -import static org.opensearch.migrations.data.RandomDataBuilders.randomTime; - /** * Workload based off of http_logs * https://github.com/opensearch-project/opensearch-benchmark-workloads/tree/main/http_logs */ -public class HttpLogs implements Workload { +public class HttpLogs implements Workload, IFieldCreator, IRandomDataBuilders { private static final ObjectMapper mapper = new ObjectMapper(); private static final String[] HTTP_METHODS = { "GET", "POST", "PUT", "DELETE" }; @@ -45,37 +43,27 @@ public List indexNames() { */ @Override public ObjectNode createIndex(ObjectNode defaultSettings) { - var properties = mapper.createObjectNode(); - var timestamp = createField("date"); - timestamp.put("format", "strict_date_optional_time||epoch_second"); - properties.set("@timestamp", timestamp); - var message = createField("keyword"); - message.put("index", false); - message.put("doc_values", false); - properties.set("message", message); - properties.set("clientip", createField("ip")); - var request = createFieldTextRawKeyword(); - var requestRaw = (ObjectNode) request.get("fields").get("raw"); - requestRaw.put("ignore_above", 256); - properties.set("request", request); - properties.set("status", createField("integer")); - properties.set("size", createField("integer")); - var geoip = mapper.createObjectNode(); - var geoipProps = mapper.createObjectNode(); - geoip.set("properties", geoipProps); - geoipProps.set("country_name", createField("keyword")); - geoipProps.set("city_name", createField("keyword")); - geoipProps.set("location", createField("geo_point")); - properties.set("geoip", geoip); - - var mappings = mapper.createObjectNode(); - mappings.put("dynamic", "strict"); - mappings.set("properties", properties); - - var index = mapper.createObjectNode(); - index.set("mappings", mappings); - index.set("settings", defaultSettings); - return index; + return mapper.createObjectNode() + .set("mappings", mapper.createObjectNode() + .put("dynamic", "strict") + .set("properties", mapper.createObjectNode() + .set("@timestamp", fieldDate() + .put("format", "strict_date_optional_time||epoch_second")) + .set("message", fieldKeyword() + .put("index", false) + .put("doc_values", false)) + .set("clientip", fieldIP()) + .set("request", + ((ObjectNode) fieldRawTextKeyword().get("fields").get("raw")) + .put("ignore_above", 256)) + .set("status", fieldInt()) + .set("size", fieldInt()) + .set("geoip", mapper.createObjectNode() + .set("properties", mapper.createObjectNode() + .set("country_name", fieldKeyword()) + .set("city_name", fieldKeyword()) + .set("location", fieldGeoPoint()))))) + .set("settings", defaultSettings); } /** @@ -95,13 +83,12 @@ public Stream createDocs(int numDocs) { return IntStream.range(0, numDocs) .mapToObj(i -> { var random = new Random(i); - ObjectNode doc = mapper.createObjectNode(); - doc.put("@timestamp", randomTime(currentTime, random)); - doc.put("clientip", randomIpAddress(random)); - doc.put("request", randomRequest(random)); - doc.put("status", randomStatus(random)); - doc.put("size", randomResponseSize(random)); - return doc; + return mapper.createObjectNode() + .put("@timestamp", randomTime(currentTime, random)) + .put("clientip", randomIpAddress(random)) + .put("request", randomRequest(random)) + .put("status", randomStatus(random)) + .put("size", randomResponseSize(random)); } ); } @@ -110,23 +97,23 @@ private static String randomIpAddress(Random random) { return random.nextInt(256) + "." + random.nextInt(256) + "." + random.nextInt(256) + "." + random.nextInt(256); } - private static String randomHttpMethod(Random random) { + private String randomHttpMethod(Random random) { return randomElement(HTTP_METHODS, random); } - private static String randomRequest(Random random) { + private String randomRequest(Random random) { return randomHttpMethod(random) + " " + randomUrl(random) + " HTTP/1.0"; } - private static String randomUrl(Random random) { + private String randomUrl(Random random) { return randomElement(URLS, random); } - private static int randomStatus(Random random) { + private int randomStatus(Random random) { return randomElement(RESPONSE_CODES, random); } - private static int randomResponseSize(Random random) { + private int randomResponseSize(Random random) { return random.nextInt(50 * 1024 * 1024); } } diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Nested.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Nested.java index f8ba62869..038202a43 100644 --- a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Nested.java +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Nested.java @@ -3,23 +3,23 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; -import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; +import org.opensearch.migrations.data.IFieldCreator; +import org.opensearch.migrations.data.IRandomDataBuilders; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; - -import static org.opensearch.migrations.data.FieldBuilders.createField; -import static org.opensearch.migrations.data.RandomDataBuilders.randomElement; -import static org.opensearch.migrations.data.RandomDataBuilders.randomTime; +import lombok.extern.slf4j.Slf4j; /** * Workload based off of nested * https://github.com/opensearch-project/opensearch-benchmark-workloads/tree/main/nested */ -public class Nested implements Workload { +@Slf4j +public class Nested implements Workload, IFieldCreator, IRandomDataBuilders { private static final ObjectMapper mapper = new ObjectMapper(); private static final String[] USER_NAMES = { @@ -45,29 +45,21 @@ public List indexNames() { */ @Override public ObjectNode createIndex(ObjectNode defaultSettings) { - var properties = mapper.createObjectNode(); - properties.set("user", createField("keyword")); - properties.set("creationDate", createField("date")); - properties.set("title", createField("text")); - properties.set("qid", createField("keyword")); - properties.set("tag", createField("keyword")); - properties.set("answer_count", createField("integer")); - var answers = createField("nested"); - var answersProps = mapper.createObjectNode(); - answers.set("properties", answersProps); - answersProps.set("user", createField("keyword")); - answersProps.set("date", createField("date")); - properties.set("answers", answers); - - var mappings = mapper.createObjectNode(); - mappings.put("dynamic", "strict"); - mappings.set("properties", properties); - - var index = mapper.createObjectNode(); - index.set("mappings", mappings); - index.set("settings", defaultSettings); - - return index; + return mapper.createObjectNode() + .set("mappings", mapper.createObjectNode() + .put("dynamic", "strict") + .set("properties", mapper.createObjectNode() + .set("user", fieldKeyword()) + .set("creationDate", fieldText()) + .set("title", fieldText()) + .set("qid", fieldKeyword()) + .set("tag", fieldKeyword()) + .set("answer_count", fieldInt()) + .set("answers", fieldNested() + .set("properties", mapper.createObjectNode() + .set("user", fieldKeyword()) + .set("date", fieldDate()))))) + .set("settings", defaultSettings); } /** @@ -96,38 +88,36 @@ public Stream createDocs(int numDocs) { .mapToObj(i -> { var random = new Random(i); var creationTime = randomTime(currentTime, random); - var doc = mapper.createObjectNode(); - doc.put("title", randomTitle(random)); - doc.put("qid", (i + 1000) + ""); - doc.set("answers", randomAnswers(mapper, creationTime, random)); - doc.set("tag", randomTags(random)); - doc.put("user", randomUser(random)); - doc.put("creationDate", creationTime); - return doc; + return mapper.createObjectNode() + .put("title", randomTitle(random)) + .put("qid", (i + 1000) + "") + .set("answers", randomAnswers(mapper, creationTime, random)) + .set("tag", randomTags(random)) + .put("user", randomUser(random)) + .put("creationDate", creationTime); } ); } - private static ArrayNode randomAnswers(ObjectMapper mapper, long timeFrom, Random random) { + private ArrayNode randomAnswers(ObjectMapper mapper, long timeFrom, Random random) { var answers = mapper.createArrayNode(); var numAnswers = random.nextInt(5) + 1; for (int i = 0; i < numAnswers; i++) { - var answer = mapper.createObjectNode(); - answer.put("date", randomTime(timeFrom, random)); - answer.put("user", randomUser(random)); - + var answer = mapper.createObjectNode() + .put("date", randomTime(timeFrom, random)) + .put("user", randomUser(random)); answers.add(answer); } return answers; } - private static String randomUser(Random random) { + private String randomUser(Random random) { // Extra random int simulates more users return randomElement(USER_NAMES, random) + " (" + (random.nextInt(10) + 1000) + ")"; } - private static ArrayNode randomTags(Random random) { + private ArrayNode randomTags(Random random) { var tags = mapper.createArrayNode(); var tagsToCreate = random.nextInt(3) + 1; @@ -137,13 +127,13 @@ private static ArrayNode randomTags(Random random) { return tags; } - private static String randomTitle(Random random) { + private String randomTitle(Random random) { var titleWordLength = random.nextInt(5); var words = new ArrayList(); for (int i = 0; i < titleWordLength; i++) { words.add(randomElement(WORDS, random) + "" + random.nextInt(10)); // Extra random int simulates more words } - return words.stream().collect(Collectors.joining(" ")); + return String.join(" ", words); } } diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/NycTaxis.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/NycTaxis.java index 26fa6da49..61ddbb5fd 100644 --- a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/NycTaxis.java +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/NycTaxis.java @@ -5,20 +5,18 @@ import java.util.stream.IntStream; import java.util.stream.Stream; +import org.opensearch.migrations.data.IFieldCreator; +import org.opensearch.migrations.data.IRandomDataBuilders; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import static org.opensearch.migrations.data.FieldBuilders.createField; -import static org.opensearch.migrations.data.RandomDataBuilders.randomDouble; -import static org.opensearch.migrations.data.RandomDataBuilders.randomElement; -import static org.opensearch.migrations.data.RandomDataBuilders.randomTimeISOString; - /** * Workload based off of nyc_taxis * https://github.com/opensearch-project/opensearch-benchmark-workloads/tree/main/nyc_taxis */ -public class NycTaxis implements Workload { +public class NycTaxis implements Workload, IFieldCreator, IRandomDataBuilders { private static final ObjectMapper mapper = new ObjectMapper(); private static final String[] TRIP_TYPES = {"1" , "2"}; @@ -37,54 +35,33 @@ public List indexNames() { */ @Override public ObjectNode createIndex(ObjectNode defaultSettings) { - var properties = mapper.createObjectNode(); - properties.set("cab_color", createField("keyword")); - properties.set("dropoff_datetime", createDateField()); - properties.set("dropoff_location", createField("geo_point")); - properties.set("ehail_fee", createScaledFloatField()); - properties.set("extra", createScaledFloatField()); - properties.set("fare_amount", createScaledFloatField()); - properties.set("improvement_surcharge", createScaledFloatField()); - properties.set("mta_tax", createScaledFloatField()); - properties.set("passenger_count", createField("integer")); - properties.set("payment_type", createField("keyword")); - properties.set("pickup_datetime", createDateField()); - properties.set("pickup_location", createField("geo_point")); - properties.set("rate_code_id", createField("keyword")); - properties.set("store_and_fwd_flag", createField("keyword")); - properties.set("surcharge", createScaledFloatField()); - properties.set("tip_amount", createScaledFloatField()); - properties.set("tolls_amount", createScaledFloatField()); - properties.set("total_amount", createScaledFloatField()); - properties.set("trip_distance", createScaledFloatField()); - properties.set("trip_type", createField("keyword")); - properties.set("vendor_id", createField("keyword")); - properties.set("vendor_name", createField("text")); - - - var mappings = mapper.createObjectNode(); - mappings.set("properties", properties); - mappings.put("dynamic", "strict"); - - var index = mapper.createObjectNode(); - index.set("mappings", mappings); - index.set("settings", defaultSettings); - - return index; - } - - private static ObjectNode createScaledFloatField() { - var property = mapper.createObjectNode(); - property.put("type", "scaled_float"); - property.put("scaling_factor", 100); - return property; - } - - private static ObjectNode createDateField() { - var field = mapper.createObjectNode(); - field.put("type", "date"); - field.put("format", "yyyy-MM-dd HH:mm:ss"); - return field; + return mapper.createObjectNode() + .set("mappings", mapper.createObjectNode() + .set("properties", mapper.createObjectNode() + .set("cab_color", fieldKeyword()) + .set("dropoff_datetime", fieldDateISO()) + .set("dropoff_location", fieldGeoPoint()) + .set("ehail_fee", fieldScaledFloat()) + .set("extra", fieldScaledFloat()) + .set("fare_amount", fieldScaledFloat()) + .set("improvement_surcharge", fieldScaledFloat()) + .set("mta_tax", fieldScaledFloat()) + .set("passenger_count", fieldInt()) + .set("payment_type", fieldKeyword()) + .set("pickup_datetime", fieldDateISO()) + .set("pickup_location", fieldGeoPoint()) + .set("rate_code_id", fieldKeyword()) + .set("store_and_fwd_flag", fieldKeyword()) + .set("surcharge", fieldScaledFloat()) + .set("tip_amount", fieldScaledFloat()) + .set("tolls_amount", fieldScaledFloat()) + .set("total_amount", fieldScaledFloat()) + .set("trip_distance", fieldScaledFloat()) + .set("trip_type", fieldKeyword()) + .set("vendor_id", fieldKeyword()) + .set("vendor_name", fieldText())) + .put("dynamic", "strict")) + .set("settings", defaultSettings); } /** @@ -123,51 +100,49 @@ public Stream createDocs(int numDocs) { return IntStream.range(0, numDocs) .mapToObj(i -> { var random = new Random(i); - var doc = mapper.createObjectNode(); - doc.put("total_amount", randomDouble(random, 5.0, 50.0)); - doc.put("improvement_surcharge", 0.3); - doc.set("pickup_location", randomLocationInNyc(random)); - doc.put("pickup_datetime", randomTimeISOString(currentTime, random)); - doc.put("trip_type", randomTripType(random)); - doc.put("dropoff_datetime", randomTimeISOString(currentTime, random)); - doc.put("rate_code_id", "1"); - doc.put("tolls_amount", randomDouble(random, 0.0, 5.0)); - doc.set("dropoff_location", randomLocationInNyc(random)); - doc.put("passenger_count", random.nextInt(4) + 1); - doc.put("fare_amount", randomDouble(random, 5.0, 50.0)); - doc.put("extra", randomDouble(random, 0.0, 1.0)); - doc.put("trip_distance", randomDouble(random, 0.5, 20.0)); - doc.put("tip_amount", randomDouble(random, 0.0, 15.0)); - doc.put("store_and_fwd_flag", randomStoreAndFwdFlag(random)); - doc.put("payment_type", randomPaymentType(random)); - doc.put("mta_tax", 0.5); - doc.put("vendor_id", randomVendorId(random)); - - return doc; + return mapper.createObjectNode() + .put("total_amount", randomDouble(random, 5.0, 50.0)) + .put("improvement_surcharge", 0.3) + .set("pickup_location", randomLocationInNyc(random)) + .put("pickup_datetime", randomTimeISOString(currentTime, random)) + .put("trip_type", randomTripType(random)) + .put("dropoff_datetime", randomTimeISOString(currentTime, random)) + .put("rate_code_id", "1") + .put("tolls_amount", randomDouble(random, 0.0, 5.0)) + .set("dropoff_location", randomLocationInNyc(random)) + .put("passenger_count", random.nextInt(4) + 1) + .put("fare_amount", randomDouble(random, 5.0, 50.0)) + .put("extra", randomDouble(random, 0.0, 1.0)) + .put("trip_distance", randomDouble(random, 0.5, 20.0)) + .put("tip_amount", randomDouble(random, 0.0, 15.0)) + .put("store_and_fwd_flag", randomStoreAndFwdFlag(random)) + .put("payment_type", randomPaymentType(random)) + .put("mta_tax", 0.5) + .put("vendor_id", randomVendorId(random)); } ); } - private static ArrayNode randomLocationInNyc(Random random) { + private ArrayNode randomLocationInNyc(Random random) { var location = mapper.createArrayNode(); location.add(randomDouble(random, -74.05, -73.75)); // Longitude location.add(randomDouble(random, 40.63, 40.85)); // Latitude return location; } - private static String randomTripType(Random random) { + private String randomTripType(Random random) { return randomElement(TRIP_TYPES, random); } - private static String randomPaymentType(Random random) { + private String randomPaymentType(Random random) { return randomElement(PAYMENT_TYPES, random); } - private static String randomStoreAndFwdFlag(Random random) { + private String randomStoreAndFwdFlag(Random random) { return randomElement(STORE_AND_FWD_FLAGS, random); } - private static String randomVendorId(Random random) { + private String randomVendorId(Random random) { return randomElement(VENDOR_IDS, random); } } diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workloads.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workloads.java index 7ac08b976..8aa702b50 100644 --- a/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workloads.java +++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/workloads/Workloads.java @@ -7,10 +7,10 @@ @AllArgsConstructor public enum Workloads { - Geonames(Geonames::new), - HttpLogs(HttpLogs::new), - Nested(Nested::new), - NycTaxis(NycTaxis::new); + GEONAMES(Geonames::new), + HTTP_LOGS(HttpLogs::new), + NESTED(Nested::new), + NYC_TAXIS(NycTaxis::new); @Getter private Supplier newInstance; diff --git a/DataGenerator/src/test/resources/log4j2.properties b/DataGenerator/src/test/resources/log4j2.properties index 6def22d58..16cef4eee 100644 --- a/DataGenerator/src/test/resources/log4j2.properties +++ b/DataGenerator/src/test/resources/log4j2.properties @@ -6,7 +6,7 @@ appender.console.type = Console appender.console.name = Console appender.console.target = SYSTEM_OUT appender.console.layout.type = PatternLayout -appender.console.layout.pattern = %m%n +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} %p %c{1.} [%t] %m%n property.ownedPackagesLogLevel=${sys:migrationLogLevel:-DEBUG} diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java index cb0cd40d8..c3f3a761e 100644 --- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java +++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/OpenSearchClient.java @@ -373,6 +373,8 @@ public Mono sendBulkRequest(String indexName, List